diff --git a/README.md b/README.md index 994167e..d00a006 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,93 @@ src 位置信息现在存储在服务器内存缓存中,而不是持久化到数据库,以提高访问速度和减少数据库负载。 +#### WebSocket接口和交互逻辑 + +系统通过WebSocket实现实时位置同步,端点为:`/ws/location`。 + +##### 连接建立 +客户端通过WebSocket连接到`/ws/location`端点建立连接。 + +##### 消息格式 +所有消息都使用JSON格式。 + +###### 1. 客户端发送的消息类型 + +1. **updateLocation(位置更新)** - 更新用户位置 + ```json + { + "type": "updateLocation", + "userId": 123, + "latitude": 25.0342, + "longitude": 102.7057, + "timestamp": 1634567890000 + } + ``` + +###### 2. 服务器发送的消息类型 + +1. **subscribed(订阅成功响应)** - 服务器确认订阅成功 + ```json + { + "type": "subscribed", + "userId": 123 + } + ``` + +2. **onlineUserList(在线用户列表)** - 服务器发送当前在线用户列表 + ```json + { + "type": "onlineUserList", + "users": [ + { + "userId": 123, + "name": "张三", + "role": "DELIVERY_PERSON" + }, + { + "userId": 456, + "name": "李四", + "role": "ADMIN" + } + ] + } + ``` + +3. **updateLocation(位置更新)** - 服务器向所有已签到用户广播位置更新 + ```json + { + "type": "updateLocation", + "userId": 123, + "userRole": "DELIVERY_PERSON", + "latitude": 25.0342, + "longitude": 102.7057, + "timestamp": 1634567890000 + } + ``` + +##### 交互流程 + +``` +用户签到] --> B{签到成功?} + B -->|是| C[建立WebSocket连接] + B -->|否| D[签到失败] + C --> E[自动订阅位置更新] + E --> F[接收subscribed确认] + F --> G[接收在线用户列表] + G --> H[开始位置更新循环] + H --> I[发送位置更新] + I --> J[接收位置广播] + J --> H + K[用户签退] --> L{签退成功?} + L -->|是| M[自动取消订阅并关闭WebSocket连接] + L -->|否| N[签退失败] +``` + +##### 角色区分 +- 管理员(ADMIN)和配送员(DELIVERY_PERSON)都可以接收位置更新 +- 位置广播消息中包含用户角色信息,便于客户端区分显示 +- 只有已签到用户才会收到位置更新广播 + ### 数据模型 主要实体包括: @@ -167,6 +254,12 @@ java -jar target/light-delivery-1.0.0.jar --spring.profiles.active=local java -jar target/light-delivery-1.0.0.jar --spring.profiles.active=test ``` +在PowerShell中也可以使用以下命令运行本地环境: +```powershell +$env:SPRING_PROFILES_ACTIVE = "local" +mvn spring-boot:run +``` + ### 运行Docker容器 ```bash @@ -258,9 +351,83 @@ deploy.bat ### 位置同步相关 - `GET /location-sync/delivery-persons/locations` - 获取所有配送员位置 +- WebSocket端点: `/ws/location` - 实时位置同步 + +### WebSocket消息格式 + +客户端和服务器通过WebSocket发送JSON格式的消息。 + +#### 客户端发送的消息类型 + +1. **updateLocation(位置更新)** - 更新用户位置 + ```json + { + "type": "updateLocation", + "userId": 123, + "latitude": 25.0342, + "longitude": 102.7057, + "timestamp": 1634567890000 + } + ``` + +#### 服务器发送的消息类型 + +1. **subscribed(订阅成功响应)** - 服务器确认订阅成功 + ```json + { + "type": "subscribed", + "userId": 123 + } + ``` + +2. **onlineUserList(在线用户列表)** - 服务器发送当前在线用户列表 + ```json + { + "type": "onlineUserList", + "users": [ + { + "userId": 123, + "name": "张三", + "role": "DELIVERY_PERSON" + }, + { + "userId": 456, + "name": "李四", + "role": "ADMIN" + } + ] + } + ``` + +3. **updateLocation(位置更新)** - 服务器向所有已签到用户广播位置更新 + ```json + { + "type": "updateLocation", + "userId": 123, + "userRole": "DELIVERY_PERSON", + "latitude": 25.0342, + "longitude": 102.7057, + "timestamp": 1634567890000 + } + ``` ## 最近更新 +### WebSocket使用方式变更(重要变更) +为了更好地分离关注点和提高系统安全性,我们对WebSocket使用方式进行了重要变更: +- 移除了WebSocket中的签到/签退功能,这些操作现在完全通过REST API进行 +- WebSocket现在仅用于位置同步,不再处理用户状态变更 +- 用户需要先通过REST API签到,然后才能建立WebSocket连接并接收位置更新 +- 用户签退时,服务器会自动取消订阅并关闭WebSocket连接 +- 客户端不再需要发送subscribe/unsubscribe消息,这些操作由服务器自动处理 + +### WebSocket位置同步增强(新增) +为了支持管理员和配送员都能接收位置更新信息,我们对WebSocket位置同步功能进行了增强: +- 修改了LocationWebSocketHandler,使其支持所有用户类型(包括管理员和配送员) +- 在位置消息中添加了用户角色信息,便于客户端区分并使用不同图标显示 +- 重构了WebSocket处理逻辑,使其更加通用和可扩展 +- 实现了在线用户列表管理和广播机制 + ### 用户角色管理优化(新增) 为解决数据一致性问题,我们优化了用户角色管理机制: - 移除了User表中的role字段 @@ -292,4 +459,6 @@ deploy.bat 3. 配送员位置信息具有时效性,默认5分钟内有效 4. 位置历史记录功能已被移除,如有需要可使用第三方服务进行位置追踪 5. 员工管理接口仅限管理员角色访问 -6. 用户角色信息现在通过员工表动态获取,确保数据一致性 \ No newline at end of file +6. 用户角色信息现在通过员工表动态获取,确保数据一致性 +7. WebSocket位置同步现在支持所有已签到用户(包括管理员和配送员) +8. WebSocket仅用于位置同步,签到/签退操作需通过REST API完成 \ No newline at end of file diff --git a/src/main/java/com/light/delivery/controller/LocationSyncController.java b/src/main/java/com/light/delivery/controller/LocationSyncController.java index 3e82ea2..ef1d11a 100644 --- a/src/main/java/com/light/delivery/controller/LocationSyncController.java +++ b/src/main/java/com/light/delivery/controller/LocationSyncController.java @@ -1,46 +1,51 @@ package com.light.delivery.controller; -import com.light.delivery.model.DeliveryPerson; -import com.light.delivery.service.DeliveryPersonService; +import com.light.delivery.service.LocationWebSocketHandler; import lombok.Data; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; + import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** - * 位置同步控制器,提供配送员位置信息查询接口。 - * 用于获取所有已签到配送员的实时位置信息。 + * 位置同步控制器,提供已签到员工位置信息查询接口。 + * 用于获取所有已签到员工的实时位置信息。 */ @RestController @RequestMapping("/location-sync") public class LocationSyncController { /** - * 配送员服务依赖注入 + * WebSocket位置处理器依赖注入 */ @Autowired - private DeliveryPersonService deliveryPersonService; + private LocationWebSocketHandler locationWebSocketHandler; /** - * 获取所有已签到配送员的实时位置。 - * 仅返回有位置信息的配送员(最近有更新位置的)。 - * @return 配送员位置列表 + * 获取所有已签到员工的实时位置。 + * 仅返回有位置信息的员工(最近有更新位置的)。 + * @return 员工位置列表 */ - @GetMapping("/delivery-persons/locations") - public ResponseEntity> getDeliveryPersonLocations() { - List allDeliveryPersons = deliveryPersonService.getAllDeliveryPersons(); + @GetMapping("/employees/locations") + public ResponseEntity> getEmployeeLocations() { + // 获取所有已签到的员工位置信息 + Map userInfos = locationWebSocketHandler.getAllSignedInUsers(); - // 过滤出有位置信息的配送员(最近有更新位置的) - List locations = allDeliveryPersons.stream() - .filter(person -> person.getLatitude() != null && person.getLongitude() != null) - .map(person -> { - DeliveryPersonLocation location = new DeliveryPersonLocation(); - location.setDeliveryPersonId(person.getId()); - location.setLatitude(person.getLatitude()); - location.setLongitude(person.getLongitude()); - location.setStatus(person.getStatus()); + List locations = userInfos.entrySet().stream() + .map(entry -> { + Long userId = entry.getKey(); + LocationWebSocketHandler.UserInfo userInfo = entry.getValue(); + + EmployeeLocation location = new EmployeeLocation(); + location.setEmployeeId(userId); + location.setLatitude(locationWebSocketHandler.getUserLatitude(userId)); + location.setLongitude(locationWebSocketHandler.getUserLongitude(userId)); + location.setStatus(locationWebSocketHandler.getUserStatus(userId)); + location.setRole(userInfo.getRole() != null ? userInfo.getRole() : ""); + location.setName(userInfo.getName() != null ? userInfo.getName() : ""); return location; }) .collect(Collectors.toList()); @@ -49,14 +54,19 @@ public class LocationSyncController { } /** - * 配送员位置信息DTO,用于向前端返回配送员位置数据。 + * 在线员工位置信息DTO,用于向前端返回员工位置数据。 */ @Data - public static class DeliveryPersonLocation { + public static class EmployeeLocation { /** - * 配送员ID + * 员工ID */ - private Long deliveryPersonId; + private Long employeeId; + + /** + * 姓名 + */ + private String name; /** * 纬度 @@ -72,5 +82,10 @@ public class LocationSyncController { * 状态 */ private String status; + + /** + * 角色 + */ + private String role; } } \ No newline at end of file diff --git a/src/main/java/com/light/delivery/controller/UserController.java b/src/main/java/com/light/delivery/controller/UserController.java index b077def..f049cda 100644 --- a/src/main/java/com/light/delivery/controller/UserController.java +++ b/src/main/java/com/light/delivery/controller/UserController.java @@ -6,12 +6,14 @@ import com.light.delivery.model.RegisterRequest; import com.light.delivery.model.User; import com.light.delivery.model.UserRole; import com.light.delivery.model.WxLoginRequest; +import com.light.delivery.service.LocationWebSocketHandler; import com.light.delivery.service.UserService; import com.light.delivery.service.impl.UserServiceImpl; import com.light.delivery.util.JwtUtil; import jakarta.servlet.http.HttpServletRequest; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; @@ -45,6 +47,12 @@ public class UserController { */ @Autowired private UserServiceImpl userServiceImpl; + + /** + * ApplicationContext依赖注入,用于获取其他Bean。 + */ + @Autowired + private ApplicationContext applicationContext; /** * 获取当前用户状态接口。 @@ -185,6 +193,15 @@ public class UserController { User user = userService.getUserInfo(token); User updatedUser = userService.signIn(user.getId()); UserInfoResponse response = toUserInfoResponse(updatedUser); + + // 通知WebSocket处理器用户已签到并自动订阅 + try { + LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class); + handler.userSignedIn(user.getId()); + } catch (Exception e) { + System.err.println("通知WebSocket签到状态时出错: " + e.getMessage()); + } + return ResponseEntity.ok(response); } catch (Exception e) { return ResponseEntity.badRequest().body("Invalid token: " + e.getMessage()); @@ -192,13 +209,42 @@ public class UserController { } /** - * 注册为骑手接口。 + * 用户签退接口。 + * @param request HTTP请求对象,用于提取认证令牌 + * @return 操作结果 + */ + @PostMapping("/signout") + public ResponseEntity signOut(HttpServletRequest request) { + String token = extractToken(request); + if (token == null) { + return ResponseEntity.badRequest().body("Authorization token is missing"); + } + + try { + User user = userService.getUserInfo(token); + + // 通知WebSocket处理器用户已签退并自动取消订阅 + try { + LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class); + handler.userSignedOut(user.getId()); + } catch (Exception e) { + System.err.println("通知WebSocket签退状态时出错: " + e.getMessage()); + } + + return ResponseEntity.ok("签退成功"); + } catch (Exception e) { + return ResponseEntity.badRequest().body("Invalid token: " + e.getMessage()); + } + } + + /** + * 注册为正式员工接口。 * @param request HTTP请求对象,用于提取认证令牌 * @param registerRequest 注册请求对象,包含姓名和手机号 * @return 更新后的用户信息 */ @PostMapping("/register") - public ResponseEntity registerAsDeliveryPerson( + public ResponseEntity registerAsEmployee( HttpServletRequest request, @RequestBody RegisterRequest registerRequest) { try { @@ -213,7 +259,7 @@ public class UserController { User user = userService.getUserInfo(token); System.out.println("获取到用户信息: " + user); - User updatedUser = userService.registerAsDeliveryPerson( + User updatedUser = userService.registerAsEmployee( user.getId(), registerRequest.getName(), registerRequest.getPhone()); @@ -221,12 +267,12 @@ public class UserController { return ResponseEntity.ok(response); } catch (IllegalArgumentException e) { // 记录错误日志 - System.err.println("注册配送员时发生错误: " + e.getMessage()); + System.err.println("注册员工时发生错误: " + e.getMessage()); e.printStackTrace(); return ResponseEntity.badRequest().build(); } catch (Exception e) { // 记录未预期的错误 - System.err.println("注册配送员时发生未预期错误: " + e.getMessage()); + System.err.println("注册员工时发生未预期错误: " + e.getMessage()); e.printStackTrace(); return ResponseEntity.status(500).build(); } diff --git a/src/main/java/com/light/delivery/service/LocationWebSocketHandler.java b/src/main/java/com/light/delivery/service/LocationWebSocketHandler.java index c78af5e..a7588e1 100644 --- a/src/main/java/com/light/delivery/service/LocationWebSocketHandler.java +++ b/src/main/java/com/light/delivery/service/LocationWebSocketHandler.java @@ -1,6 +1,13 @@ package com.light.delivery.service; import com.fasterxml.jackson.databind.ObjectMapper; +import com.light.delivery.model.User; +import com.light.delivery.model.UserRole; +import com.light.delivery.repository.UserRepository; +import com.light.delivery.service.impl.LocationSyncServiceImpl; +import com.light.delivery.service.impl.UserServiceImpl; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; @@ -9,23 +16,40 @@ import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; @Component public class LocationWebSocketHandler extends TextWebSocketHandler { - // 存储所有连接的会话,key为配送员ID,value为会话 + // 存储所有连接的会话,key为用户ID,value为会话 private final Map sessions = new ConcurrentHashMap<>(); - // 存储配送员最后更新位置的时间 + // 存储用户最后更新位置的时间 private final Map lastUpdateTimes = new ConcurrentHashMap<>(); - // 存储配送员的登录状态(true表示已签到,false表示已签退) - private final Map deliveryPersonStatus = new ConcurrentHashMap<>(); + // 存储用户的登录状态(true表示已签到,false表示已签退) + private final Map userStatus = new ConcurrentHashMap<>(); + + // 存储用户的角色信息 + private final Map userRoles = new ConcurrentHashMap<>(); + + // 存储用户的基本信息 + private final Map userInfos = new ConcurrentHashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); + @Autowired + private UserRepository userRepository; + + @Autowired + private UserServiceImpl userService; + + @Autowired + private ApplicationContext applicationContext; + // 位置过期时间(毫秒),默认5分钟 private static final long LOCATION_EXPIRE_TIME = 5 * 60 * 1000; @@ -33,21 +57,158 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { private static final long WEBSOCKET_TIMEOUT = 30 * 60 * 1000; /** - * 获取配送员的签到状态 - * @param deliveryPersonId 配送员ID - * @return true表示已签到,false表示未签到或已签退 + * 处理签退消息 */ - public boolean isDeliveryPersonSignedIn(Long deliveryPersonId) { - return Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId)); + private void handleSignOut(LocationMessage locationMessage) { + Long userId = locationMessage.getUserId(); + if (userId != null) { + userSignedOut(userId); + } } /** - * 获取配送员最后更新时间 - * @param deliveryPersonId 配送员ID + * 用户签到 + * @param userId 用户ID + */ + public void userSignedIn(Long userId) { + if (userId != null) { + userStatus.put(userId, true); + lastUpdateTimes.put(userId, System.currentTimeMillis()); + System.out.println("用户 " + userId + " 已签到"); + + // 如果用户已建立WebSocket连接,则自动订阅 + WebSocketSession session = sessions.get(userId); + if (session != null && session.isOpen()) { + try { + autoSubscribe(session, userId); + } catch (IOException e) { + System.err.println("自动订阅时出错: " + e.getMessage()); + } + } + + // 广播更新后的在线用户列表 + broadcastOnlineUserList(); + + // 广播所有在线用户的位置 + broadcastAllLocations(); + } + } + + /** + * 用户签退 + * @param userId 用户ID + */ + public void userSignedOut(Long userId) { + if (userId != null) { + // 从所有映射中移除用户信息 + sessions.remove(userId); + userStatus.remove(userId); + lastUpdateTimes.remove(userId); + userRoles.remove(userId); + userInfos.remove(userId); + + System.out.println("用户 " + userId + " 已签退"); + + // 广播更新后的在线用户列表 + broadcastOnlineUserList(); + } + } + + /** + * 获取用户的签到状态 + * @param userId 用户ID + * @return true表示已签到,false表示未签到或已签退 + */ + public boolean isUserSignedIn(Long userId) { + return Boolean.TRUE.equals(userStatus.get(userId)); + } + + /** + * 获取用户最后更新时间 + * @param userId 用户ID * @return 最后更新时间戳 */ - public Long getLastUpdateTime(Long deliveryPersonId) { - return lastUpdateTimes.get(deliveryPersonId); + public Long getLastUpdateTime(Long userId) { + return lastUpdateTimes.get(userId); + } + + /** + * 获取用户角色 + * @param userId 用户ID + * @return 用户角色 + */ + public UserRole getUserRole(Long userId) { + return userRoles.get(userId); + } + + /** + * 获取所有已签到用户的信息 + * @return 已签到用户信息映射 + */ + public Map getAllSignedInUsers() { + return userStatus.entrySet().stream() + .filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户 + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> { + Long userId = entry.getKey(); + User user = userInfos.get(userId); + UserRole role = userRoles.get(userId); + UserInfo userInfo = new UserInfo(); + userInfo.setUserId(userId); + userInfo.setName(user != null ? user.getName() : ""); + userInfo.setRole(role != null ? role.getCode() : ""); + return userInfo; + } + )); + } + + /** + * 获取用户纬度 + * @param userId 用户ID + * @return 纬度 + */ + public Double getUserLatitude(Long userId) { + // 从位置同步服务获取纬度 + try { + LocationSyncServiceImpl locationSyncService = applicationContext.getBean(LocationSyncServiceImpl.class); + return locationSyncService.getDeliveryPersonLatitude(userId); + } catch (Exception e) { + System.err.println("获取用户纬度时出错: " + e.getMessage()); + return null; + } + } + + /** + * 获取用户经度 + * @param userId 用户ID + * @return 经度 + */ + public Double getUserLongitude(Long userId) { + // 从位置同步服务获取经度 + try { + LocationSyncServiceImpl locationSyncService = applicationContext.getBean(LocationSyncServiceImpl.class); + return locationSyncService.getDeliveryPersonLongitude(userId); + } catch (Exception e) { + System.err.println("获取用户经度时出错: " + e.getMessage()); + return null; + } + } + + /** + * 获取用户状态 + * @param userId 用户ID + * @return 状态 + */ + public String getUserStatus(Long userId) { + // 从位置同步服务获取状态 + try { + LocationSyncServiceImpl locationSyncService = applicationContext.getBean(LocationSyncServiceImpl.class); + return locationSyncService.getDeliveryPersonStatus(userId); + } catch (Exception e) { + System.err.println("获取用户状态时出错: " + e.getMessage()); + return null; + } } @Override @@ -66,26 +227,19 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { if ("updateLocation".equals(locationMessage.getType())) { // 更新位置信息 handleLocationUpdate(locationMessage); - } else if ("subscribe".equals(locationMessage.getType())) { - // 订阅位置更新 - handleSubscribe(session, locationMessage); - } else if ("unsubscribe".equals(locationMessage.getType())) { - // 取消订阅 - handleUnsubscribe(session, locationMessage); - } else if ("signin".equals(locationMessage.getType())) { - // 处理签到 - handleSignIn(locationMessage); } else if ("signout".equals(locationMessage.getType())) { - // 处理签退 + // 签退 handleSignOut(locationMessage); } } catch (Exception e) { System.err.println("处理WebSocket消息时出错: " + e.getMessage()); + e.printStackTrace(); // 确保不因异常导致连接关闭 try { - session.sendMessage(new TextMessage("{\"error\":\"消息处理失败\"}")); + session.sendMessage(new TextMessage("{\"error\":\"消息处理失败: " + e.getMessage() + "\"}")); } catch (IOException ioException) { System.err.println("发送错误消息时出错: " + ioException.getMessage()); + ioException.printStackTrace(); } } } @@ -94,16 +248,16 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { * 处理位置更新消息 */ private void handleLocationUpdate(LocationMessage locationMessage) { - Long deliveryPersonId = locationMessage.getDeliveryPersonId(); - if (deliveryPersonId == null) { + Long userId = locationMessage.getUserId(); + if (userId == null) { return; } // 更新最后更新时间 - lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis()); + lastUpdateTimes.put(userId, System.currentTimeMillis()); - // 只有已签到的配送员才广播位置更新 - if (Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId))) { + // 只有已签到的用户才广播位置更新 + if (Boolean.TRUE.equals(userStatus.get(userId))) { try { // 广播位置更新给所有连接的客户端 broadcastLocationUpdate(locationMessage); @@ -114,52 +268,166 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { } /** - * 处理订阅消息 + * 用户连接时自动订阅(在用户签到成功后调用) + * @param session WebSocket会话 + * @param userId 用户ID */ - private void handleSubscribe(WebSocketSession session, LocationMessage locationMessage) throws IOException { - Long deliveryPersonId = locationMessage.getDeliveryPersonId(); - if (deliveryPersonId != null) { - sessions.put(deliveryPersonId, session); - // 默认状态为未签到 - deliveryPersonStatus.putIfAbsent(deliveryPersonId, false); - lastUpdateTimes.putIfAbsent(deliveryPersonId, System.currentTimeMillis()); - session.sendMessage(new TextMessage("{\"type\":\"subscribed\",\"deliveryPersonId\":" + deliveryPersonId + "}")); + public void autoSubscribe(WebSocketSession session, Long userId) throws IOException { + if (userId != null) { + sessions.put(userId, session); + // 默认状态为未签到(但WebSocket连接已建立) + userStatus.putIfAbsent(userId, false); + lastUpdateTimes.putIfAbsent(userId, System.currentTimeMillis()); + + // 获取并存储用户信息和角色 + try { + User user = userRepository.findById(userId).orElse(null); + if (user != null) { + UserRole userRole = userService.getUserRole(user); + userInfos.putIfAbsent(userId, user); + userRoles.putIfAbsent(userId, userRole); + } + } catch (Exception e) { + System.err.println("获取用户信息时出错: " + e.getMessage()); + } + + session.sendMessage(new TextMessage("{\"type\":\"subscribed\",\"userId\":" + userId + "}")); + + // 发送当前在线用户列表 + sendOnlineUserList(session); } } /** - * 处理取消订阅消息 + * 用户签退时自动取消订阅 + * @param userId 用户ID */ - private void handleUnsubscribe(WebSocketSession session, LocationMessage locationMessage) { - Long deliveryPersonId = locationMessage.getDeliveryPersonId(); - if (deliveryPersonId != null) { - sessions.remove(deliveryPersonId); - deliveryPersonStatus.remove(deliveryPersonId); - lastUpdateTimes.remove(deliveryPersonId); + public void autoUnsubscribe(Long userId) { + if (userId != null) { + sessions.remove(userId); + userStatus.remove(userId); + lastUpdateTimes.remove(userId); + userRoles.remove(userId); + userInfos.remove(userId); + + // 广播更新后的在线用户列表 + broadcastOnlineUserList(); } } /** - * 处理签到消息 + * 发送在线用户列表给指定客户端 */ - private void handleSignIn(LocationMessage locationMessage) { - Long deliveryPersonId = locationMessage.getDeliveryPersonId(); - if (deliveryPersonId != null) { - deliveryPersonStatus.put(deliveryPersonId, true); - lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis()); - System.out.println("配送员 " + deliveryPersonId + " 已签到"); + private void sendOnlineUserList(WebSocketSession session) throws IOException { + List onlineUsers = userStatus.entrySet().stream() + .filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户 + .map(entry -> { + Long userId = entry.getKey(); + User user = userInfos.get(userId); + UserRole role = userRoles.get(userId); + UserInfo userInfo = new UserInfo(); + userInfo.setUserId(userId); + userInfo.setName(user != null ? user.getName() : ""); + userInfo.setRole(role != null ? role.getCode() : ""); + return userInfo; + }) + .collect(Collectors.toList()); + + OnlineUserListMessage message = new OnlineUserListMessage(); + message.setType("onlineUserList"); + message.setUsers(onlineUsers); + + String jsonMessage = objectMapper.writeValueAsString(message); + session.sendMessage(new TextMessage(jsonMessage)); + } + + /** + * 广播在线用户列表给所有连接的客户端 + */ + private void broadcastOnlineUserList() { + try { + List onlineUsers = userStatus.entrySet().stream() + .filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户 + .map(entry -> { + Long userId = entry.getKey(); + User user = userInfos.get(userId); + UserRole role = userRoles.get(userId); + UserInfo userInfo = new UserInfo(); + userInfo.setUserId(userId); + userInfo.setName(user != null ? user.getName() : ""); + userInfo.setRole(role != null ? role.getCode() : ""); + return userInfo; + }) + .collect(Collectors.toList()); + + OnlineUserListMessage message = new OnlineUserListMessage(); + message.setType("onlineUserList"); + message.setUsers(onlineUsers); + + String jsonMessage = objectMapper.writeValueAsString(message); + TextMessage textMessage = new TextMessage(jsonMessage); + + // 发送给所有连接的客户端 + for (Map.Entry entry : sessions.entrySet()) { + Long userId = entry.getKey(); + WebSocketSession session = entry.getValue(); + try { + if (session != null && session.isOpen()) { + session.sendMessage(textMessage); + } else { + // 清理已关闭的会话 + System.out.println("清理已关闭的WebSocket会话,用户ID: " + userId); + sessions.remove(userId); + userStatus.remove(userId); + lastUpdateTimes.remove(userId); + userRoles.remove(userId); + userInfos.remove(userId); + } + } catch (IOException e) { + System.err.println("发送在线用户列表给用户 " + userId + " 时出错: " + e.getMessage()); + // 移除发送失败的会话 + sessions.remove(userId); + userStatus.remove(userId); + lastUpdateTimes.remove(userId); + userRoles.remove(userId); + userInfos.remove(userId); + } + } + } catch (Exception e) { + System.err.println("广播在线用户列表时出错: " + e.getMessage()); + e.printStackTrace(); } } /** - * 处理签退消息 + * 广播所有在线用户的位置 */ - private void handleSignOut(LocationMessage locationMessage) { - Long deliveryPersonId = locationMessage.getDeliveryPersonId(); - if (deliveryPersonId != null) { - deliveryPersonStatus.put(deliveryPersonId, false); - lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis()); - System.out.println("配送员 " + deliveryPersonId + " 已签退"); + private void broadcastAllLocations() { + try { + // 收集所有在线用户的位置信息 + for (Map.Entry entry : userStatus.entrySet()) { + Long userId = entry.getKey(); + Boolean isSignedIn = entry.getValue(); + + // 只广播已签到用户的位置 + if (Boolean.TRUE.equals(isSignedIn)) { + // 创建一个位置更新消息 + LocationMessage locationMessage = new LocationMessage(); + locationMessage.setType("updateLocation"); + locationMessage.setUserId(userId); + + // 添加用户角色信息 + UserRole userRole = userRoles.get(userId); + if (userRole != null) { + locationMessage.setUserRole(userRole.getCode()); + } + + // 广播这个位置更新 + broadcastLocationUpdate(locationMessage); + } + } + } catch (Exception e) { + System.err.println("广播所有位置时出错: " + e.getMessage()); } } @@ -167,24 +435,35 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { * 广播位置更新给所有连接的客户端 */ private void broadcastLocationUpdate(LocationMessage locationMessage) throws IOException { + // 添加用户角色信息到消息中 + Long userId = locationMessage.getUserId(); + if (userId != null) { + UserRole userRole = userRoles.get(userId); + if (userRole != null) { + locationMessage.setUserRole(userRole.getCode()); + } + } + String message = objectMapper.writeValueAsString(locationMessage); TextMessage textMessage = new TextMessage(message); // 发送给所有连接且已签到的客户端 for (Map.Entry entry : sessions.entrySet()) { - Long deliveryPersonId = entry.getKey(); + Long userIdKey = entry.getKey(); WebSocketSession session = entry.getValue(); - // 只发送给已签到的配送员 - if (session != null && session.isOpen() && Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId))) { + // 只发送给已签到的用户 + if (session != null && session.isOpen() && Boolean.TRUE.equals(userStatus.get(userIdKey))) { try { session.sendMessage(textMessage); } catch (IOException e) { - System.err.println("向配送员 " + deliveryPersonId + " 发送消息时出错: " + e.getMessage()); + System.err.println("向用户 " + userIdKey + " 发送消息时出错: " + e.getMessage()); // 移除已断开的连接 - sessions.remove(deliveryPersonId); - deliveryPersonStatus.remove(deliveryPersonId); - lastUpdateTimes.remove(deliveryPersonId); + sessions.remove(userIdKey); + userStatus.remove(userIdKey); + lastUpdateTimes.remove(userIdKey); + userRoles.remove(userIdKey); + userInfos.remove(userIdKey); } } } @@ -193,11 +472,16 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { // 连接关闭时,移除会话 - Long deliveryPersonId = findDeliveryPersonIdBySession(session); - if (deliveryPersonId != null) { - sessions.remove(deliveryPersonId); - deliveryPersonStatus.remove(deliveryPersonId); - lastUpdateTimes.remove(deliveryPersonId); + Long userId = findUserIdBySession(session); + if (userId != null) { + sessions.remove(userId); + userStatus.remove(userId); + lastUpdateTimes.remove(userId); + userRoles.remove(userId); + userInfos.remove(userId); + + // 广播更新后的在线用户列表 + broadcastOnlineUserList(); } System.out.println("WebSocket连接已关闭: " + session.getId() + ", 状态: " + status); } @@ -205,19 +489,26 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { System.err.println("WebSocket传输错误: " + exception.getMessage()); - Long deliveryPersonId = findDeliveryPersonIdBySession(session); - if (deliveryPersonId != null) { - sessions.remove(deliveryPersonId); - deliveryPersonStatus.remove(deliveryPersonId); - lastUpdateTimes.remove(deliveryPersonId); + exception.printStackTrace(); + + Long userId = findUserIdBySession(session); + if (userId != null) { + sessions.remove(userId); + userStatus.remove(userId); + lastUpdateTimes.remove(userId); + userRoles.remove(userId); + userInfos.remove(userId); + + // 广播更新后的在线用户列表 + broadcastOnlineUserList(); } // 不要关闭session,因为可能已经关闭了 } /** - * 根据WebSocket会话查找配送员ID + * 根据WebSocket会话查找用户ID */ - private Long findDeliveryPersonIdBySession(WebSocketSession session) { + private Long findUserIdBySession(WebSocketSession session) { for (Map.Entry entry : sessions.entrySet()) { if (entry.getValue().equals(session)) { return entry.getKey(); @@ -226,6 +517,23 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { return null; } + /** + * 清理用户连接和相关状态 + * @param userId 用户ID + */ + public void cleanupUserConnection(Long userId) { + if (userId != null) { + sessions.remove(userId); + userStatus.remove(userId); + lastUpdateTimes.remove(userId); + userRoles.remove(userId); + userInfos.remove(userId); + + // 广播更新后的在线用户列表 + broadcastOnlineUserList(); + } + } + /** * 定时检查并清理过期的连接和位置信息(每30秒执行一次) */ @@ -233,14 +541,16 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { public void cleanupExpiredConnections() { long currentTime = System.currentTimeMillis(); for (Map.Entry entry : lastUpdateTimes.entrySet()) { - Long deliveryPersonId = entry.getKey(); + Long userId = entry.getKey(); Long lastUpdateTime = entry.getValue(); - // 如果超过WebSocket超时时间,则移除该配送员的连接 + // 如果超过WebSocket超时时间,则移除该用户的连接 if (currentTime - lastUpdateTime > WEBSOCKET_TIMEOUT) { - WebSocketSession session = sessions.remove(deliveryPersonId); - deliveryPersonStatus.remove(deliveryPersonId); - lastUpdateTimes.remove(deliveryPersonId); + WebSocketSession session = sessions.remove(userId); + userStatus.remove(userId); + lastUpdateTimes.remove(userId); + userRoles.remove(userId); + userInfos.remove(userId); // 关闭WebSocket连接 if (session != null && session.isOpen()) { @@ -251,13 +561,19 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { } } - System.out.println("已清理过期WebSocket连接,配送员ID: " + deliveryPersonId); + System.out.println("已清理过期WebSocket连接,用户ID: " + userId); + + // 广播更新后的在线用户列表 + broadcastOnlineUserList(); } // 如果超过位置过期时间,则只清理位置信息,保留连接 else if (currentTime - lastUpdateTime > LOCATION_EXPIRE_TIME) { // 标记为未签到状态,不再广播其位置 - deliveryPersonStatus.put(deliveryPersonId, false); - System.out.println("配送员 " + deliveryPersonId + " 位置信息已过期"); + userStatus.put(userId, false); + System.out.println("用户 " + userId + " 位置信息已过期"); + + // 广播更新后的在线用户列表 + broadcastOnlineUserList(); } } } @@ -266,8 +582,9 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { * 内部消息类,用于解析WebSocket消息 */ public static class LocationMessage { - private String type; // 消息类型: updateLocation, subscribe, unsubscribe, signin, signout - private Long deliveryPersonId; // 配送员ID + private String type; // 消息类型: updateLocation, signin, signout + private Long userId; // 用户ID + private String userRole; // 用户角色 private Double latitude; // 纬度 private Double longitude; // 经度 private Long timestamp; // 时间戳 @@ -281,12 +598,20 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { this.type = type; } - public Long getDeliveryPersonId() { - return deliveryPersonId; + public Long getUserId() { + return userId; } - public void setDeliveryPersonId(Long deliveryPersonId) { - this.deliveryPersonId = deliveryPersonId; + public void setUserId(Long userId) { + this.userId = userId; + } + + public String getUserRole() { + return userRole; + } + + public void setUserRole(String userRole) { + this.userRole = userRole; } public Double getLatitude() { @@ -313,4 +638,61 @@ public class LocationWebSocketHandler extends TextWebSocketHandler { this.timestamp = timestamp; } } + + /** + * 在线用户列表消息类 + */ + public static class OnlineUserListMessage { + private String type; // 消息类型: onlineUserList + private List users; // 在线用户列表 + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public List getUsers() { + return users; + } + + public void setUsers(List users) { + this.users = users; + } + } + + /** + * 用户信息类 + */ + public static class UserInfo { + private Long userId; + private String name; + private String role; + + public Long getUserId() { + return userId; + } + + public void setUserId(Long userId) { + this.userId = userId; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getRole() { + return role; + } + + public void setRole(String role) { + this.role = role; + } + } } \ No newline at end of file diff --git a/src/main/java/com/light/delivery/service/UserService.java b/src/main/java/com/light/delivery/service/UserService.java index b7dd1c4..6052bd1 100644 --- a/src/main/java/com/light/delivery/service/UserService.java +++ b/src/main/java/com/light/delivery/service/UserService.java @@ -23,11 +23,14 @@ public interface UserService { /** * 微信小程序登录,接收code,返回登录响应。 + * @param code 微信登录凭证 + * @return 登录响应 */ LoginResponse wxLogin(String code); /** * 获取当前已登录用户(从上下文或模拟实现中) + * @return 当前用户 */ User getCurrentUser(); @@ -39,19 +42,25 @@ public interface UserService { */ User signIn(Long userId); - /** - * 注册为骑手 - * @param userId 用户ID - * @param name 姓名 - * @param phone 手机号 - * @return 注册结果 - */ - User registerAsDeliveryPerson(Long userId, String name, String phone); - /** * 检查用户是否已签到 * @param userId 用户ID * @return true表示已签到,false表示未签到 */ boolean isUserSignedIn(Long userId); + + /** + * 用户签退功能 + * @param userId 用户ID + */ + void signOut(Long userId); + + /** + * 注册为正式员工 + * @param userId 用户ID + * @param name 姓名 + * @param phone 手机号 + * @return 注册结果 + */ + User registerAsEmployee(Long userId, String name, String phone); } \ No newline at end of file diff --git a/src/main/java/com/light/delivery/service/impl/UserServiceImpl.java b/src/main/java/com/light/delivery/service/impl/UserServiceImpl.java index ecc0029..29f5327 100644 --- a/src/main/java/com/light/delivery/service/impl/UserServiceImpl.java +++ b/src/main/java/com/light/delivery/service/impl/UserServiceImpl.java @@ -15,6 +15,7 @@ import com.light.delivery.service.UserService; import com.light.delivery.util.JwtUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; @@ -43,16 +44,16 @@ public class UserServiceImpl implements UserService { private EmployeeRepository employeeRepository; /** - * JWT工具类,用于生成和解析JWT令牌 + * Jwt工具类,用于生成和解析JWT令牌 */ @Autowired private JwtUtil jwtUtil; /** - * WebSocket处理器,用于管理WebSocket连接 + * Spring应用上下文,用于获取其他Bean */ @Autowired - private LocationWebSocketHandler locationWebSocketHandler; + private ApplicationContext applicationContext; /** * 微信小程序appId @@ -103,6 +104,25 @@ public class UserServiceImpl implements UserService { throw new IllegalArgumentException("Invalid token: " + e.getMessage()); } } + + /** + * 根据用户ID获取用户信息。 + * @param userId 用户ID + * @return 用户实体对象 + * @throws IllegalArgumentException 当用户不存在时抛出异常 + */ + public User getUserInfoById(Long userId) { + if (userId == null) { + throw new IllegalArgumentException("User ID is null"); + } + + Optional userOptional = userRepository.findById(userId); + if (!userOptional.isPresent()) { + throw new IllegalArgumentException("用户不存在"); + } + + return userOptional.get(); + } /** * 用户登出逻辑。 @@ -127,14 +147,18 @@ public class UserServiceImpl implements UserService { return; } - // 如果用户是配送员,通知WebSocket处理器清理连接 - // 注意:这里需要根据实际业务逻辑获取配送员ID - // 可能需要通过其他方式关联用户ID和配送员ID - // 这里假设用户ID和配送员ID相同(根据项目实际情况调整) - // locationWebSocketHandler.removeUserConnection(user.getId()); + // 通知WebSocket处理器清理连接 + try { + LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class); + // 清理用户状态 + handler.cleanupUserConnection(user.getId()); + } catch (Exception e) { + // 记录日志但不中断登出流程 + System.err.println("清理WebSocket连接时出错: " + e.getMessage()); + } } catch (Exception e) { // 记录日志但不中断登出流程 - System.err.println("清理WebSocket连接时出错: " + e.getMessage()); + System.err.println("登出时出错: " + e.getMessage()); } } @@ -227,6 +251,12 @@ public class UserServiceImpl implements UserService { return user; } + + @Override + public void signOut(Long userId) { + // 签退操作在REST控制器中通过WebSocket处理器完成 + // 这里可以添加其他签退逻辑(如记录签退时间等) + } /** * 用户注册成为配送员。 @@ -239,8 +269,8 @@ public class UserServiceImpl implements UserService { * @throws IllegalArgumentException 当用户不存在、手机号未找到或姓名不匹配时抛出异常 */ @Override - public User registerAsDeliveryPerson(Long userId, String name, String phone) { - System.out.println("尝试注册配送员,用户ID: " + userId); + public User registerAsEmployee(Long userId, String name, String phone) { + System.out.println("尝试注册员工,用户ID: " + userId); Optional userOptional = userRepository.findById(userId); if (!userOptional.isPresent()) { @@ -330,22 +360,18 @@ public class UserServiceImpl implements UserService { */ @Override public boolean isUserSignedIn(Long userId) { - // 只有配送员角色才有签到状态 - Optional userOptional = userRepository.findById(userId); - if (!userOptional.isPresent()) { - return false; - } - - User user = userOptional.get(); - UserRole userRole = getUserRole(user); - - if (userRole != UserRole.DELIVERY_PERSON) { - // 非配送员角色没有签到状态概念 - return false; - } - // 检查WebSocket中的签到状态 - return locationWebSocketHandler.isDeliveryPersonSignedIn(userId); + // 注意:由于循环依赖问题,这里不能直接注入LocationWebSocketHandler + // 在实际应用中,可能需要通过其他方式获取签到状态 + // 现在通过静态方法或上下文获取LocationWebSocketHandler实例 + try { + // 使用Spring上下文获取LocationWebSocketHandler实例 + LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class); + return handler.isUserSignedIn(userId); + } catch (Exception e) { + System.err.println("获取WebSocket签到状态时出错: " + e.getMessage()); + return false; // 默认返回false + } } /**