From ba7c4f2a343b797efb09665b5ee82bacab821509 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E9=B9=8F=E9=A3=9E?= Date: Wed, 5 Mar 2025 20:18:06 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E7=A7=BB=E9=99=A4=E4=B8=8D=E9=9C=80?= =?UTF-8?q?=E8=A6=81=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../admin/controller/SSEController.java | 34 -------- .../service/impl/SSEINotifyPushService.java | 43 ---------- .../cfs/controller/AnalysisController.java | 29 ------- .../cfs/controller/SSEController.java | 36 --------- .../cfs/controller/TestController.java | 13 --- .../starter/service/SSEManagerBase.java | 79 ------------------- .../starter/service/SSEManagerService.java | 20 ----- .../service/impl/APPSSEManagerService.java | 70 ---------------- .../service/impl/AdminSSEManagerService.java | 70 ---------------- 9 files changed, 394 deletions(-) delete mode 100644 nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/SSEController.java delete mode 100644 nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/service/impl/SSEINotifyPushService.java delete mode 100644 nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/AnalysisController.java delete mode 100644 nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/SSEController.java delete mode 100644 nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/SSEManagerBase.java delete mode 100644 nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/SSEManagerService.java delete mode 100644 nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/impl/APPSSEManagerService.java delete mode 100644 nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/impl/AdminSSEManagerService.java diff --git a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/SSEController.java b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/SSEController.java deleted file mode 100644 index 10e5e210..00000000 --- a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/SSEController.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.nflg.mobilebroken.admin.controller; - -import com.nflg.mobilebroken.admin.annotation.ApiMark; -import com.nflg.mobilebroken.starter.service.impl.APPSSEManagerService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import javax.annotation.Resource; - -@RestController -@Slf4j -@RequestMapping("/sse") -public class SSEController extends ControllerBase { - - - @Resource - private APPSSEManagerService sseManagerService; - - /** - * 建立sse连接 - * - * @param userId 用户id - */ - @GetMapping(value = "connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - @ApiMark(moduleName = "连接", apiName = "SSE", isPublic = true) - public SseEmitter connect(@RequestParam String userId) { - return sseManagerService.connect(Integer.valueOf(userId)); - } -} \ No newline at end of file diff --git a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/service/impl/SSEINotifyPushService.java b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/service/impl/SSEINotifyPushService.java deleted file mode 100644 index e8aea669..00000000 --- a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/service/impl/SSEINotifyPushService.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.nflg.mobilebroken.admin.service.impl; - -import com.nflg.mobilebroken.common.constant.Constant; -import com.nflg.mobilebroken.common.pojo.dto.UserDTO; -import com.nflg.mobilebroken.starter.service.INotifyPushService; -import com.nflg.mobilebroken.starter.service.SSEManagerService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; -import java.util.Objects; - -@Service -@Slf4j -public class SSEINotifyPushService implements INotifyPushService { - - @Resource - @Qualifier("APPSSEManagerService") - private SSEManagerService sseManagerService; - - @Resource - private RedisTemplate redisTemplate; - - @Override - public void push(UserDTO user, String subject, String content) { -// try { -// SSEMessageDTO message = new SSEMessageDTO() -// .setType(2) -// .setData(new NotifyDTO().setSubject(subject).setContent(content)); -// sseManagerService.send(user.getId(), message); -// } catch (IOException e) { -// log.error("发送SSE失败", e); -// } - } - - @Override - public boolean check(UserDTO user) { - Object value = redisTemplate.opsForHash().get("message:config:" + user.getId(), Constant.REDIS_KEY_MESSAGECONFIG_APP); - return Objects.isNull(value) || (boolean) value; - } -} diff --git a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/AnalysisController.java b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/AnalysisController.java deleted file mode 100644 index 706fedd4..00000000 --- a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/AnalysisController.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.nflg.mobilebroken.cfs.controller; - -import com.nflg.mobilebroken.starter.service.impl.AdminSSEManagerService; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import javax.annotation.Resource; -import java.util.Collection; - -/** - * 统计分析相关 - */ -@RestController -@RequestMapping("/analysis") -public class AnalysisController extends ControllerBase{ - - @Resource - private AdminSSEManagerService adminSSEManagerService; - - /** - * 获取当前已连接SSE的管理端用户列表 - * @return 当前已连接SSE的管理端用户列表 - */ - @GetMapping("getSSEConnects") - public Collection getSSEConnects(){ - return adminSSEManagerService.getUserIds(); - } -} \ No newline at end of file diff --git a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/SSEController.java b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/SSEController.java deleted file mode 100644 index 5c2ec40e..00000000 --- a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/SSEController.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.nflg.mobilebroken.cfs.controller; - -import com.nflg.mobilebroken.starter.service.impl.AdminSSEManagerService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import javax.annotation.Resource; - -/** - * sse相关接口 - * 曹鹏飞 - */ -@RestController -@Slf4j -@RequestMapping("/sse") -//@SaUserCheckLogin -public class SSEController extends ControllerBase { - - @Resource - private AdminSSEManagerService adminSSEManagerService; - - /** - * 建立sse连接 - * - * @param userId 用户id - */ - @GetMapping(value = "connect",produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public SseEmitter connect(@RequestParam Integer userId) { - return adminSSEManagerService.connect(userId); - } -} diff --git a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/TestController.java b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/TestController.java index c7c97e80..5ffd75ad 100644 --- a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/TestController.java +++ b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/TestController.java @@ -5,7 +5,6 @@ import com.nflg.mobilebroken.common.pojo.ApiResult; import com.nflg.mobilebroken.common.util.MultilingualUtil; import com.nflg.mobilebroken.repository.entity.Ticket; import com.nflg.mobilebroken.repository.service.ITicketService; -import com.nflg.mobilebroken.starter.service.impl.APPSSEManagerService; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -18,24 +17,12 @@ import javax.annotation.Resource; @RequestMapping("/test") public class TestController extends ControllerBase { - @Resource - private APPSSEManagerService sseManagerService; - @Resource private TicketEventPublisher ticketEventPublisher; @Resource private ITicketService ticketService; -// @GetMapping("sse/send") -// public ApiResult sendSse(@RequestParam String userId, @RequestParam String message) throws IOException { -// SSEMessageDTO messageDTO = new SSEMessageDTO() -// .setType(2) -// .setData(new NotifyDTO().setSubject("消息测试").setContent("消息内容")); -// sseManagerService.send(Integer.valueOf(userId), messageDTO); -// return ApiResult.success(); -// } - @GetMapping("sss") public ApiResult sss(){ Ticket ticket =ticketService.getById(8); diff --git a/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/SSEManagerBase.java b/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/SSEManagerBase.java deleted file mode 100644 index 6c96401b..00000000 --- a/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/SSEManagerBase.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.nflg.mobilebroken.starter.service; - -import com.nflg.mobilebroken.common.constant.STATE; -import com.nflg.mobilebroken.common.util.VUtils; -import lombok.extern.slf4j.Slf4j; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import java.io.IOException; -import java.util.Map; -import java.util.Objects; - -@Slf4j -public class SSEManagerBase { - - private static boolean IS_SHUTDOWN = false; - - protected void check(){ - VUtils.trueThrow(IS_SHUTDOWN).throwMessage(STATE.ServiceConnectRefused,"SSE服务已关闭"); - } - - protected static void shutdown(Map emitters) { - IS_SHUTDOWN=true; - log.warn("准备关闭SSE服务"); - emitters.forEach((k,v)->{ - try { - v.send("因SSE服务关闭,连接即将断开"); - }catch (Exception ex){ - log.error("SSE发送消息失败:"+k,ex); - } - v.complete(); - }); - log.warn("SSE服务已关闭"); - } - - protected static void close(SseEmitter emitter){ - emitter.complete(); - } - - protected SseEmitter connect(Integer userId, Map emitters) { - SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); - SseEmitter old=emitters.put(userId, emitter); - if (Objects.nonNull(old)){ - log.warn("停止旧连接:"+userId); - try { - old.send(SseEmitter.event().name("被踢下线").data("你已在其他地方连接")); - old.complete(); - } catch (Exception e) { - old.completeWithError(e); - } - } - emitter.onError((ex) -> { - emitters.remove(userId); - emitter.complete(); - log.error("SSE异常:"+userId, ex); - }); - emitter.onTimeout(() -> { - emitters.remove(userId); - emitter.complete(); - log.error("SSE超时:"+userId); - }); - emitter.onCompletion(() -> { - emitters.remove(userId); - emitter.complete(); - log.error("SSE完成:"+userId); - }); - try { - emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000)); - } catch (IOException e) { - log.error("sse发送数据出错", e); - } - return emitter; - } - - protected void send(Object message, SseEmitter emitter) throws IOException { - VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("没有找到sse"); - log.error("没有找到sse: "); - emitter.send(message); - } -} diff --git a/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/SSEManagerService.java b/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/SSEManagerService.java deleted file mode 100644 index 837d54a2..00000000 --- a/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/SSEManagerService.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.nflg.mobilebroken.starter.service; - -import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import java.io.IOException; -import java.util.Collection; - -public interface SSEManagerService { - - SseEmitter connect(Integer userId); - - void send(Integer userId, SSEMessageDTO message) throws IOException; - - void close(Integer userId); - - void shutdown(); - - Collection getUserIds(); -} diff --git a/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/impl/APPSSEManagerService.java b/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/impl/APPSSEManagerService.java deleted file mode 100644 index c6669277..00000000 --- a/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/impl/APPSSEManagerService.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.nflg.mobilebroken.starter.service.impl; - -import cn.hutool.core.util.StrUtil; -import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; -import com.nflg.mobilebroken.starter.service.SSEManagerBase; -import com.nflg.mobilebroken.starter.service.SSEManagerService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import javax.annotation.PreDestroy; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -@Service -@Slf4j -public class APPSSEManagerService extends SSEManagerBase implements SSEManagerService { - - public static final Map EMITTERS = new ConcurrentHashMap<>(); - - @Override - public SseEmitter connect(Integer userId) { - check(); - log.info("APP端SSE已连接:"+userId); - return connect(userId,EMITTERS); - } - - @Override - public void send(Integer userId, SSEMessageDTO message) throws IOException { - log.info(StrUtil.format("APP端SSE发送消息,用户id: {},内容: {}", userId, message)); - SseEmitter emitter = EMITTERS.get(userId); - if (Objects.isNull(emitter)) { - log.error("用户未连接SSE: " + userId); - } else { - send(message, emitter); - } - } - - @Override - public void close(Integer userId) { - log.info("APP端SSE连接主动关闭:"+userId); - close(EMITTERS.remove(userId)); - } - - @Override - public void shutdown() { - shutdown(EMITTERS); - } - - @Override - public Collection getUserIds() { - return EMITTERS.keySet(); - } - - @PreDestroy - public void cleanup() { - log.info("释放SSE连接"); - for (SseEmitter emitter : EMITTERS.values()) { - try { - emitter.complete(); - } catch (Exception e) { - emitter.completeWithError(e); - } - } - EMITTERS.clear(); - } -} diff --git a/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/impl/AdminSSEManagerService.java b/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/impl/AdminSSEManagerService.java deleted file mode 100644 index 0fe7930d..00000000 --- a/nflg-mobilebroken-starter/src/main/java/com/nflg/mobilebroken/starter/service/impl/AdminSSEManagerService.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.nflg.mobilebroken.starter.service.impl; - -import cn.hutool.core.util.StrUtil; -import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; -import com.nflg.mobilebroken.starter.service.SSEManagerBase; -import com.nflg.mobilebroken.starter.service.SSEManagerService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; - -import javax.annotation.PreDestroy; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -@Service -@Slf4j -public class AdminSSEManagerService extends SSEManagerBase implements SSEManagerService { - - protected static final Map EMITTERS = new ConcurrentHashMap<>(); - - @Override - public SseEmitter connect(Integer userId) { - check(); - log.info("管理端SSE已连接:"+userId); - return connect(userId,EMITTERS); - } - - @Override - public void send(Integer userId, SSEMessageDTO message) throws IOException { - log.info(StrUtil.format("管理端SSE发送消息,用户id: {},内容: {}", userId, message)); - SseEmitter emitter = EMITTERS.get(userId); - if (Objects.isNull(emitter)) { - log.error("用户未连接SSE: " + userId); - } else { - send(message, emitter); - } - } - - @Override - public void close(Integer userId) { - close(EMITTERS.remove(userId)); - log.info("管理端SSE连接主动关闭:"+userId); - } - - @Override - public void shutdown() { - shutdown(EMITTERS); - } - - @Override - public Collection getUserIds() { - return EMITTERS.keySet(); - } - - @PreDestroy - public void cleanup() { - log.info("释放SSE连接"); - for (SseEmitter emitter : EMITTERS.values()) { - try { - emitter.complete(); - } catch (Exception e) { - emitter.completeWithError(e); - } - } - EMITTERS.clear(); - } -}