位置逻辑更新,代码修改
All checks were successful
构建并部署 Spring Boot 应用 / build-and-deploy (push) Successful in 52m56s

This commit is contained in:
2025-10-18 22:20:07 +08:00
parent c173c480b2
commit 74c0908b58
6 changed files with 805 additions and 158 deletions

171
README.md
View File

@@ -86,6 +86,93 @@ src
位置信息现在存储在服务器内存缓存中,而不是持久化到数据库,以提高访问速度和减少数据库负载。
#### WebSocket接口和交互逻辑
系统通过WebSocket实现实时位置同步端点为`/ws/location`
##### 连接建立
客户端通过WebSocket连接到`/ws/location`端点建立连接。
##### 消息格式
所有消息都使用JSON格式。
###### 1. 客户端发送的消息类型
1. **updateLocation位置更新** - 更新用户位置
```json
{
"type": "updateLocation",
"userId": 123,
"latitude": 25.0342,
"longitude": 102.7057,
"timestamp": 1634567890000
}
```
###### 2. 服务器发送的消息类型
1. **subscribed订阅成功响应** - 服务器确认订阅成功
```json
{
"type": "subscribed",
"userId": 123
}
```
2. **onlineUserList在线用户列表** - 服务器发送当前在线用户列表
```json
{
"type": "onlineUserList",
"users": [
{
"userId": 123,
"name": "张三",
"role": "DELIVERY_PERSON"
},
{
"userId": 456,
"name": "李四",
"role": "ADMIN"
}
]
}
```
3. **updateLocation位置更新** - 服务器向所有已签到用户广播位置更新
```json
{
"type": "updateLocation",
"userId": 123,
"userRole": "DELIVERY_PERSON",
"latitude": 25.0342,
"longitude": 102.7057,
"timestamp": 1634567890000
}
```
##### 交互流程
```
用户签到] --> B{签到成功?}
B -->|是| C[建立WebSocket连接]
B -->|否| D[签到失败]
C --> E[自动订阅位置更新]
E --> F[接收subscribed确认]
F --> G[接收在线用户列表]
G --> H[开始位置更新循环]
H --> I[发送位置更新]
I --> J[接收位置广播]
J --> H
K[用户签退] --> L{签退成功?}
L -->|是| M[自动取消订阅并关闭WebSocket连接]
L -->|否| N[签退失败]
```
##### 角色区分
- 管理员ADMIN和配送员DELIVERY_PERSON都可以接收位置更新
- 位置广播消息中包含用户角色信息,便于客户端区分显示
- 只有已签到用户才会收到位置更新广播
### 数据模型
主要实体包括:
@@ -167,6 +254,12 @@ java -jar target/light-delivery-1.0.0.jar --spring.profiles.active=local
java -jar target/light-delivery-1.0.0.jar --spring.profiles.active=test
```
在PowerShell中也可以使用以下命令运行本地环境
```powershell
$env:SPRING_PROFILES_ACTIVE = "local"
mvn spring-boot:run
```
### 运行Docker容器
```bash
@@ -258,9 +351,83 @@ deploy.bat
### 位置同步相关
- `GET /location-sync/delivery-persons/locations` - 获取所有配送员位置
- WebSocket端点: `/ws/location` - 实时位置同步
### WebSocket消息格式
客户端和服务器通过WebSocket发送JSON格式的消息。
#### 客户端发送的消息类型
1. **updateLocation位置更新** - 更新用户位置
```json
{
"type": "updateLocation",
"userId": 123,
"latitude": 25.0342,
"longitude": 102.7057,
"timestamp": 1634567890000
}
```
#### 服务器发送的消息类型
1. **subscribed订阅成功响应** - 服务器确认订阅成功
```json
{
"type": "subscribed",
"userId": 123
}
```
2. **onlineUserList在线用户列表** - 服务器发送当前在线用户列表
```json
{
"type": "onlineUserList",
"users": [
{
"userId": 123,
"name": "张三",
"role": "DELIVERY_PERSON"
},
{
"userId": 456,
"name": "李四",
"role": "ADMIN"
}
]
}
```
3. **updateLocation位置更新** - 服务器向所有已签到用户广播位置更新
```json
{
"type": "updateLocation",
"userId": 123,
"userRole": "DELIVERY_PERSON",
"latitude": 25.0342,
"longitude": 102.7057,
"timestamp": 1634567890000
}
```
## 最近更新
### WebSocket使用方式变更重要变更
为了更好地分离关注点和提高系统安全性我们对WebSocket使用方式进行了重要变更
- 移除了WebSocket中的签到/签退功能这些操作现在完全通过REST API进行
- WebSocket现在仅用于位置同步不再处理用户状态变更
- 用户需要先通过REST API签到然后才能建立WebSocket连接并接收位置更新
- 用户签退时服务器会自动取消订阅并关闭WebSocket连接
- 客户端不再需要发送subscribe/unsubscribe消息这些操作由服务器自动处理
### WebSocket位置同步增强新增
为了支持管理员和配送员都能接收位置更新信息我们对WebSocket位置同步功能进行了增强
- 修改了LocationWebSocketHandler使其支持所有用户类型包括管理员和配送员
- 在位置消息中添加了用户角色信息,便于客户端区分并使用不同图标显示
- 重构了WebSocket处理逻辑使其更加通用和可扩展
- 实现了在线用户列表管理和广播机制
### 用户角色管理优化(新增)
为解决数据一致性问题,我们优化了用户角色管理机制:
- 移除了User表中的role字段
@@ -292,4 +459,6 @@ deploy.bat
3. 配送员位置信息具有时效性默认5分钟内有效
4. 位置历史记录功能已被移除,如有需要可使用第三方服务进行位置追踪
5. 员工管理接口仅限管理员角色访问
6. 用户角色信息现在通过员工表动态获取,确保数据一致性
6. 用户角色信息现在通过员工表动态获取,确保数据一致性
7. WebSocket位置同步现在支持所有已签到用户包括管理员和配送员
8. WebSocket仅用于位置同步签到/签退操作需通过REST API完成

View File

@@ -1,46 +1,51 @@
package com.light.delivery.controller;
import com.light.delivery.model.DeliveryPerson;
import com.light.delivery.service.DeliveryPersonService;
import com.light.delivery.service.LocationWebSocketHandler;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 位置同步控制器,提供配送员位置信息查询接口。
* 用于获取所有已签到配送员的实时位置信息。
* 位置同步控制器,提供已签到员工位置信息查询接口。
* 用于获取所有已签到员的实时位置信息。
*/
@RestController
@RequestMapping("/location-sync")
public class LocationSyncController {
/**
* 配送员服务依赖注入
* WebSocket位置处理器依赖注入
*/
@Autowired
private DeliveryPersonService deliveryPersonService;
private LocationWebSocketHandler locationWebSocketHandler;
/**
* 获取所有已签到配送员的实时位置。
* 仅返回有位置信息的配送员(最近有更新位置的)。
* @return 配送员位置列表
* 获取所有已签到员的实时位置。
* 仅返回有位置信息的员(最近有更新位置的)。
* @return 员位置列表
*/
@GetMapping("/delivery-persons/locations")
public ResponseEntity<List<DeliveryPersonLocation>> getDeliveryPersonLocations() {
List<DeliveryPerson> allDeliveryPersons = deliveryPersonService.getAllDeliveryPersons();
@GetMapping("/employees/locations")
public ResponseEntity<List<EmployeeLocation>> getEmployeeLocations() {
// 获取所有已签到的员工位置信息
Map<Long, LocationWebSocketHandler.UserInfo> userInfos = locationWebSocketHandler.getAllSignedInUsers();
// 过滤出有位置信息的配送员(最近有更新位置的)
List<DeliveryPersonLocation> locations = allDeliveryPersons.stream()
.filter(person -> person.getLatitude() != null && person.getLongitude() != null)
.map(person -> {
DeliveryPersonLocation location = new DeliveryPersonLocation();
location.setDeliveryPersonId(person.getId());
location.setLatitude(person.getLatitude());
location.setLongitude(person.getLongitude());
location.setStatus(person.getStatus());
List<EmployeeLocation> locations = userInfos.entrySet().stream()
.map(entry -> {
Long userId = entry.getKey();
LocationWebSocketHandler.UserInfo userInfo = entry.getValue();
EmployeeLocation location = new EmployeeLocation();
location.setEmployeeId(userId);
location.setLatitude(locationWebSocketHandler.getUserLatitude(userId));
location.setLongitude(locationWebSocketHandler.getUserLongitude(userId));
location.setStatus(locationWebSocketHandler.getUserStatus(userId));
location.setRole(userInfo.getRole() != null ? userInfo.getRole() : "");
location.setName(userInfo.getName() != null ? userInfo.getName() : "");
return location;
})
.collect(Collectors.toList());
@@ -49,14 +54,19 @@ public class LocationSyncController {
}
/**
* 配送员位置信息DTO用于向前端返回配送员位置数据。
* 在线员工位置信息DTO用于向前端返回员位置数据。
*/
@Data
public static class DeliveryPersonLocation {
public static class EmployeeLocation {
/**
* 配送员ID
* 员ID
*/
private Long deliveryPersonId;
private Long employeeId;
/**
* 姓名
*/
private String name;
/**
* 纬度
@@ -72,5 +82,10 @@ public class LocationSyncController {
* 状态
*/
private String status;
/**
* 角色
*/
private String role;
}
}

View File

@@ -6,12 +6,14 @@ import com.light.delivery.model.RegisterRequest;
import com.light.delivery.model.User;
import com.light.delivery.model.UserRole;
import com.light.delivery.model.WxLoginRequest;
import com.light.delivery.service.LocationWebSocketHandler;
import com.light.delivery.service.UserService;
import com.light.delivery.service.impl.UserServiceImpl;
import com.light.delivery.util.JwtUtil;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@@ -45,6 +47,12 @@ public class UserController {
*/
@Autowired
private UserServiceImpl userServiceImpl;
/**
* ApplicationContext依赖注入用于获取其他Bean。
*/
@Autowired
private ApplicationContext applicationContext;
/**
* 获取当前用户状态接口。
@@ -185,6 +193,15 @@ public class UserController {
User user = userService.getUserInfo(token);
User updatedUser = userService.signIn(user.getId());
UserInfoResponse response = toUserInfoResponse(updatedUser);
// 通知WebSocket处理器用户已签到并自动订阅
try {
LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class);
handler.userSignedIn(user.getId());
} catch (Exception e) {
System.err.println("通知WebSocket签到状态时出错: " + e.getMessage());
}
return ResponseEntity.ok(response);
} catch (Exception e) {
return ResponseEntity.badRequest().body("Invalid token: " + e.getMessage());
@@ -192,13 +209,42 @@ public class UserController {
}
/**
* 注册为骑手接口。
* 用户签退接口。
* @param request HTTP请求对象用于提取认证令牌
* @return 操作结果
*/
@PostMapping("/signout")
public ResponseEntity<?> signOut(HttpServletRequest request) {
String token = extractToken(request);
if (token == null) {
return ResponseEntity.badRequest().body("Authorization token is missing");
}
try {
User user = userService.getUserInfo(token);
// 通知WebSocket处理器用户已签退并自动取消订阅
try {
LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class);
handler.userSignedOut(user.getId());
} catch (Exception e) {
System.err.println("通知WebSocket签退状态时出错: " + e.getMessage());
}
return ResponseEntity.ok("签退成功");
} catch (Exception e) {
return ResponseEntity.badRequest().body("Invalid token: " + e.getMessage());
}
}
/**
* 注册为正式员工接口。
* @param request HTTP请求对象用于提取认证令牌
* @param registerRequest 注册请求对象,包含姓名和手机号
* @return 更新后的用户信息
*/
@PostMapping("/register")
public ResponseEntity<UserInfoResponse> registerAsDeliveryPerson(
public ResponseEntity<UserInfoResponse> registerAsEmployee(
HttpServletRequest request,
@RequestBody RegisterRequest registerRequest) {
try {
@@ -213,7 +259,7 @@ public class UserController {
User user = userService.getUserInfo(token);
System.out.println("获取到用户信息: " + user);
User updatedUser = userService.registerAsDeliveryPerson(
User updatedUser = userService.registerAsEmployee(
user.getId(),
registerRequest.getName(),
registerRequest.getPhone());
@@ -221,12 +267,12 @@ public class UserController {
return ResponseEntity.ok(response);
} catch (IllegalArgumentException e) {
// 记录错误日志
System.err.println("注册配送员时发生错误: " + e.getMessage());
System.err.println("注册员时发生错误: " + e.getMessage());
e.printStackTrace();
return ResponseEntity.badRequest().build();
} catch (Exception e) {
// 记录未预期的错误
System.err.println("注册配送员时发生未预期错误: " + e.getMessage());
System.err.println("注册员时发生未预期错误: " + e.getMessage());
e.printStackTrace();
return ResponseEntity.status(500).build();
}

View File

@@ -1,6 +1,13 @@
package com.light.delivery.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.light.delivery.model.User;
import com.light.delivery.model.UserRole;
import com.light.delivery.repository.UserRepository;
import com.light.delivery.service.impl.LocationSyncServiceImpl;
import com.light.delivery.service.impl.UserServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
@@ -9,23 +16,40 @@ import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Component
public class LocationWebSocketHandler extends TextWebSocketHandler {
// 存储所有连接的会话key为配送员IDvalue为会话
// 存储所有连接的会话key为用户IDvalue为会话
private final Map<Long, WebSocketSession> sessions = new ConcurrentHashMap<>();
// 存储配送员最后更新位置的时间
// 存储用户最后更新位置的时间
private final Map<Long, Long> lastUpdateTimes = new ConcurrentHashMap<>();
// 存储配送员的登录状态true表示已签到false表示已签退
private final Map<Long, Boolean> deliveryPersonStatus = new ConcurrentHashMap<>();
// 存储用户的登录状态true表示已签到false表示已签退
private final Map<Long, Boolean> userStatus = new ConcurrentHashMap<>();
// 存储用户的角色信息
private final Map<Long, UserRole> userRoles = new ConcurrentHashMap<>();
// 存储用户的基本信息
private final Map<Long, User> userInfos = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
private UserRepository userRepository;
@Autowired
private UserServiceImpl userService;
@Autowired
private ApplicationContext applicationContext;
// 位置过期时间毫秒默认5分钟
private static final long LOCATION_EXPIRE_TIME = 5 * 60 * 1000;
@@ -33,21 +57,158 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
private static final long WEBSOCKET_TIMEOUT = 30 * 60 * 1000;
/**
* 获取配送员的签到状态
* @param deliveryPersonId 配送员ID
* @return true表示已签到false表示未签到或已签退
* 处理签退消息
*/
public boolean isDeliveryPersonSignedIn(Long deliveryPersonId) {
return Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId));
private void handleSignOut(LocationMessage locationMessage) {
Long userId = locationMessage.getUserId();
if (userId != null) {
userSignedOut(userId);
}
}
/**
* 获取配送员最后更新时间
* @param deliveryPersonId 配送员ID
* 用户签到
* @param userId 用户ID
*/
public void userSignedIn(Long userId) {
if (userId != null) {
userStatus.put(userId, true);
lastUpdateTimes.put(userId, System.currentTimeMillis());
System.out.println("用户 " + userId + " 已签到");
// 如果用户已建立WebSocket连接则自动订阅
WebSocketSession session = sessions.get(userId);
if (session != null && session.isOpen()) {
try {
autoSubscribe(session, userId);
} catch (IOException e) {
System.err.println("自动订阅时出错: " + e.getMessage());
}
}
// 广播更新后的在线用户列表
broadcastOnlineUserList();
// 广播所有在线用户的位置
broadcastAllLocations();
}
}
/**
* 用户签退
* @param userId 用户ID
*/
public void userSignedOut(Long userId) {
if (userId != null) {
// 从所有映射中移除用户信息
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
System.out.println("用户 " + userId + " 已签退");
// 广播更新后的在线用户列表
broadcastOnlineUserList();
}
}
/**
* 获取用户的签到状态
* @param userId 用户ID
* @return true表示已签到false表示未签到或已签退
*/
public boolean isUserSignedIn(Long userId) {
return Boolean.TRUE.equals(userStatus.get(userId));
}
/**
* 获取用户最后更新时间
* @param userId 用户ID
* @return 最后更新时间戳
*/
public Long getLastUpdateTime(Long deliveryPersonId) {
return lastUpdateTimes.get(deliveryPersonId);
public Long getLastUpdateTime(Long userId) {
return lastUpdateTimes.get(userId);
}
/**
* 获取用户角色
* @param userId 用户ID
* @return 用户角色
*/
public UserRole getUserRole(Long userId) {
return userRoles.get(userId);
}
/**
* 获取所有已签到用户的信息
* @return 已签到用户信息映射
*/
public Map<Long, UserInfo> getAllSignedInUsers() {
return userStatus.entrySet().stream()
.filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> {
Long userId = entry.getKey();
User user = userInfos.get(userId);
UserRole role = userRoles.get(userId);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userId);
userInfo.setName(user != null ? user.getName() : "");
userInfo.setRole(role != null ? role.getCode() : "");
return userInfo;
}
));
}
/**
* 获取用户纬度
* @param userId 用户ID
* @return 纬度
*/
public Double getUserLatitude(Long userId) {
// 从位置同步服务获取纬度
try {
LocationSyncServiceImpl locationSyncService = applicationContext.getBean(LocationSyncServiceImpl.class);
return locationSyncService.getDeliveryPersonLatitude(userId);
} catch (Exception e) {
System.err.println("获取用户纬度时出错: " + e.getMessage());
return null;
}
}
/**
* 获取用户经度
* @param userId 用户ID
* @return 经度
*/
public Double getUserLongitude(Long userId) {
// 从位置同步服务获取经度
try {
LocationSyncServiceImpl locationSyncService = applicationContext.getBean(LocationSyncServiceImpl.class);
return locationSyncService.getDeliveryPersonLongitude(userId);
} catch (Exception e) {
System.err.println("获取用户经度时出错: " + e.getMessage());
return null;
}
}
/**
* 获取用户状态
* @param userId 用户ID
* @return 状态
*/
public String getUserStatus(Long userId) {
// 从位置同步服务获取状态
try {
LocationSyncServiceImpl locationSyncService = applicationContext.getBean(LocationSyncServiceImpl.class);
return locationSyncService.getDeliveryPersonStatus(userId);
} catch (Exception e) {
System.err.println("获取用户状态时出错: " + e.getMessage());
return null;
}
}
@Override
@@ -66,26 +227,19 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
if ("updateLocation".equals(locationMessage.getType())) {
// 更新位置信息
handleLocationUpdate(locationMessage);
} else if ("subscribe".equals(locationMessage.getType())) {
// 订阅位置更新
handleSubscribe(session, locationMessage);
} else if ("unsubscribe".equals(locationMessage.getType())) {
// 取消订阅
handleUnsubscribe(session, locationMessage);
} else if ("signin".equals(locationMessage.getType())) {
// 处理签到
handleSignIn(locationMessage);
} else if ("signout".equals(locationMessage.getType())) {
// 处理签退
// 签退
handleSignOut(locationMessage);
}
} catch (Exception e) {
System.err.println("处理WebSocket消息时出错: " + e.getMessage());
e.printStackTrace();
// 确保不因异常导致连接关闭
try {
session.sendMessage(new TextMessage("{\"error\":\"消息处理失败\"}"));
session.sendMessage(new TextMessage("{\"error\":\"消息处理失败: " + e.getMessage() + "\"}"));
} catch (IOException ioException) {
System.err.println("发送错误消息时出错: " + ioException.getMessage());
ioException.printStackTrace();
}
}
}
@@ -94,16 +248,16 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* 处理位置更新消息
*/
private void handleLocationUpdate(LocationMessage locationMessage) {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId == null) {
Long userId = locationMessage.getUserId();
if (userId == null) {
return;
}
// 更新最后更新时间
lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis());
lastUpdateTimes.put(userId, System.currentTimeMillis());
// 只有已签到的配送员才广播位置更新
if (Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId))) {
// 只有已签到的用户才广播位置更新
if (Boolean.TRUE.equals(userStatus.get(userId))) {
try {
// 广播位置更新给所有连接的客户端
broadcastLocationUpdate(locationMessage);
@@ -114,52 +268,166 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
}
/**
* 处理订阅消息
* 用户连接时自动订阅(在用户签到成功后调用)
* @param session WebSocket会话
* @param userId 用户ID
*/
private void handleSubscribe(WebSocketSession session, LocationMessage locationMessage) throws IOException {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId != null) {
sessions.put(deliveryPersonId, session);
// 默认状态为未签到
deliveryPersonStatus.putIfAbsent(deliveryPersonId, false);
lastUpdateTimes.putIfAbsent(deliveryPersonId, System.currentTimeMillis());
session.sendMessage(new TextMessage("{\"type\":\"subscribed\",\"deliveryPersonId\":" + deliveryPersonId + "}"));
public void autoSubscribe(WebSocketSession session, Long userId) throws IOException {
if (userId != null) {
sessions.put(userId, session);
// 默认状态为未签到但WebSocket连接已建立
userStatus.putIfAbsent(userId, false);
lastUpdateTimes.putIfAbsent(userId, System.currentTimeMillis());
// 获取并存储用户信息和角色
try {
User user = userRepository.findById(userId).orElse(null);
if (user != null) {
UserRole userRole = userService.getUserRole(user);
userInfos.putIfAbsent(userId, user);
userRoles.putIfAbsent(userId, userRole);
}
} catch (Exception e) {
System.err.println("获取用户信息时出错: " + e.getMessage());
}
session.sendMessage(new TextMessage("{\"type\":\"subscribed\",\"userId\":" + userId + "}"));
// 发送当前在线用户列表
sendOnlineUserList(session);
}
}
/**
* 处理取消订阅消息
* 用户签退时自动取消订阅
* @param userId 用户ID
*/
private void handleUnsubscribe(WebSocketSession session, LocationMessage locationMessage) {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId != null) {
sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
public void autoUnsubscribe(Long userId) {
if (userId != null) {
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
}
}
/**
* 处理签到消息
* 发送在线用户列表给指定客户端
*/
private void handleSignIn(LocationMessage locationMessage) {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId != null) {
deliveryPersonStatus.put(deliveryPersonId, true);
lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis());
System.out.println("配送员 " + deliveryPersonId + " 已签到");
private void sendOnlineUserList(WebSocketSession session) throws IOException {
List<UserInfo> onlineUsers = userStatus.entrySet().stream()
.filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户
.map(entry -> {
Long userId = entry.getKey();
User user = userInfos.get(userId);
UserRole role = userRoles.get(userId);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userId);
userInfo.setName(user != null ? user.getName() : "");
userInfo.setRole(role != null ? role.getCode() : "");
return userInfo;
})
.collect(Collectors.toList());
OnlineUserListMessage message = new OnlineUserListMessage();
message.setType("onlineUserList");
message.setUsers(onlineUsers);
String jsonMessage = objectMapper.writeValueAsString(message);
session.sendMessage(new TextMessage(jsonMessage));
}
/**
* 广播在线用户列表给所有连接的客户端
*/
private void broadcastOnlineUserList() {
try {
List<UserInfo> onlineUsers = userStatus.entrySet().stream()
.filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户
.map(entry -> {
Long userId = entry.getKey();
User user = userInfos.get(userId);
UserRole role = userRoles.get(userId);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userId);
userInfo.setName(user != null ? user.getName() : "");
userInfo.setRole(role != null ? role.getCode() : "");
return userInfo;
})
.collect(Collectors.toList());
OnlineUserListMessage message = new OnlineUserListMessage();
message.setType("onlineUserList");
message.setUsers(onlineUsers);
String jsonMessage = objectMapper.writeValueAsString(message);
TextMessage textMessage = new TextMessage(jsonMessage);
// 发送给所有连接的客户端
for (Map.Entry<Long, WebSocketSession> entry : sessions.entrySet()) {
Long userId = entry.getKey();
WebSocketSession session = entry.getValue();
try {
if (session != null && session.isOpen()) {
session.sendMessage(textMessage);
} else {
// 清理已关闭的会话
System.out.println("清理已关闭的WebSocket会话用户ID: " + userId);
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
}
} catch (IOException e) {
System.err.println("发送在线用户列表给用户 " + userId + " 时出错: " + e.getMessage());
// 移除发送失败的会话
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
}
}
} catch (Exception e) {
System.err.println("广播在线用户列表时出错: " + e.getMessage());
e.printStackTrace();
}
}
/**
* 处理签退消息
* 广播所有在线用户的位置
*/
private void handleSignOut(LocationMessage locationMessage) {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId != null) {
deliveryPersonStatus.put(deliveryPersonId, false);
lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis());
System.out.println("配送员 " + deliveryPersonId + " 已签退");
private void broadcastAllLocations() {
try {
// 收集所有在线用户的位置信息
for (Map.Entry<Long, Boolean> entry : userStatus.entrySet()) {
Long userId = entry.getKey();
Boolean isSignedIn = entry.getValue();
// 只广播已签到用户的位置
if (Boolean.TRUE.equals(isSignedIn)) {
// 创建一个位置更新消息
LocationMessage locationMessage = new LocationMessage();
locationMessage.setType("updateLocation");
locationMessage.setUserId(userId);
// 添加用户角色信息
UserRole userRole = userRoles.get(userId);
if (userRole != null) {
locationMessage.setUserRole(userRole.getCode());
}
// 广播这个位置更新
broadcastLocationUpdate(locationMessage);
}
}
} catch (Exception e) {
System.err.println("广播所有位置时出错: " + e.getMessage());
}
}
@@ -167,24 +435,35 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* 广播位置更新给所有连接的客户端
*/
private void broadcastLocationUpdate(LocationMessage locationMessage) throws IOException {
// 添加用户角色信息到消息中
Long userId = locationMessage.getUserId();
if (userId != null) {
UserRole userRole = userRoles.get(userId);
if (userRole != null) {
locationMessage.setUserRole(userRole.getCode());
}
}
String message = objectMapper.writeValueAsString(locationMessage);
TextMessage textMessage = new TextMessage(message);
// 发送给所有连接且已签到的客户端
for (Map.Entry<Long, WebSocketSession> entry : sessions.entrySet()) {
Long deliveryPersonId = entry.getKey();
Long userIdKey = entry.getKey();
WebSocketSession session = entry.getValue();
// 只发送给已签到的配送员
if (session != null && session.isOpen() && Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId))) {
// 只发送给已签到的用户
if (session != null && session.isOpen() && Boolean.TRUE.equals(userStatus.get(userIdKey))) {
try {
session.sendMessage(textMessage);
} catch (IOException e) {
System.err.println("配送员 " + deliveryPersonId + " 发送消息时出错: " + e.getMessage());
System.err.println("用户 " + userIdKey + " 发送消息时出错: " + e.getMessage());
// 移除已断开的连接
sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
sessions.remove(userIdKey);
userStatus.remove(userIdKey);
lastUpdateTimes.remove(userIdKey);
userRoles.remove(userIdKey);
userInfos.remove(userIdKey);
}
}
}
@@ -193,11 +472,16 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 连接关闭时,移除会话
Long deliveryPersonId = findDeliveryPersonIdBySession(session);
if (deliveryPersonId != null) {
sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
Long userId = findUserIdBySession(session);
if (userId != null) {
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
}
System.out.println("WebSocket连接已关闭: " + session.getId() + ", 状态: " + status);
}
@@ -205,19 +489,26 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.err.println("WebSocket传输错误: " + exception.getMessage());
Long deliveryPersonId = findDeliveryPersonIdBySession(session);
if (deliveryPersonId != null) {
sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
exception.printStackTrace();
Long userId = findUserIdBySession(session);
if (userId != null) {
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
}
// 不要关闭session因为可能已经关闭了
}
/**
* 根据WebSocket会话查找配送员ID
* 根据WebSocket会话查找用户ID
*/
private Long findDeliveryPersonIdBySession(WebSocketSession session) {
private Long findUserIdBySession(WebSocketSession session) {
for (Map.Entry<Long, WebSocketSession> entry : sessions.entrySet()) {
if (entry.getValue().equals(session)) {
return entry.getKey();
@@ -226,6 +517,23 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
return null;
}
/**
* 清理用户连接和相关状态
* @param userId 用户ID
*/
public void cleanupUserConnection(Long userId) {
if (userId != null) {
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
}
}
/**
* 定时检查并清理过期的连接和位置信息每30秒执行一次
*/
@@ -233,14 +541,16 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
public void cleanupExpiredConnections() {
long currentTime = System.currentTimeMillis();
for (Map.Entry<Long, Long> entry : lastUpdateTimes.entrySet()) {
Long deliveryPersonId = entry.getKey();
Long userId = entry.getKey();
Long lastUpdateTime = entry.getValue();
// 如果超过WebSocket超时时间则移除该配送员的连接
// 如果超过WebSocket超时时间则移除该用户的连接
if (currentTime - lastUpdateTime > WEBSOCKET_TIMEOUT) {
WebSocketSession session = sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
WebSocketSession session = sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 关闭WebSocket连接
if (session != null && session.isOpen()) {
@@ -251,13 +561,19 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
}
}
System.out.println("已清理过期WebSocket连接配送员ID: " + deliveryPersonId);
System.out.println("已清理过期WebSocket连接用户ID: " + userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
}
// 如果超过位置过期时间,则只清理位置信息,保留连接
else if (currentTime - lastUpdateTime > LOCATION_EXPIRE_TIME) {
// 标记为未签到状态,不再广播其位置
deliveryPersonStatus.put(deliveryPersonId, false);
System.out.println("配送员 " + deliveryPersonId + " 位置信息已过期");
userStatus.put(userId, false);
System.out.println("用户 " + userId + " 位置信息已过期");
// 广播更新后的在线用户列表
broadcastOnlineUserList();
}
}
}
@@ -266,8 +582,9 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* 内部消息类用于解析WebSocket消息
*/
public static class LocationMessage {
private String type; // 消息类型: updateLocation, subscribe, unsubscribe, signin, signout
private Long deliveryPersonId; // 配送员ID
private String type; // 消息类型: updateLocation, signin, signout
private Long userId; // 用户ID
private String userRole; // 用户角色
private Double latitude; // 纬度
private Double longitude; // 经度
private Long timestamp; // 时间戳
@@ -281,12 +598,20 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
this.type = type;
}
public Long getDeliveryPersonId() {
return deliveryPersonId;
public Long getUserId() {
return userId;
}
public void setDeliveryPersonId(Long deliveryPersonId) {
this.deliveryPersonId = deliveryPersonId;
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserRole() {
return userRole;
}
public void setUserRole(String userRole) {
this.userRole = userRole;
}
public Double getLatitude() {
@@ -313,4 +638,61 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
this.timestamp = timestamp;
}
}
/**
* 在线用户列表消息类
*/
public static class OnlineUserListMessage {
private String type; // 消息类型: onlineUserList
private List<UserInfo> users; // 在线用户列表
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public List<UserInfo> getUsers() {
return users;
}
public void setUsers(List<UserInfo> users) {
this.users = users;
}
}
/**
* 用户信息类
*/
public static class UserInfo {
private Long userId;
private String name;
private String role;
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRole() {
return role;
}
public void setRole(String role) {
this.role = role;
}
}
}

View File

@@ -23,11 +23,14 @@ public interface UserService {
/**
* 微信小程序登录接收code返回登录响应。
* @param code 微信登录凭证
* @return 登录响应
*/
LoginResponse wxLogin(String code);
/**
* 获取当前已登录用户(从上下文或模拟实现中)
* @return 当前用户
*/
User getCurrentUser();
@@ -39,19 +42,25 @@ public interface UserService {
*/
User signIn(Long userId);
/**
* 注册为骑手
* @param userId 用户ID
* @param name 姓名
* @param phone 手机号
* @return 注册结果
*/
User registerAsDeliveryPerson(Long userId, String name, String phone);
/**
* 检查用户是否已签到
* @param userId 用户ID
* @return true表示已签到false表示未签到
*/
boolean isUserSignedIn(Long userId);
/**
* 用户签退功能
* @param userId 用户ID
*/
void signOut(Long userId);
/**
* 注册为正式员工
* @param userId 用户ID
* @param name 姓名
* @param phone 手机号
* @return 注册结果
*/
User registerAsEmployee(Long userId, String name, String phone);
}

View File

@@ -15,6 +15,7 @@ import com.light.delivery.service.UserService;
import com.light.delivery.util.JwtUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
@@ -43,16 +44,16 @@ public class UserServiceImpl implements UserService {
private EmployeeRepository employeeRepository;
/**
* JWT工具类用于生成和解析JWT令牌
* Jwt工具类用于生成和解析JWT令牌
*/
@Autowired
private JwtUtil jwtUtil;
/**
* WebSocket处理器用于管理WebSocket连接
* Spring应用上下文用于获取其他Bean
*/
@Autowired
private LocationWebSocketHandler locationWebSocketHandler;
private ApplicationContext applicationContext;
/**
* 微信小程序appId
@@ -103,6 +104,25 @@ public class UserServiceImpl implements UserService {
throw new IllegalArgumentException("Invalid token: " + e.getMessage());
}
}
/**
* 根据用户ID获取用户信息。
* @param userId 用户ID
* @return 用户实体对象
* @throws IllegalArgumentException 当用户不存在时抛出异常
*/
public User getUserInfoById(Long userId) {
if (userId == null) {
throw new IllegalArgumentException("User ID is null");
}
Optional<User> userOptional = userRepository.findById(userId);
if (!userOptional.isPresent()) {
throw new IllegalArgumentException("用户不存在");
}
return userOptional.get();
}
/**
* 用户登出逻辑。
@@ -127,14 +147,18 @@ public class UserServiceImpl implements UserService {
return;
}
// 如果用户是配送员,通知WebSocket处理器清理连接
// 注意这里需要根据实际业务逻辑获取配送员ID
// 可能需要通过其他方式关联用户ID和配送员ID
// 这里假设用户ID和配送员ID相同根据项目实际情况调整
// locationWebSocketHandler.removeUserConnection(user.getId());
// 通知WebSocket处理器清理连接
try {
LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class);
// 清理用户状态
handler.cleanupUserConnection(user.getId());
} catch (Exception e) {
// 记录日志但不中断登出流程
System.err.println("清理WebSocket连接时出错: " + e.getMessage());
}
} catch (Exception e) {
// 记录日志但不中断登出流程
System.err.println("清理WebSocket连接时出错: " + e.getMessage());
System.err.println("登出时出错: " + e.getMessage());
}
}
@@ -227,6 +251,12 @@ public class UserServiceImpl implements UserService {
return user;
}
@Override
public void signOut(Long userId) {
// 签退操作在REST控制器中通过WebSocket处理器完成
// 这里可以添加其他签退逻辑(如记录签退时间等)
}
/**
* 用户注册成为配送员。
@@ -239,8 +269,8 @@ public class UserServiceImpl implements UserService {
* @throws IllegalArgumentException 当用户不存在、手机号未找到或姓名不匹配时抛出异常
*/
@Override
public User registerAsDeliveryPerson(Long userId, String name, String phone) {
System.out.println("尝试注册配送用户ID: " + userId);
public User registerAsEmployee(Long userId, String name, String phone) {
System.out.println("尝试注册员用户ID: " + userId);
Optional<User> userOptional = userRepository.findById(userId);
if (!userOptional.isPresent()) {
@@ -330,22 +360,18 @@ public class UserServiceImpl implements UserService {
*/
@Override
public boolean isUserSignedIn(Long userId) {
// 只有配送员角色才有签到状态
Optional<User> userOptional = userRepository.findById(userId);
if (!userOptional.isPresent()) {
return false;
}
User user = userOptional.get();
UserRole userRole = getUserRole(user);
if (userRole != UserRole.DELIVERY_PERSON) {
// 非配送员角色没有签到状态概念
return false;
}
// 检查WebSocket中的签到状态
return locationWebSocketHandler.isDeliveryPersonSignedIn(userId);
// 注意由于循环依赖问题这里不能直接注入LocationWebSocketHandler
// 在实际应用中,可能需要通过其他方式获取签到状态
// 现在通过静态方法或上下文获取LocationWebSocketHandler实例
try {
// 使用Spring上下文获取LocationWebSocketHandler实例
LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class);
return handler.isUserSignedIn(userId);
} catch (Exception e) {
System.err.println("获取WebSocket签到状态时出错: " + e.getMessage());
return false; // 默认返回false
}
}
/**