修改签到和更新逻辑
All checks were successful
构建并部署 Spring Boot 应用 / build-and-deploy (push) Successful in 8m53s

This commit is contained in:
2025-10-19 05:41:24 +08:00
parent 74c0908b58
commit c33cbd799d
9 changed files with 564 additions and 570 deletions

View File

@@ -91,7 +91,7 @@ src
系统通过WebSocket实现实时位置同步端点为`/ws/location`
##### 连接建立
客户端通过WebSocket连接到`/ws/location`端点建立连接
客户端通过WebSocket连接到`/ws/location?userId={userId}`端点建立连接需要在查询参数中提供用户ID
##### 消息格式
所有消息都使用JSON格式。
@@ -127,33 +127,40 @@ src
{
"userId": 123,
"name": "张三",
"role": "DELIVERY_PERSON"
},
{
"userId": 456,
"name": "李四",
"role": "ADMIN"
"role": "DELIVERY_PERSON",
"userStatus": true,
"lastUpdateTime": 1634567890000,
"locationData": {
"latitude": 25.0342,
"longitude": 102.7057,
"timestamp": 1634567890000
}
}
]
}
```
3. **updateLocation(位置更新)** - 服务器向所有已签到用户广播位置更新
3. **userLocationList用户位置列表** - 服务器每30秒发送所有在线用户的位置列表
```json
{
"type": "updateLocation",
"type": "userLocationList",
"users": [
{
"userId": 123,
"userRole": "DELIVERY_PERSON",
"locationData": {
"latitude": 25.0342,
"longitude": 102.7057,
"timestamp": 1634567890000
}
}
]
}
```
##### 交互流程
```
用户签到] --> B{签到成功?}
[用户签到] --> B{签到成功?}
B -->|是| C[建立WebSocket连接]
B -->|否| D[签到失败]
C --> E[自动订阅位置更新]
@@ -161,8 +168,7 @@ src
F --> G[接收在线用户列表]
G --> H[开始位置更新循环]
H --> I[发送位置更新]
I --> J[接收位置广播]
J --> H
I --> J[每30秒接收一次位置列表]
K[用户签退] --> L{签退成功?}
L -->|是| M[自动取消订阅并关闭WebSocket连接]
L -->|否| N[签退失败]
@@ -331,6 +337,7 @@ deploy.bat
- `GET /user/info` - 获取用户信息
- `POST /user/logout` - 用户登出
- `POST /user/signin` - 用户签到
- `POST /user/signout` - 用户签退
- `POST /user/register` - 注册为配送员
### 订单相关
@@ -341,7 +348,7 @@ deploy.bat
### 配送员相关
- `GET /delivery-persons` - 获取配送员列表
- `GET /delivery-persons/{id}` - 获取配送员详情
- `PUT /delivery-persons/{id}/location` - 更新配送员位置
- `GET /delivery-persons/{id}/orders` - 获取配送员订单
### 员工管理相关(仅限管理员访问)
- `GET /employees` - 获取员工列表
@@ -388,27 +395,34 @@ deploy.bat
{
"userId": 123,
"name": "张三",
"role": "DELIVERY_PERSON"
},
{
"userId": 456,
"name": "李四",
"role": "ADMIN"
"role": "DELIVERY_PERSON",
"userStatus": true,
"lastUpdateTime": 1634567890000,
"locationData": {
"latitude": 25.0342,
"longitude": 102.7057,
"timestamp": 1634567890000
}
}
]
}
```
3. **updateLocation(位置更新)** - 服务器向所有已签到用户广播位置更新
3. **userLocationList用户位置列表** - 服务器每30秒发送所有在线用户的位置列表
```json
{
"type": "updateLocation",
"type": "userLocationList",
"users": [
{
"userId": 123,
"userRole": "DELIVERY_PERSON",
"locationData": {
"latitude": 25.0342,
"longitude": 102.7057,
"timestamp": 1634567890000
}
}
]
}
```
## 最近更新
@@ -420,6 +434,7 @@ deploy.bat
- 用户需要先通过REST API签到然后才能建立WebSocket连接并接收位置更新
- 用户签退时服务器会自动取消订阅并关闭WebSocket连接
- 客户端不再需要发送subscribe/unsubscribe消息这些操作由服务器自动处理
- 服务器现在每30秒批量发送一次所有在线用户的位置信息而不是实时发送单个用户位置更新
### WebSocket位置同步增强新增
为了支持管理员和配送员都能接收位置更新信息我们对WebSocket位置同步功能进行了增强

View File

@@ -69,18 +69,6 @@ public class DeliveryPersonController {
return ResponseEntity.ok(response);
}
/**
* 更新指定货运人员位置。
* @param id 货运人员ID
* @param locationRequest 包含经纬度
* @return 操作结果
*/
@PutMapping("/{id}/location")
public ResponseEntity<String> updateLocation(@PathVariable Long id, @RequestBody LocationRequest locationRequest) {
deliveryPersonService.updateLocation(id, locationRequest.getLongitude(), locationRequest.getLatitude());
return ResponseEntity.ok("位置更新成功");
}
/**
* 获取货运人员的当前订单。
* @param id 货运人员ID
@@ -112,13 +100,4 @@ public class DeliveryPersonController {
}).collect(Collectors.toList());
return ResponseEntity.ok(orders);
}
/**
* 位置请求体
*/
@Data
public static class LocationRequest {
private Double longitude;
private Double latitude;
}
}

View File

@@ -1,91 +0,0 @@
package com.light.delivery.controller;
import com.light.delivery.service.LocationWebSocketHandler;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 位置同步控制器,提供已签到员工位置信息查询接口。
* 用于获取所有已签到员工的实时位置信息。
*/
@RestController
@RequestMapping("/location-sync")
public class LocationSyncController {
/**
* WebSocket位置处理器依赖注入
*/
@Autowired
private LocationWebSocketHandler locationWebSocketHandler;
/**
* 获取所有已签到员工的实时位置。
* 仅返回有位置信息的员工(最近有更新位置的)。
* @return 员工位置列表
*/
@GetMapping("/employees/locations")
public ResponseEntity<List<EmployeeLocation>> getEmployeeLocations() {
// 获取所有已签到的员工位置信息
Map<Long, LocationWebSocketHandler.UserInfo> userInfos = locationWebSocketHandler.getAllSignedInUsers();
List<EmployeeLocation> locations = userInfos.entrySet().stream()
.map(entry -> {
Long userId = entry.getKey();
LocationWebSocketHandler.UserInfo userInfo = entry.getValue();
EmployeeLocation location = new EmployeeLocation();
location.setEmployeeId(userId);
location.setLatitude(locationWebSocketHandler.getUserLatitude(userId));
location.setLongitude(locationWebSocketHandler.getUserLongitude(userId));
location.setStatus(locationWebSocketHandler.getUserStatus(userId));
location.setRole(userInfo.getRole() != null ? userInfo.getRole() : "");
location.setName(userInfo.getName() != null ? userInfo.getName() : "");
return location;
})
.collect(Collectors.toList());
return ResponseEntity.ok(locations);
}
/**
* 在线员工位置信息DTO用于向前端返回员工位置数据。
*/
@Data
public static class EmployeeLocation {
/**
* 员工ID
*/
private Long employeeId;
/**
* 姓名
*/
private String name;
/**
* 纬度
*/
private Double latitude;
/**
* 经度
*/
private Double longitude;
/**
* 状态
*/
private String status;
/**
* 角色
*/
private String role;
}
}

View File

@@ -180,15 +180,26 @@ public class UserController {
/**
* 用户签到接口。
* @param request HTTP请求对象用于提取认证令牌
* @param initialLocation 初始位置信息
* @return 更新后的用户信息
*/
@PostMapping("/signin")
public ResponseEntity<?> signIn(HttpServletRequest request) {
public ResponseEntity<?> signIn(HttpServletRequest request, @RequestBody LocationWebSocketHandler.LocationData initialLocation) {
String token = extractToken(request);
if (token == null) {
return ResponseEntity.badRequest().body("Authorization token is missing");
}
// 检查位置信息是否为空
if (initialLocation == null) {
return ResponseEntity.badRequest().body("Initial location information is required");
}
// 检查位置信息的必要字段是否为空
if (initialLocation.getLatitude() == null || initialLocation.getLongitude() == null) {
return ResponseEntity.badRequest().body("Latitude and longitude are required in location information");
}
try {
User user = userService.getUserInfo(token);
User updatedUser = userService.signIn(user.getId());
@@ -197,7 +208,7 @@ public class UserController {
// 通知WebSocket处理器用户已签到并自动订阅
try {
LocationWebSocketHandler handler = applicationContext.getBean(LocationWebSocketHandler.class);
handler.userSignedIn(user.getId());
handler.userSignedIn(user.getId(), initialLocation);
} catch (Exception e) {
System.err.println("通知WebSocket签到状态时出错: " + e.getMessage());
}

View File

@@ -20,13 +20,6 @@ public interface DeliveryPersonService {
* @return 配送员对象
*/
DeliveryPerson getDeliveryPersonById(Long id);
/**
* 更新配送员当前位置。
* @param id 配送员ID
* @param longitude 经度
* @param latitude 纬度
*/
void updateLocation(Long id, Double longitude, Double latitude);
/**
* 获取指定配送员的所有订单。
* @param id 配送员ID

View File

@@ -5,14 +5,6 @@ package com.light.delivery.service;
*/
public interface LocationSyncService {
/**
* 处理配送员位置更新
* @param deliveryPersonId 配送员ID
* @param longitude 经度
* @param latitude 纬度
*/
void handleLocationUpdate(Long deliveryPersonId, Double longitude, Double latitude);
/**
* 订阅位置更新
* @param sessionId 会话ID

View File

@@ -18,6 +18,7 @@ import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -27,17 +28,11 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
// 存储所有连接的会话key为用户IDvalue为会话
private final Map<Long, WebSocketSession> sessions = new ConcurrentHashMap<>();
// 存储用户最后更新位置的时间
private final Map<Long, Long> lastUpdateTimes = new ConcurrentHashMap<>();
// 统一存储在线用户完整信息的映射
private final Map<Long, UserInfo> userFullInfos = new ConcurrentHashMap<>();
// 存储用户的登录状态true表示已签到false表示已签退
private final Map<Long, Boolean> userStatus = new ConcurrentHashMap<>();
// 存储用户的角色信息
private final Map<Long, UserRole> userRoles = new ConcurrentHashMap<>();
// 存储用户的基本信息
private final Map<Long, User> userInfos = new ConcurrentHashMap<>();
// 存储在线用户的实时位置信息
private final Map<Long, UserLocation> userLocations = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
@@ -51,10 +46,10 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
private ApplicationContext applicationContext;
// 位置过期时间毫秒默认5分钟
private static final long LOCATION_EXPIRE_TIME = 5 * 60 * 1000;
private final long LOCATION_EXPIRE_TIME = 5 * 60 * 1000;
// WebSocket连接超时时间毫秒默认30分钟
private static final long WEBSOCKET_TIMEOUT = 30 * 60 * 1000;
private final long WEBSOCKET_TIMEOUT = 30 * 60 * 1000;
/**
* 处理签退消息
@@ -69,30 +64,67 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
/**
* 用户签到
* @param userId 用户ID
* @param initialLocation 初始位置信息
*/
public void userSignedIn(Long userId) {
public void userSignedIn(Long userId, LocationData initialLocation) {
System.out.println("处理用户签到用户ID: " + userId + ", 初始位置: " + initialLocation);
if (userId != null) {
userStatus.put(userId, true);
lastUpdateTimes.put(userId, System.currentTimeMillis());
// 获取用户信息
User user = userRepository.findById(userId).orElse(null);
if (user != null) {
UserRole userRole = userService.getUserRole(user);
System.out.println("用户角色: " + userRole);
// 创建并存储用户完整信息
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userId);
userInfo.setName(user.getName());
userInfo.setRole(userRole != null ? userRole : null);
userInfo.setUserStatus(true); // 设置为已签到状态
userInfo.setLastUpdateTime(System.currentTimeMillis());
userFullInfos.put(userId, userInfo);
System.out.println("用户信息已添加到userFullInfos");
System.out.println("用户 " + userId + " 已签到");
// 如果提供了初始位置,存储它
if (initialLocation != null) {
UserLocation userLocation = new UserLocation();
userLocation.setUserId(userId);
userLocation.setLocationData(initialLocation);
userLocations.put(userId, userLocation);
System.out.println("初始位置已存储: " + initialLocation.getLatitude() + ", " + initialLocation.getLongitude());
}
// 如果用户已建立WebSocket连接则自动订阅
WebSocketSession session = sessions.get(userId);
System.out.println("用户的WebSocket会话: " + session);
if (session != null && session.isOpen()) {
System.out.println("会话已打开,尝试自动订阅");
try {
autoSubscribe(session, userId);
} catch (IOException e) {
System.err.println("自动订阅时出错: " + e.getMessage());
}
} else {
System.out.println("会话未打开或为空");
}
// 广播更新后的在线用户列表
System.out.println("准备广播在线用户列表");
broadcastOnlineUserList();
// 广播所有在线用户的位置
broadcastAllLocations();
}
}
}
/**
* 用户签到(重载方法,不带初始位置)
* @param userId 用户ID
*/
public void userSignedIn(Long userId) {
userSignedIn(userId, null);
}
/**
* 用户签退
@@ -102,10 +134,8 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
if (userId != null) {
// 从所有映射中移除用户信息
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
userFullInfos.remove(userId);
userLocations.remove(userId);
System.out.println("用户 " + userId + " 已签退");
@@ -120,7 +150,8 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* @return true表示已签到false表示未签到或已签退
*/
public boolean isUserSignedIn(Long userId) {
return Boolean.TRUE.equals(userStatus.get(userId));
UserInfo userInfo = userFullInfos.get(userId);
return userInfo != null && userInfo.isUserStatus();
}
/**
@@ -129,7 +160,8 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* @return 最后更新时间戳
*/
public Long getLastUpdateTime(Long userId) {
return lastUpdateTimes.get(userId);
UserInfo userInfo = userFullInfos.get(userId);
return userInfo != null ? userInfo.getLastUpdateTime() : null;
}
/**
@@ -138,7 +170,8 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* @return 用户角色
*/
public UserRole getUserRole(Long userId) {
return userRoles.get(userId);
UserInfo userInfo = userFullInfos.get(userId);
return userInfo != null ? userInfo.getRole() : null;
}
/**
@@ -146,21 +179,9 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* @return 已签到用户信息映射
*/
public Map<Long, UserInfo> getAllSignedInUsers() {
return userStatus.entrySet().stream()
.filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> {
Long userId = entry.getKey();
User user = userInfos.get(userId);
UserRole role = userRoles.get(userId);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userId);
userInfo.setName(user != null ? user.getName() : "");
userInfo.setRole(role != null ? role.getCode() : "");
return userInfo;
}
));
return userFullInfos.entrySet().stream()
.filter(entry -> entry.getValue() != null && entry.getValue().isUserStatus())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
@@ -169,14 +190,8 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* @return 纬度
*/
public Double getUserLatitude(Long userId) {
// 从位置同步服务获取纬度
try {
LocationSyncServiceImpl locationSyncService = applicationContext.getBean(LocationSyncServiceImpl.class);
return locationSyncService.getDeliveryPersonLatitude(userId);
} catch (Exception e) {
System.err.println("获取用户纬度时出错: " + e.getMessage());
return null;
}
UserLocation userLocation = userLocations.get(userId);
return userLocation != null ? userLocation.getLocationData().getLatitude() : null;
}
/**
@@ -185,14 +200,8 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* @return 经度
*/
public Double getUserLongitude(Long userId) {
// 从位置同步服务获取经度
try {
LocationSyncServiceImpl locationSyncService = applicationContext.getBean(LocationSyncServiceImpl.class);
return locationSyncService.getDeliveryPersonLongitude(userId);
} catch (Exception e) {
System.err.println("获取用户经度时出错: " + e.getMessage());
return null;
}
UserLocation userLocation = userLocations.get(userId);
return userLocation != null ? userLocation.getLocationData().getLongitude() : null;
}
/**
@@ -215,6 +224,63 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 连接建立时,可以在这里进行身份验证
System.out.println("WebSocket连接已建立: " + session.getId());
// 从会话中获取用户ID需要在握手时设置
String userIdStr = session.getUri().getQuery();
System.out.println("收到的查询参数: " + userIdStr);
if (userIdStr != null && userIdStr.startsWith("userId=")) {
try {
Long userId = Long.parseLong(userIdStr.substring(7));
System.out.println("解析出的用户ID: " + userId);
sessions.put(userId, session);
System.out.println("已将会话添加到sessions映射中");
// 获取并存储用户信息和角色
User user = userRepository.findById(userId).orElse(null);
if (user != null) {
System.out.println("找到用户: " + user.getId() + ", 姓名: " + user.getName());
UserRole userRole = userService.getUserRole(user);
System.out.println("用户角色: " + userRole);
// 如果用户信息不存在,创建新的;如果存在,更新会话引用
UserInfo userInfo = userFullInfos.get(userId);
System.out.println("当前用户在userFullInfos中的信息: " + userInfo);
if (userInfo == null) {
userInfo = new UserInfo();
userInfo.setUserId(userId);
userInfo.setName(user.getName());
userInfo.setRole(userRole);
userInfo.setUserStatus(false); // 默认未签到
userInfo.setLastUpdateTime(System.currentTimeMillis());
userFullInfos.put(userId, userInfo);
System.out.println("创建新的用户信息并添加到userFullInfos");
} else {
System.out.println("用户信息已存在");
}
} else {
System.out.println("未找到用户");
}
System.out.println("WebSocket会话已添加到会话映射中用户ID: " + userId);
// 如果用户已经签到,自动订阅
UserInfo userInfo = userFullInfos.get(userId);
if (userInfo != null && userInfo.isUserStatus()) {
System.out.println("用户已签到,尝试自动订阅");
try {
autoSubscribe(session, userId);
} catch (IOException e) {
System.err.println("自动订阅时出错: " + e.getMessage());
}
} else {
System.out.println("用户未签到或用户信息为空,不自动订阅");
}
} catch (NumberFormatException e) {
System.err.println("无效的用户ID格式: " + userIdStr);
}
} else {
System.err.println("WebSocket连接中未找到用户ID参数");
}
}
@Override
@@ -224,6 +290,8 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
String payload = message.getPayload();
LocationMessage locationMessage = objectMapper.readValue(payload, LocationMessage.class);
System.out.println("收到WebSocket消息: " + payload);
if ("updateLocation".equals(locationMessage.getType())) {
// 更新位置信息
handleLocationUpdate(locationMessage);
@@ -253,17 +321,33 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
return;
}
// 更新最后更新时间
lastUpdateTimes.put(userId, System.currentTimeMillis());
System.out.println("处理用户 " + userId + " 的位置更新: 纬度=" + locationMessage.getLatitude() +
", 经度=" + locationMessage.getLongitude());
// 只有已签到的用户才广播位置更新
if (Boolean.TRUE.equals(userStatus.get(userId))) {
try {
// 广播位置更新给所有连接的客户端
broadcastLocationUpdate(locationMessage);
} catch (IOException e) {
System.err.println("广播位置更新时出错: " + e.getMessage());
// 更新用户最后更新时间
UserInfo userInfo = userFullInfos.get(userId);
if (userInfo != null) {
userInfo.setLastUpdateTime(System.currentTimeMillis());
}
// 只有已签到的用户才存储位置更新
if (userInfo != null && userInfo.isUserStatus()) {
// 存储用户位置信息
UserLocation userLocation = new UserLocation();
userLocation.setUserId(userId);
LocationData locationData = new LocationData();
locationData.setLatitude(locationMessage.getLatitude());
locationData.setLongitude(locationMessage.getLongitude());
locationData.setTimestamp(locationMessage.getTimestamp() != null ?
locationMessage.getTimestamp() : System.currentTimeMillis());
userLocation.setLocationData(locationData);
userLocations.put(userId, userLocation);
System.out.println("已存储用户 " + userId + " 的位置信息");
} else {
System.out.println("用户 " + userId + " 未签到,忽略位置更新");
}
}
@@ -274,44 +358,18 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
*/
public void autoSubscribe(WebSocketSession session, Long userId) throws IOException {
if (userId != null) {
sessions.put(userId, session);
// 默认状态为未签到但WebSocket连接已建立
userStatus.putIfAbsent(userId, false);
lastUpdateTimes.putIfAbsent(userId, System.currentTimeMillis());
// 用户必须是已签到状态才能订阅
UserInfo userInfo = userFullInfos.get(userId);
if (userInfo == null || !userInfo.isUserStatus()) {
System.err.println("用户 " + userId + " 未签到,无法订阅");
return;
}
// 获取并存储用户信息和角色
try {
User user = userRepository.findById(userId).orElse(null);
if (user != null) {
UserRole userRole = userService.getUserRole(user);
userInfos.putIfAbsent(userId, user);
userRoles.putIfAbsent(userId, userRole);
}
} catch (Exception e) {
System.err.println("获取用户信息时出错: " + e.getMessage());
}
// 更新最后更新时间
userInfo.setLastUpdateTime(System.currentTimeMillis());
session.sendMessage(new TextMessage("{\"type\":\"subscribed\",\"userId\":" + userId + "}"));
// 发送当前在线用户列表
sendOnlineUserList(session);
}
}
/**
* 用户签退时自动取消订阅
* @param userId 用户ID
*/
public void autoUnsubscribe(Long userId) {
if (userId != null) {
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
System.out.println("向用户 " + userId + " 发送订阅确认消息");
}
}
@@ -319,18 +377,31 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* 发送在线用户列表给指定客户端
*/
private void sendOnlineUserList(WebSocketSession session) throws IOException {
List<UserInfo> onlineUsers = userStatus.entrySet().stream()
.filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户
.map(entry -> {
Long userId = entry.getKey();
User user = userInfos.get(userId);
UserRole role = userRoles.get(userId);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userId);
userInfo.setName(user != null ? user.getName() : "");
userInfo.setRole(role != null ? role.getCode() : "");
List<UserInfo> onlineUsers = userFullInfos.values().stream()
.filter(UserInfo::isUserStatus) // 只包含已签到的用户
.map(userInfo -> {
// 如果有位置信息,也一并发送
UserLocation userLocation = userLocations.get(userInfo.getUserId());
if (userLocation != null && userLocation.getLocationData() != null) {
// 创建包含位置信息的用户信息对象
UserInfo userInfoWithLocation = new UserInfo();
userInfoWithLocation.setUserId(userInfo.getUserId());
userInfoWithLocation.setName(userInfo.getName());
userInfoWithLocation.setRole(userInfo.getRole());
userInfoWithLocation.setUserStatus(userInfo.isUserStatus());
userInfoWithLocation.setLastUpdateTime(userInfo.getLastUpdateTime());
LocationData locationData = new LocationData();
locationData.setLatitude(userLocation.getLocationData().getLatitude());
locationData.setLongitude(userLocation.getLocationData().getLongitude());
locationData.setTimestamp(userLocation.getLocationData().getTimestamp());
userInfoWithLocation.setLocationData(locationData);
return userInfoWithLocation;
}
return userInfo;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
OnlineUserListMessage message = new OnlineUserListMessage();
@@ -338,6 +409,7 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
message.setUsers(onlineUsers);
String jsonMessage = objectMapper.writeValueAsString(message);
System.out.println("向会话 " + session.getId() + " 发送在线用户列表: " + jsonMessage);
session.sendMessage(new TextMessage(jsonMessage));
}
@@ -346,20 +418,45 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
*/
private void broadcastOnlineUserList() {
try {
List<UserInfo> onlineUsers = userStatus.entrySet().stream()
.filter(entry -> Boolean.TRUE.equals(entry.getValue())) // 只包含已签到的用户
.map(entry -> {
Long userId = entry.getKey();
User user = userInfos.get(userId);
UserRole role = userRoles.get(userId);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userId);
userInfo.setName(user != null ? user.getName() : "");
userInfo.setRole(role != null ? role.getCode() : "");
System.out.println("开始广播在线用户列表...");
System.out.println("当前userFullInfos大小: " + userFullInfos.size());
System.out.println("当前userFullInfos内容: " + userFullInfos);
System.out.println("当前sessions大小: " + sessions.size());
System.out.println("当前sessions内容: " + sessions);
List<UserInfo> onlineUsers = userFullInfos.values().stream()
.filter(UserInfo::isUserStatus) // 只包含已签到的用户
.map(userInfo -> {
System.out.println("处理用户: " + userInfo.getUserId() + ", 状态: " + userInfo.isUserStatus());
// 如果有位置信息,也一并发送
UserLocation userLocation = userLocations.get(userInfo.getUserId());
if (userLocation != null && userLocation.getLocationData() != null) {
System.out.println("用户 " + userInfo.getUserId() + " 有位置信息");
// 创建包含位置信息的用户信息对象
UserInfo userInfoWithLocation = new UserInfo();
userInfoWithLocation.setUserId(userInfo.getUserId());
userInfoWithLocation.setName(userInfo.getName());
userInfoWithLocation.setRole(userInfo.getRole());
userInfoWithLocation.setUserStatus(userInfo.isUserStatus());
userInfoWithLocation.setLastUpdateTime(userInfo.getLastUpdateTime());
LocationData locationData = new LocationData();
locationData.setLatitude(userLocation.getLocationData().getLatitude());
locationData.setLongitude(userLocation.getLocationData().getLongitude());
locationData.setTimestamp(userLocation.getLocationData().getTimestamp());
userInfoWithLocation.setLocationData(locationData);
return userInfoWithLocation;
}
System.out.println("用户 " + userInfo.getUserId() + " 没有位置信息");
return userInfo;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
System.out.println("过滤后的在线用户数量: " + onlineUsers.size());
System.out.println("在线用户列表: " + onlineUsers);
OnlineUserListMessage message = new OnlineUserListMessage();
message.setType("onlineUserList");
message.setUsers(onlineUsers);
@@ -367,30 +464,29 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
String jsonMessage = objectMapper.writeValueAsString(message);
TextMessage textMessage = new TextMessage(jsonMessage);
System.out.println("广播在线用户列表给所有连接的客户端: " + jsonMessage);
System.out.println("当前会话数量: " + sessions.size());
System.out.println("会话列表: " + sessions.keySet());
// 发送给所有连接的客户端
for (Map.Entry<Long, WebSocketSession> entry : sessions.entrySet()) {
Long userId = entry.getKey();
WebSocketSession session = entry.getValue();
System.out.println("检查用户 " + userId + " 的会话状态: session=" + session + ", isOpen=" + (session != null ? session.isOpen() : "null"));
try {
if (session != null && session.isOpen()) {
System.out.println("向用户 " + userId + " 发送在线用户列表");
session.sendMessage(textMessage);
} else {
System.out.println("用户 " + userId + " 的会话已关闭或不存在");
// 清理已关闭的会话
System.out.println("清理已关闭的WebSocket会话用户ID: " + userId);
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
cleanupUserConnection(userId);
}
} catch (IOException e) {
System.err.println("发送在线用户列表给用户 " + userId + " 时出错: " + e.getMessage());
// 移除发送失败的会话
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
cleanupUserConnection(userId);
}
}
} catch (Exception e) {
@@ -400,90 +496,83 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
}
/**
* 广播所有在线用户的位置
* 定时广播所有在线用户的位置列表每30秒一次
*/
private void broadcastAllLocations() {
@Scheduled(fixedRate = 30000) // 每30秒执行一次
public void broadcastAllUserLocations() {
System.out.println("开始执行定时位置广播任务");
try {
// 收集所有在线用户的位置信息
for (Map.Entry<Long, Boolean> entry : userStatus.entrySet()) {
Long userId = entry.getKey();
Boolean isSignedIn = entry.getValue();
// 构建所有在线用户的位置列表
List<UserLocation> userLocationsList = userFullInfos.values().stream()
.filter(UserInfo::isUserStatus) // 只包含已签到的用户
.map(userInfo -> {
Long userId = userInfo.getUserId();
UserLocation userLocation = userLocations.get(userId);
// 只广播已签到用户的位置
if (Boolean.TRUE.equals(isSignedIn)) {
// 创建一个位置更新消息
LocationMessage locationMessage = new LocationMessage();
locationMessage.setType("updateLocation");
locationMessage.setUserId(userId);
if (userLocation != null) {
UserLocation broadcastLocation = new UserLocation();
broadcastLocation.setUserId(userId);
broadcastLocation.setLocationData(userLocation.getLocationData());
return broadcastLocation;
}
return null;
})
.filter(Objects::nonNull) // 过滤掉没有位置信息的用户
.collect(Collectors.toList());
// 添加用户角色信息
UserRole userRole = userRoles.get(userId);
if (userRole != null) {
locationMessage.setUserRole(userRole.getCode());
System.out.println("准备发送位置列表,用户数量: " + userLocationsList.size());
// 如果没有用户有位置信息,则不发送
if (userLocationsList.isEmpty()) {
System.out.println("没有用户有位置信息,跳过发送");
return;
}
// 广播这个位置更新
broadcastLocationUpdate(locationMessage);
}
}
} catch (Exception e) {
System.err.println("广播所有位置时出错: " + e.getMessage());
}
}
// 创建位置列表消息
UserLocationListMessage message = new UserLocationListMessage();
message.setType("userLocationList");
message.setUsers(userLocationsList);
/**
* 广播位置更新给所有连接的客户端
*/
private void broadcastLocationUpdate(LocationMessage locationMessage) throws IOException {
// 添加用户角色信息到消息中
Long userId = locationMessage.getUserId();
if (userId != null) {
UserRole userRole = userRoles.get(userId);
if (userRole != null) {
locationMessage.setUserRole(userRole.getCode());
}
}
// 转换为JSON并广播给所有连接的客户端
String jsonMessage = objectMapper.writeValueAsString(message);
TextMessage textMessage = new TextMessage(jsonMessage);
String message = objectMapper.writeValueAsString(locationMessage);
TextMessage textMessage = new TextMessage(message);
System.out.println("广播用户位置列表: " + jsonMessage);
// 发送给所有连接且已签到的客户端
for (Map.Entry<Long, WebSocketSession> entry : sessions.entrySet()) {
Long userIdKey = entry.getKey();
Long userId = entry.getKey();
WebSocketSession session = entry.getValue();
// 只发送给已签到的用户
if (session != null && session.isOpen() && Boolean.TRUE.equals(userStatus.get(userIdKey))) {
UserInfo userInfo = userFullInfos.get(userId);
if (session != null && session.isOpen() && userInfo != null && userInfo.isUserStatus()) {
try {
System.out.println("向用户 " + userId + " 发送位置列表");
session.sendMessage(textMessage);
} catch (IOException e) {
System.err.println("向用户 " + userIdKey + " 发送消息时出错: " + e.getMessage());
System.err.println("向用户 " + userId + " 发送位置列表时出错: " + e.getMessage());
// 移除已断开的连接
sessions.remove(userIdKey);
userStatus.remove(userIdKey);
lastUpdateTimes.remove(userIdKey);
userRoles.remove(userIdKey);
userInfos.remove(userIdKey);
cleanupUserConnection(userId);
}
}
}
} catch (Exception e) {
System.err.println("广播所有用户位置列表时出错: " + e.getMessage());
e.printStackTrace();
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 连接关闭时,移除会话
Long userId = findUserIdBySession(session);
System.out.println("WebSocket连接已关闭: " + session.getId() + ", 用户ID: " + userId + ", 状态: " + status);
if (userId != null) {
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
cleanupUserConnection(userId);
}
System.out.println("WebSocket连接关闭: " + session.getId() + ", 状态: " + status);
System.out.println("WebSocket连接关闭处理完成: " + session.getId());
}
@Override
@@ -493,14 +582,7 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
Long userId = findUserIdBySession(session);
if (userId != null) {
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
cleanupUserConnection(userId);
}
// 不要关闭session因为可能已经关闭了
}
@@ -524,175 +606,206 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
public void cleanupUserConnection(Long userId) {
if (userId != null) {
sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
// 广播更新后的在线用户列表
broadcastOnlineUserList();
// 注意这里不清理userFullInfos和userLocations因为用户可能只是断开了WebSocket连接但未签退
// 签退应该通过明确的签退操作来处理
}
}
/**
* 定时检查并清理过期的连接和位置信息每30秒执行一次
*
* 该方法主要处理两种超时情况:
* 1. WebSocket连接超时当用户超过30分钟未活动时将被自动签退
* 2. 位置信息过期当用户位置信息超过5分钟未更新时将被清理但保留用户状态
*
* 处理逻辑:
* - 遍历所有已登录用户的信息
* - 检查每个用户的最后活动时间
* - 对于超时的连接,执行完整的清理操作(关闭连接、移除会话、广播更新等)
* - 对于位置信息过期的用户,仅清理位置数据
*/
@Scheduled(fixedRate = 30000)
public void cleanupExpiredConnections() {
System.out.println("开始执行定时清理过期连接任务...");
long currentTime = System.currentTimeMillis();
for (Map.Entry<Long, Long> entry : lastUpdateTimes.entrySet()) {
System.out.println("当前时间: " + currentTime + ", 在线用户数: " + userFullInfos.size());
// 检查所有用户的最后更新时间
for (Map.Entry<Long, UserInfo> entry : userFullInfos.entrySet()) {
Long userId = entry.getKey();
Long lastUpdateTime = entry.getValue();
UserInfo userInfo = entry.getValue();
System.out.println("检查用户 " + userId + " 的活动状态");
// 如果超过WebSocket超时时间则移除该用户的连接
if (currentTime - lastUpdateTime > WEBSOCKET_TIMEOUT) {
// 如果用户信息中没有最后更新时间,则跳过
if (userInfo.getLastUpdateTime() == null) {
System.out.println("用户 " + userId + " 没有最后更新时间,跳过检查");
continue;
}
long timeSinceLastUpdate = currentTime - userInfo.getLastUpdateTime();
System.out.println("用户 " + userId + " 距离上次更新时间: " + timeSinceLastUpdate + " ms");
// 如果超过WebSocket超时时间默认30分钟则移除该用户的连接和状态
// 这表示用户长时间未活动,需要执行自动签退操作
if (timeSinceLastUpdate > WEBSOCKET_TIMEOUT) {
System.out.println("用户 " + userId + " 已超时 (" + timeSinceLastUpdate + " ms > " + WEBSOCKET_TIMEOUT + " ms),执行自动签退");
// 从会话映射中移除用户会话
WebSocketSession session = sessions.remove(userId);
userStatus.remove(userId);
lastUpdateTimes.remove(userId);
userRoles.remove(userId);
userInfos.remove(userId);
System.out.println("已从会话映射中移除用户 " + userId);
// 关闭WebSocket连接
// 从用户信息映射中移除用户信息
userFullInfos.remove(userId);
System.out.println("已从用户信息映射中移除用户 " + userId);
// 从位置信息映射中移除用户位置
userLocations.remove(userId);
System.out.println("已从位置信息映射中移除用户 " + userId);
// 关闭WebSocket连接如果会话存在且处于打开状态
if (session != null && session.isOpen()) {
System.out.println("尝试关闭用户 " + userId + " 的WebSocket连接");
try {
session.close();
System.out.println("成功关闭用户 " + userId + " 的WebSocket连接");
} catch (IOException e) {
System.err.println("关闭过期WebSocket连接时出错: " + e.getMessage());
}
} else {
System.out.println("用户 " + userId + " 没有有效的WebSocket连接需要关闭");
}
System.out.println("已清理过期WebSocket连接用户ID: " + userId);
// 广播更新后的在线用户列表
// 广播更新后的在线用户列表,通知所有客户端有用户被自动签退
System.out.println("广播更新后的在线用户列表");
broadcastOnlineUserList();
}
// 如果超过位置过期时间,则清理位置信息,保留连接
else if (currentTime - lastUpdateTime > LOCATION_EXPIRE_TIME) {
// 标记为未签到状态,不再广播其位置
userStatus.put(userId, false);
// 如果未达到连接超时但超过位置过期时间默认5分钟,则清理位置信息,保留用户状态
// 这表示用户虽然在线但长时间未更新位置信息
else if (timeSinceLastUpdate > LOCATION_EXPIRE_TIME) {
System.out.println("用户 " + userId + " 位置信息已过期 (" + timeSinceLastUpdate + " ms > " + LOCATION_EXPIRE_TIME + " ms),清理位置信息");
// 仅移除用户的位置信息,保留用户在线状态
userLocations.remove(userId);
System.out.println("用户 " + userId + " 位置信息已过期");
// 广播更新后的在线用户列表
broadcastOnlineUserList();
} else {
System.out.println("用户 " + userId + " 状态正常,距离上次更新: " + timeSinceLastUpdate + " ms");
}
}
System.out.println("定时清理过期连接任务执行完成");
}
/**
* 内部消息类用于解析WebSocket消息
*/
// 内部消息类和数据结构保持不变
public static class LocationMessage {
private String type; // 消息类型: updateLocation, signin, signout
private Long userId; // 用户ID
private String userRole; // 用户角色
private Double latitude; // 纬度
private Double longitude; // 经度
private Long timestamp; // 时间戳
private String type;
private Long userId;
private String userRole;
private Double latitude;
private Double longitude;
private Long timestamp;
// 添加无参构造函数
public LocationMessage() {}
// Getters and Setters
public String getType() {
return type;
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public Long getUserId() { return userId; }
public void setUserId(Long userId) { this.userId = userId; }
public String getUserRole() { return userRole; }
public void setUserRole(String userRole) { this.userRole = userRole; }
public Double getLatitude() { return latitude; }
public void setLatitude(Double latitude) { this.latitude = latitude; }
public Double getLongitude() { return longitude; }
public void setLongitude(Double longitude) { this.longitude = longitude; }
public Long getTimestamp() { return timestamp; }
public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
}
public void setType(String type) {
this.type = type;
public class OnlineUserListMessage {
private String type;
private List<UserInfo> users;
// 添加无参构造函数
public OnlineUserListMessage() {}
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public List<UserInfo> getUsers() { return users; }
public void setUsers(List<UserInfo> users) { this.users = users; }
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public String getUserRole() {
return userRole;
}
public void setUserRole(String userRole) {
this.userRole = userRole;
}
public Double getLatitude() {
return latitude;
}
public void setLatitude(Double latitude) {
this.latitude = latitude;
}
public Double getLongitude() {
return longitude;
}
public void setLongitude(Double longitude) {
this.longitude = longitude;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
}
/**
* 在线用户列表消息类
*/
public static class OnlineUserListMessage {
private String type; // 消息类型: onlineUserList
private List<UserInfo> users; // 在线用户列表
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public List<UserInfo> getUsers() {
return users;
}
public void setUsers(List<UserInfo> users) {
this.users = users;
}
}
/**
* 用户信息类
*/
public static class UserInfo {
private Long userId;
private String name;
private String role;
private UserRole role;
private LocationData locationData;
private boolean userStatus;
private Long lastUpdateTime;
public Long getUserId() {
return userId;
// 添加无参构造函数
public UserInfo() {}
public Long getUserId() { return userId; }
public void setUserId(Long userId) { this.userId = userId; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public UserRole getRole() { return role; }
public void setRole(UserRole role) { this.role = role; }
public LocationData getLocationData() { return locationData; }
public void setLocationData(LocationData locationData) { this.locationData = locationData; }
public boolean isUserStatus() { return userStatus; }
public void setUserStatus(boolean userStatus) { this.userStatus = userStatus; }
public Long getLastUpdateTime() { return lastUpdateTime; }
public void setLastUpdateTime(Long lastUpdateTime) { this.lastUpdateTime = lastUpdateTime; }
}
public void setUserId(Long userId) {
this.userId = userId;
public static class LocationData {
private Double latitude;
private Double longitude;
private Long timestamp;
// 添加无参构造函数
public LocationData() {}
// 添加有参构造函数
public LocationData(Double latitude, Double longitude, Long timestamp) {
this.latitude = latitude;
this.longitude = longitude;
this.timestamp = timestamp;
}
public String getName() {
return name;
public Double getLatitude() { return latitude; }
public void setLatitude(Double latitude) { this.latitude = latitude; }
public Double getLongitude() { return longitude; }
public void setLongitude(Double longitude) { this.longitude = longitude; }
public Long getTimestamp() { return timestamp; }
public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
}
public void setName(String name) {
this.name = name;
public static class UserLocation {
private Long userId;
private LocationData locationData;
// 添加无参构造函数
public UserLocation() {}
public Long getUserId() { return userId; }
public void setUserId(Long userId) { this.userId = userId; }
public LocationData getLocationData() { return locationData; }
public void setLocationData(LocationData locationData) { this.locationData = locationData; }
}
public String getRole() {
return role;
}
public static class UserLocationListMessage {
private String type;
private List<UserLocation> users;
public void setRole(String role) {
this.role = role;
}
// 添加无参构造函数
public UserLocationListMessage() {}
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public List<UserLocation> getUsers() { return users; }
public void setUsers(List<UserLocation> users) { this.users = users; }
}
}

View File

@@ -34,12 +34,6 @@ public class DeliveryPersonServiceImpl implements DeliveryPersonService {
return person.orElse(null);
}
@Override
public void updateLocation(Long id, Double longitude, Double latitude) {
// 使用 LocationSyncService 更新位置信息到服务器缓存,而不是直接更新数据库
locationSyncService.handleLocationUpdate(id, longitude, latitude);
}
@Override
public List<Order> getCurrentOrders(Long id) {
return orderRepository.findByDeliveryPersonId(id);

View File

@@ -34,19 +34,7 @@ public class LocationSyncServiceImpl implements LocationSyncService {
// 位置过期时间(分钟)
private static final int LOCATION_EXPIRE_MINUTES = 5;
@Override
public void handleLocationUpdate(Long deliveryPersonId, Double longitude, Double latitude) {
try {
// 更新内存缓存中的配送员位置,而不是数据库
deliveryPersonLongitudeMap.put(deliveryPersonId, longitude);
deliveryPersonLatitudeMap.put(deliveryPersonId, latitude);
// 更新最后更新时间
lastUpdateTimes.put(deliveryPersonId, LocalDateTime.now());
} catch (Exception e) {
System.err.println("更新配送员位置时出错: " + e.getMessage());
}
}
@Override
public void subscribe(String sessionId, Long deliveryPersonId) {