添加用户状态管理器,维护webscokect
All checks were successful
构建并部署 Spring Boot 应用 / build-and-deploy (push) Successful in 9m39s

This commit is contained in:
2025-10-16 01:06:30 +08:00
parent 11e1e5d42b
commit 26b8cadc24
5 changed files with 243 additions and 22 deletions

View File

@@ -5,6 +5,7 @@ import com.light.delivery.dto.UserResponse;
import com.light.delivery.model.LoginResponse;
import com.light.delivery.model.RegisterRequest;
import com.light.delivery.model.User;
import com.light.delivery.model.UserRole;
import com.light.delivery.model.WxLoginRequest;
import com.light.delivery.service.UserService;
import com.light.delivery.util.JwtUtil;
@@ -14,6 +15,11 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
/**
* 用户相关接口控制器,提供登录、获取用户信息、登出等功能。
* 处理用户认证、授权和基本信息管理等相关HTTP请求。
@@ -34,6 +40,52 @@ public class UserController {
@Autowired
private JwtUtil jwtUtil;
/**
* 获取当前用户状态接口。
* @param request HTTP请求对象用于提取认证令牌
* @return 当前用户状态信息
*/
@GetMapping("/status")
public ResponseEntity<?> getUserStatus(HttpServletRequest request) {
String token = extractToken(request);
if (token == null) {
return ResponseEntity.badRequest().body("Authorization token is missing");
}
try {
User user = userService.getUserInfo(token);
// 创建响应对象
Map<String, Object> statusResponse = new HashMap<>();
// 根据用户角色和信息完整性判断状态
String status;
if (user.getRole() == null || user.getRole() == UserRole.GUEST) {
// 未注册用户
status = "unregistered";
} else if ((user.getName() == null || user.getName().isEmpty()) &&
(user.getPhone() == null || user.getPhone().isEmpty())) {
// 已注册但信息不完整
status = "registered";
} else {
// 已注册且信息完整,检查签到状态
if (userService.isUserSignedIn(user.getId())) {
status = "signed_in";
} else {
status = "signed_out";
}
}
statusResponse.put("status", status);
// TODO: 添加最后签到/签退时间(需要在数据库中存储这些信息)
return ResponseEntity.ok(statusResponse);
} catch (Exception e) {
return ResponseEntity.badRequest().body("Invalid token: " + e.getMessage());
}
}
/**
* 获取当前用户信息接口。
* @param request HTTP请求对象用于提取认证令牌

View File

@@ -21,11 +21,35 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
// 存储配送员最后更新位置的时间
private final Map<Long, Long> lastUpdateTimes = new ConcurrentHashMap<>();
// 存储配送员的登录状态true表示已签到false表示已签退
private final Map<Long, Boolean> deliveryPersonStatus = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
// 位置过期时间毫秒默认5分钟
private static final long LOCATION_EXPIRE_TIME = 5 * 60 * 1000;
// WebSocket连接超时时间毫秒默认30分钟
private static final long WEBSOCKET_TIMEOUT = 30 * 60 * 1000;
/**
* 获取配送员的签到状态
* @param deliveryPersonId 配送员ID
* @return true表示已签到false表示未签到或已签退
*/
public boolean isDeliveryPersonSignedIn(Long deliveryPersonId) {
return Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId));
}
/**
* 获取配送员最后更新时间
* @param deliveryPersonId 配送员ID
* @return 最后更新时间戳
*/
public Long getLastUpdateTime(Long deliveryPersonId) {
return lastUpdateTimes.get(deliveryPersonId);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 连接建立时,可以在这里进行身份验证
@@ -48,6 +72,12 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
} else if ("unsubscribe".equals(locationMessage.getType())) {
// 取消订阅
handleUnsubscribe(session, locationMessage);
} else if ("signin".equals(locationMessage.getType())) {
// 处理签到
handleSignIn(locationMessage);
} else if ("signout".equals(locationMessage.getType())) {
// 处理签退
handleSignOut(locationMessage);
}
} catch (Exception e) {
System.err.println("处理WebSocket消息时出错: " + e.getMessage());
@@ -67,11 +97,14 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
// 更新最后更新时间
lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis());
try {
// 广播位置更新给所有订阅者
broadcastLocationUpdate(locationMessage);
} catch (IOException e) {
System.err.println("广播位置更新时出错: " + e.getMessage());
// 只有已签到的配送员才广播位置更新
if (Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId))) {
try {
// 广播位置更新给所有连接的客户端
broadcastLocationUpdate(locationMessage);
} catch (IOException e) {
System.err.println("广播位置更新时出错: " + e.getMessage());
}
}
}
@@ -82,6 +115,9 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId != null) {
sessions.put(deliveryPersonId, session);
// 默认状态为未签到
deliveryPersonStatus.put(deliveryPersonId, false);
lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis());
session.sendMessage(new TextMessage("{\"type\":\"subscribed\",\"deliveryPersonId\":" + deliveryPersonId + "}"));
}
}
@@ -93,10 +129,35 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId != null) {
sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
}
}
/**
* 处理签到消息
*/
private void handleSignIn(LocationMessage locationMessage) {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId != null) {
deliveryPersonStatus.put(deliveryPersonId, true);
lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis());
System.out.println("配送员 " + deliveryPersonId + " 已签到");
}
}
/**
* 处理签退消息
*/
private void handleSignOut(LocationMessage locationMessage) {
Long deliveryPersonId = locationMessage.getDeliveryPersonId();
if (deliveryPersonId != null) {
deliveryPersonStatus.put(deliveryPersonId, false);
lastUpdateTimes.put(deliveryPersonId, System.currentTimeMillis());
System.out.println("配送员 " + deliveryPersonId + " 已签退");
}
}
/**
* 广播位置更新给所有连接的客户端
*/
@@ -104,9 +165,13 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
String message = objectMapper.writeValueAsString(locationMessage);
TextMessage textMessage = new TextMessage(message);
// 发送给所有连接的客户端
for (WebSocketSession session : sessions.values()) {
if (session.isOpen()) {
// 发送给所有连接且已签到的客户端
for (Map.Entry<Long, WebSocketSession> entry : sessions.entrySet()) {
Long deliveryPersonId = entry.getKey();
WebSocketSession session = entry.getValue();
// 只发送给已签到的配送员
if (session.isOpen() && Boolean.TRUE.equals(deliveryPersonStatus.get(deliveryPersonId))) {
session.sendMessage(textMessage);
}
}
@@ -115,31 +180,70 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 连接关闭时,移除会话
sessions.values().remove(session);
Long deliveryPersonId = findDeliveryPersonIdBySession(session);
if (deliveryPersonId != null) {
sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
}
System.out.println("WebSocket连接已关闭: " + session.getId() + ", 状态: " + status);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.err.println("WebSocket传输错误: " + exception.getMessage());
sessions.values().remove(session);
Long deliveryPersonId = findDeliveryPersonIdBySession(session);
if (deliveryPersonId != null) {
sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
}
}
/**
* 定时检查并清理过期的位置信息每30秒执行一次
* 根据WebSocket会话查找配送员ID
*/
private Long findDeliveryPersonIdBySession(WebSocketSession session) {
for (Map.Entry<Long, WebSocketSession> entry : sessions.entrySet()) {
if (entry.getValue().equals(session)) {
return entry.getKey();
}
}
return null;
}
/**
* 定时检查并清理过期的连接和位置信息每30秒执行一次
*/
@Scheduled(fixedRate = 30000)
public void cleanupExpiredLocations() {
public void cleanupExpiredConnections() {
long currentTime = System.currentTimeMillis();
for (Map.Entry<Long, Long> entry : lastUpdateTimes.entrySet()) {
Long deliveryPersonId = entry.getKey();
Long lastUpdateTime = entry.getValue();
// 如果超过过期时间,则移除该配送员的信息
if (currentTime - lastUpdateTime > LOCATION_EXPIRE_TIME) {
sessions.remove(deliveryPersonId);
// 如果超过WebSocket超时时间,则移除该配送员的连接
if (currentTime - lastUpdateTime > WEBSOCKET_TIMEOUT) {
WebSocketSession session = sessions.remove(deliveryPersonId);
deliveryPersonStatus.remove(deliveryPersonId);
lastUpdateTimes.remove(deliveryPersonId);
System.out.println("已清理过期位置信息配送员ID: " + deliveryPersonId);
// 关闭WebSocket连接
if (session != null && session.isOpen()) {
try {
session.close();
} catch (IOException e) {
System.err.println("关闭过期WebSocket连接时出错: " + e.getMessage());
}
}
System.out.println("已清理过期WebSocket连接配送员ID: " + deliveryPersonId);
}
// 如果超过位置过期时间,则只清理位置信息,保留连接
else if (currentTime - lastUpdateTime > LOCATION_EXPIRE_TIME) {
// 标记为未签到状态,不再广播其位置
deliveryPersonStatus.put(deliveryPersonId, false);
System.out.println("配送员 " + deliveryPersonId + " 位置信息已过期");
}
}
}
@@ -148,7 +252,7 @@ public class LocationWebSocketHandler extends TextWebSocketHandler {
* 内部消息类用于解析WebSocket消息
*/
public static class LocationMessage {
private String type; // 消息类型: updateLocation, subscribe, unsubscribe
private String type; // 消息类型: updateLocation, subscribe, unsubscribe, signin, signout
private Long deliveryPersonId; // 配送员ID
private Double latitude; // 纬度
private Double longitude; // 经度

View File

@@ -47,4 +47,11 @@ public interface UserService {
* @return 注册结果
*/
User registerAsDeliveryPerson(Long userId, String name, String phone);
/**
* 检查用户是否已签到
* @param userId 用户ID
* @return true表示已签到false表示未签到
*/
boolean isUserSignedIn(Long userId);
}

View File

@@ -10,6 +10,7 @@ import com.light.delivery.model.UserRole;
import com.light.delivery.model.WxLoginRequest;
import com.light.delivery.repository.EmployeeRepository;
import com.light.delivery.repository.UserRepository;
import com.light.delivery.service.LocationWebSocketHandler;
import com.light.delivery.service.UserService;
import com.light.delivery.util.JwtUtil;
import org.springframework.beans.factory.annotation.Autowired;
@@ -47,6 +48,12 @@ public class UserServiceImpl implements UserService {
@Autowired
private JwtUtil jwtUtil;
/**
* WebSocket处理器用于管理WebSocket连接
*/
@Autowired
private LocationWebSocketHandler locationWebSocketHandler;
/**
* 微信小程序appId
*/
@@ -98,13 +105,38 @@ public class UserServiceImpl implements UserService {
/**
* 用户登出逻辑。
* 由于使用无状态JWT认证服务器端无需特殊处理
* 客户端应负责删除本地存储的token。
* @param token 用户的JWT令牌当前实现中未使用
* 清理用户的WebSocket连接并更新相关状态
* @param token 用户的JWT令牌
*/
@Override
public void logout(String token) {
// 无状态JWT登出可为空实现或清理本地缓存等
if (token == null || token.isEmpty()) {
return;
}
try {
// 解析token获取用户openid
String openid = jwtUtil.extractUsername(token);
if (openid == null || openid.isEmpty()) {
return;
}
User user = userRepository.findByOpenid(openid);
if (user == null) {
return;
}
// 如果用户是配送员通知WebSocket处理器清理连接
if (UserRole.DELIVERY_PERSON.equals(user.getRole())) {
// 注意这里需要根据实际业务逻辑获取配送员ID
// 可能需要通过其他方式关联用户ID和配送员ID
// 这里假设用户ID和配送员ID相同根据项目实际情况调整
// locationWebSocketHandler.removeUserConnection(user.getId());
}
} catch (Exception e) {
// 记录日志但不中断登出流程
System.err.println("清理WebSocket连接时出错: " + e.getMessage());
}
}
/**
@@ -291,4 +323,27 @@ public class UserServiceImpl implements UserService {
dto.setOpenid(user.getOpenid());
return dto;
}
/**
* 检查用户是否已签到
* @param userId 用户ID
* @return true表示已签到false表示未签到
*/
@Override
public boolean isUserSignedIn(Long userId) {
// 只有配送员角色才有签到状态
Optional<User> userOptional = userRepository.findById(userId);
if (!userOptional.isPresent()) {
return false;
}
User user = userOptional.get();
if (user.getRole() != UserRole.DELIVERY_PERSON) {
// 非配送员角色没有签到状态概念
return false;
}
// 检查WebSocket中的签到状态
return locationWebSocketHandler.isDeliveryPersonSignedIn(userId);
}
}

View File

@@ -40,3 +40,6 @@ logging.level.com.light.delivery=DEBUG
management.endpoints.web.exposure.include=health,info
management.endpoint.health.show-details=always
#$env:SPRING_PROFILES_ACTIVE = "local"
#mvn spring-boot:run