From 02667a58be797d66294d24b9030836ba7fc112c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9B=B9=E9=B9=8F=E9=A3=9E?= Date: Tue, 25 Feb 2025 20:54:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20sse=E7=8B=AC=E7=AB=8B=E5=87=BA=E6=9D=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nflg-mobilebroken-admin/pom.xml | 4 +- .../mobilebroken/admin/AdminApplication.java | 2 + .../admin/controller/TicketController.java | 146 +++++++----------- .../admin/service/SsePushService.java | 82 ++++++++++ .../nflg/mobilebroken/cfs/CfsApplication.java | 2 + .../cfs/controller/TiketController.java | 53 +------ .../cfs/service/SsePushService.java | 71 +++++++++ .../common/pojo/dto/SSEMessageDTO.java | 4 + .../common/pojo/request/PushRequest.java | 22 +++ nflg-mobilebroken-push/pom.xml | 50 ++++++ .../mobilebroken/push/PushApplication.java | 20 +++ .../advice/GlobalRestControllerAdvice.java | 67 ++++++++ .../mobilebroken/push/config/CorsConfig.java | 25 +++ .../config/SaTokenAnnotationConfigure.java | 23 +++ .../push/controller/SSEController.java | 76 +++++++++ .../push/service/SSEManagerBase.java | 73 +++++++++ .../push/service/SSEManagerService.java | 20 +++ .../service/impl/APPSSEManagerService.java | 68 ++++++++ .../service/impl/AdminSSEManagerService.java | 68 ++++++++ .../push/task/SSEScheduledTasks.java | 41 +++++ .../main/resources/application-dev.properties | 1 + .../src/main/resources/application.properties | 14 ++ .../src/main/resources/logback-sit.xml | 71 +++++++++ .../mapper/TBaseDepartmentMapper.xml | 3 + pom.xml | 1 + 25 files changed, 865 insertions(+), 142 deletions(-) create mode 100644 nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/service/SsePushService.java create mode 100644 nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/service/SsePushService.java create mode 100644 nflg-mobilebroken-common/src/main/java/com/nflg/mobilebroken/common/pojo/request/PushRequest.java create mode 100644 nflg-mobilebroken-push/pom.xml create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/PushApplication.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/advice/GlobalRestControllerAdvice.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/config/CorsConfig.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/config/SaTokenAnnotationConfigure.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/SSEController.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerService.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/APPSSEManagerService.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/AdminSSEManagerService.java create mode 100644 nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/task/SSEScheduledTasks.java create mode 100644 nflg-mobilebroken-push/src/main/resources/application-dev.properties create mode 100644 nflg-mobilebroken-push/src/main/resources/application.properties create mode 100644 nflg-mobilebroken-push/src/main/resources/logback-sit.xml diff --git a/nflg-mobilebroken-admin/pom.xml b/nflg-mobilebroken-admin/pom.xml index 2a39765d..58685ac6 100644 --- a/nflg-mobilebroken-admin/pom.xml +++ b/nflg-mobilebroken-admin/pom.xml @@ -119,8 +119,8 @@ org.apache.maven.plugins maven-compiler-plugin - 10 - 10 + 11 + 11 diff --git a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/AdminApplication.java b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/AdminApplication.java index cd15c5c0..7b45e487 100644 --- a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/AdminApplication.java +++ b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/AdminApplication.java @@ -8,12 +8,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @MapperScan("com.nflg.mobilebroken.repository.mapper") @ComponentScan(basePackages = {"com.nflg.mobilebroken.repository.service", "com.nflg.mobilebroken.admin" , "com.nflg.mobilebroken.starter"}) @EnableDiscoveryClient +@EnableScheduling @Slf4j @EnableAsync public class AdminApplication { diff --git a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/TicketController.java b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/TicketController.java index d34ad7df..b1d1f529 100644 --- a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/TicketController.java +++ b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/TicketController.java @@ -1,28 +1,25 @@ package com.nflg.mobilebroken.admin.controller; import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.date.DatePattern; import cn.hutool.core.util.StrUtil; import com.itextpdf.html2pdf.ConverterProperties; import com.itextpdf.html2pdf.HtmlConverter; import com.itextpdf.layout.font.FontProvider; import com.nflg.mobilebroken.admin.annotation.ApiMark; import com.nflg.mobilebroken.admin.publisher.TicketEventPublisher; +import com.nflg.mobilebroken.admin.service.SsePushService; import com.nflg.mobilebroken.common.constant.STATE; import com.nflg.mobilebroken.common.constant.TicketState; import com.nflg.mobilebroken.common.exception.NflgException; import com.nflg.mobilebroken.common.pojo.ApiResult; import com.nflg.mobilebroken.common.pojo.PageData; import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO; -import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; -import com.nflg.mobilebroken.common.pojo.dto.TicketChatDTO; import com.nflg.mobilebroken.common.pojo.request.*; import com.nflg.mobilebroken.common.pojo.vo.*; import com.nflg.mobilebroken.common.util.*; import com.nflg.mobilebroken.repository.entity.*; import com.nflg.mobilebroken.repository.service.*; import com.nflg.mobilebroken.starter.annotation.MethodInfoMark; -import com.nflg.mobilebroken.starter.service.impl.APPSSEManagerService; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.http.HttpHeaders; @@ -42,8 +39,6 @@ import java.io.IOException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @@ -77,7 +72,7 @@ public class TicketController extends ControllerBase { private TicketChatService ticketChatService; @Resource - private APPSSEManagerService sseManagerService; + private SsePushService ssePushService; @Resource private IAppAreaService appAreaService; @@ -204,25 +199,18 @@ public class TicketController extends ControllerBase { @ApiMark(moduleName = "工单管理", apiName = "分派工单") public ApiResult assignmentTicket(@Valid @RequestBody AssignmentTicketRequest request) { Ticket ticket=ticketService.assignmentTicket(request); - TicketChatDTO message=new TicketChatDTO() - .setTicketId(ticket.getId()) - .setMessages(Collections.singletonList(new ChatMessageDTO() - .setFrom("system") - .setTicketState(ticket.getState()) - .setSenderId(0) - .setSenderName("服务助手") - .setCreateTime(Instant.now()) - .setContent("已为您分派工程师
如需补充信息,请继续留言。"))); - ticketChatService.add(message); - try { - SSEMessageDTO messageDTO = new SSEMessageDTO() - .setType(1) - .setData(message); - sseManagerService.send(ticket.getUserId(), messageDTO); - } catch (IOException e) { - log.error("发送SSE消息出错", e); - } ticketEventPublisher.publishTicketAssignedEvent(ticket,request.getUserIds()); + ChatMessageDTO message = new ChatMessageDTO() + .setId(cn.hutool.core.util.IdUtil.getSnowflakeNextIdStr()) + .setFrom("system") + .setTicketState(ticket.getState()) + .setSenderId(0) + .setSenderName("服务助手") + .setContent("已为您分派工程师
如需补充信息,请继续留言。") + .setCreateTime(Instant.now()); + ticketChatService.addMessage(ticket.getId(), message); + //推送消息 + ssePushService.sendTicketMessageToApp(ticket.getUserId(),message); return ApiResult.success(); } @@ -261,23 +249,22 @@ public class TicketController extends ControllerBase { for (Integer id : ids){ Ticket ticket=ticketService.completeTicket(id); ticketEventPublisher.publishTicketCompleteEvent(ticket); - TicketChatDTO message=new TicketChatDTO() - .setTicketId(ticket.getId()) - .setMessages(Collections.singletonList(new ChatMessageDTO() - .setFrom("system") - .setTicketState(ticket.getState()) - .setSenderId(0) - .setSenderName("服务助手") - .setCreateTime(Instant.now()) - .setContent("工单已完成
工单已完成,如需补充信息,请重新打开工单,继续留言。"))); - ticketChatService.add(message); - try { - SSEMessageDTO messageDTO = new SSEMessageDTO() - .setType(1) - .setData(message); - sseManagerService.send(ticket.getUserId(), messageDTO); - } catch (IOException e) { - log.error("发送SSE消息出错", e); + ChatMessageDTO message = new ChatMessageDTO() + .setId(cn.hutool.core.util.IdUtil.getSnowflakeNextIdStr()) + .setFrom("system") + .setTicketState(ticket.getState()) + .setSenderId(0) + .setSenderName("服务助手") + .setContent("工单已完成
工单已完成,如需补充信息,请重新打开工单,继续留言。") + .setCreateTime(Instant.now()); + ticketChatService.addMessage(id, message); + //推送消息 + ssePushService.sendTicketMessageToApp(ticket.getUserId(),message); + List handles = Arrays.stream(ticket.getHandle().split(",")) + .map(Integer::parseInt).collect(Collectors.toList()); + handles.remove(AdminUserUtil.getUserId()); + if (CollectionUtil.isNotEmpty(handles)){ + handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message)); } } return ApiResult.success(); @@ -293,23 +280,22 @@ public class TicketController extends ControllerBase { public ApiResult closeTicket(@Valid @RequestBody TicketCloseRequest request) { Ticket ticket=ticketService.closeTicket(request); ticketEventPublisher.publishTicketCloseEvent(ticket); - TicketChatDTO message=new TicketChatDTO() - .setTicketId(ticket.getId()) - .setMessages(Collections.singletonList(new ChatMessageDTO() - .setFrom("system") - .setTicketState(ticket.getState()) - .setSenderId(0) - .setSenderName("服务助手") - .setCreateTime(Instant.now()) - .setContent("工单已关闭
感谢你的使用,如有问题,请重新提交新的工单。"))); - ticketChatService.add(message); - try { - SSEMessageDTO messageDTO = new SSEMessageDTO() - .setType(1) - .setData(message); - sseManagerService.send(ticket.getUserId(), messageDTO); - } catch (IOException e) { - log.error("发送SSE消息出错", e); + ChatMessageDTO message = new ChatMessageDTO() + .setId(cn.hutool.core.util.IdUtil.getSnowflakeNextIdStr()) + .setFrom("system") + .setTicketState(ticket.getState()) + .setSenderId(0) + .setSenderName("服务助手") + .setContent("工单已关闭
感谢你的使用,如有问题,请重新提交新的工单。") + .setCreateTime(Instant.now()); + ticketChatService.addMessage(request.getTicketId(), message); + //推送消息 + ssePushService.sendTicketMessageToApp(ticket.getUserId(),message); + List handles = Arrays.stream(ticket.getHandle().split(",")) + .map(Integer::parseInt).collect(Collectors.toList()); + handles.remove(AdminUserUtil.getUserId()); + if (CollectionUtil.isNotEmpty(handles)){ + handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message)); } return ApiResult.success(); } @@ -545,8 +531,10 @@ public class TicketController extends ControllerBase { VUtils.trueThrowBusinessError(Objects.isNull(ticket)).throwMessage("工单不存在"); VUtils.trueThrowBusinessError(!Objects.equals(ticket.getState(), TicketState.Processing.getState())) .throwMessage("当前工单状态不允许发送消息"); - VUtils.trueThrowBusinessError(Arrays.stream(ticket.getHandle().split(",")) - .noneMatch(uid -> StrUtil.equals(uid, AdminUserUtil.getUserId().toString()))) + List handles=Arrays.stream(ticket.getHandle().split(",")) + .map(Integer::parseInt).collect(Collectors.toList()); + VUtils.trueThrowBusinessError(handles.stream() + .noneMatch(uid -> Objects.equals(uid, AdminUserUtil.getUserId()))) .throwMessage("只有工单处理人能发送消息"); ticket.setCurrentHandle(AdminUserUtil.getUserId()); ticketService.updateById(ticket); @@ -568,38 +556,10 @@ public class TicketController extends ControllerBase { } ticketChatService.addMessage(request.getTicketId(), message); //推送消息 - try { - String zone = MultilingualUtil.getZone(); - ZoneId zoneId = ZoneId.of(zone); - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId); - ChatMessageVO messageVO = new ChatMessageVO() - .setId(message.getId()) - .setFrom(message.getFrom()) - .setSenderId(message.getSenderId()) - .setSenderName(message.getSenderName()) - .setSenderAvatar(message.getSenderAvatar()) - .setContent(message.getContent()) - .setTicketState(message.getTicketState()) - .setCreateTime(formatter.format(message.getCreateTime())) - .setImages(message.getImages()) - .setAttachments(message.getAttachments()) - .setQuote(Objects.isNull(message.getQuote()) ? null : new ChatMessageVO() - .setId(message.getQuote().getId()) - .setFrom(message.getQuote().getFrom()) - .setSenderId(message.getQuote().getSenderId()) - .setSenderName(message.getQuote().getSenderName()) - .setSenderAvatar(message.getQuote().getSenderAvatar()) - .setTicketState(message.getQuote().getTicketState()) - .setContent(message.getQuote().getContent()) - .setAttachments(message.getQuote().getAttachments()) - .setImages(message.getQuote().getImages()) - .setCreateTime(formatter.format(message.getQuote().getCreateTime()))); - SSEMessageDTO messageDTO = new SSEMessageDTO() - .setType(1) - .setData(messageVO); - sseManagerService.send(ticket.getUserId(), messageDTO); - } catch (IOException e) { - log.error("发送SSE消息出错", e); + ssePushService.sendTicketMessageToApp(ticket.getUserId(),message); + handles.remove(AdminUserUtil.getUserId()); + if (CollectionUtil.isNotEmpty(handles)){ + handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message)); } ticketEventPublisher.publishTicketReplyEvent(ticket); return ApiResult.success(); diff --git a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/service/SsePushService.java b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/service/SsePushService.java new file mode 100644 index 00000000..d47667db --- /dev/null +++ b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/service/SsePushService.java @@ -0,0 +1,82 @@ +package com.nflg.mobilebroken.admin.service; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.json.JSONUtil; +import com.nflg.mobilebroken.common.pojo.ApiResult; +import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO; +import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; +import com.nflg.mobilebroken.common.pojo.request.PushRequest; +import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO; +import com.nflg.mobilebroken.common.util.MultilingualUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Objects; + +@Slf4j +@Component +@RefreshScope +public class SsePushService { + + @Value("${sse.url}") + private String sseUrl; + + public void sendTicketMessageToAdmin(Integer userId, ChatMessageDTO message){ + try { + PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message)); + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity response = restTemplate.postForEntity(sseUrl+"/sse/admin/push",request, ApiResult.class); + log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody())); + } catch (Exception e) { + log.error("发送消息出错", e); + } + } + + public void sendTicketMessageToApp(Integer userId, ChatMessageDTO message){ + try { + PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message)); + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity response = restTemplate.postForEntity(sseUrl+"/sse/app/push",request, ApiResult.class); + log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody())); + } catch (Exception e) { + log.error("发送消息出错", e); + } + } + + private SSEMessageDTO buildMessage(ChatMessageDTO message){ + String zone = MultilingualUtil.getZone(); + ZoneId zoneId = ZoneId.of(zone); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId); + ChatMessageVO messageVO = new ChatMessageVO() + .setId(message.getId()) + .setFrom(message.getFrom()) + .setSenderId(message.getSenderId()) + .setSenderName(message.getSenderName()) + .setSenderAvatar(message.getSenderAvatar()) + .setContent(message.getContent()) + .setTicketState(message.getTicketState()) + .setCreateTime(formatter.format(message.getCreateTime())) + .setImages(message.getImages()) + .setAttachments(message.getAttachments()) + .setQuote(Objects.isNull(message.getQuote()) ? null : new ChatMessageVO() + .setId(message.getQuote().getId()) + .setFrom(message.getQuote().getFrom()) + .setSenderId(message.getQuote().getSenderId()) + .setSenderName(message.getQuote().getSenderName()) + .setSenderAvatar(message.getQuote().getSenderAvatar()) + .setTicketState(message.getQuote().getTicketState()) + .setContent(message.getQuote().getContent()) + .setAttachments(message.getQuote().getAttachments()) + .setImages(message.getQuote().getImages()) + .setCreateTime(formatter.format(message.getQuote().getCreateTime()))); + return new SSEMessageDTO() + .setType(1) + .setData(messageVO); + } +} diff --git a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/CfsApplication.java b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/CfsApplication.java index 5b70c53a..3a3acf91 100644 --- a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/CfsApplication.java +++ b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/CfsApplication.java @@ -6,12 +6,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @MapperScan("com.nflg.mobilebroken.repository.mapper") @ComponentScan(basePackages = {"com.nflg.mobilebroken.repository.service", "com.nflg.mobilebroken.cfs" ,"com.nflg.mobilebroken.starter"}) @EnableDiscoveryClient +@EnableScheduling @EnableAsync public class CfsApplication { diff --git a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/TiketController.java b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/TiketController.java index c3fd8e14..e4290aac 100644 --- a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/TiketController.java +++ b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/controller/TiketController.java @@ -1,17 +1,16 @@ package com.nflg.mobilebroken.cfs.controller; import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.date.DatePattern; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.metadata.IPage; import com.nflg.mobilebroken.cfs.publisher.TicketEventPublisher; +import com.nflg.mobilebroken.cfs.service.SsePushService; import com.nflg.mobilebroken.common.constant.Constant; import com.nflg.mobilebroken.common.constant.TicketState; import com.nflg.mobilebroken.common.pojo.ApiResult; import com.nflg.mobilebroken.common.pojo.PageData; import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO; -import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; import com.nflg.mobilebroken.common.pojo.dto.TicketChatDTO; import com.nflg.mobilebroken.common.pojo.request.*; import com.nflg.mobilebroken.common.pojo.vo.*; @@ -21,7 +20,6 @@ import com.nflg.mobilebroken.common.util.PageUtil; import com.nflg.mobilebroken.common.util.VUtils; import com.nflg.mobilebroken.repository.entity.*; import com.nflg.mobilebroken.repository.service.*; -import com.nflg.mobilebroken.starter.service.impl.AdminSSEManagerService; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.*; @@ -30,10 +28,7 @@ import javax.annotation.Resource; import javax.validation.Valid; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; -import java.io.IOException; import java.time.Instant; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; @@ -65,9 +60,6 @@ public class TiketController extends ControllerBase { @Resource private ITicketEvaluateService ticketEvaluateService; - @Resource - private IAdminMessageService adminMessageService; - @Resource private IDictionaryItemTranslateService dictionaryItemTranslateService; @@ -84,7 +76,7 @@ public class TiketController extends ControllerBase { private ITBaseCustomerService customerService; @Resource - private AdminSSEManagerService sseManagerService; + private SsePushService ssePushService; @Resource private TicketEventPublisher ticketEventPublisher; @@ -339,43 +331,10 @@ public class TiketController extends ControllerBase { } ticketChatService.addMessage(request.getTicketId(), message); //推送消息 - String handle = ticket.getHandle(); - if (StrUtil.isNotBlank(handle)) { - String zone = MultilingualUtil.getZone(); - ZoneId zoneId = ZoneId.of(zone); - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId); - ChatMessageVO messageVO = new ChatMessageVO() - .setId(message.getId()) - .setFrom(message.getFrom()) - .setSenderId(message.getSenderId()) - .setSenderName(message.getSenderName()) - .setSenderAvatar(message.getSenderAvatar()) - .setContent(message.getContent()) - .setTicketState(message.getTicketState()) - .setCreateTime(formatter.format(message.getCreateTime())) - .setImages(message.getImages()) - .setAttachments(message.getAttachments()) - .setQuote(Objects.isNull(message.getQuote()) ? null : new ChatMessageVO() - .setId(message.getQuote().getId()) - .setFrom(message.getQuote().getFrom()) - .setSenderId(message.getQuote().getSenderId()) - .setSenderName(message.getQuote().getSenderName()) - .setSenderAvatar(message.getQuote().getSenderAvatar()) - .setTicketState(message.getQuote().getTicketState()) - .setContent(message.getQuote().getContent()) - .setAttachments(message.getQuote().getAttachments()) - .setImages(message.getQuote().getImages()) - .setCreateTime(formatter.format(message.getQuote().getCreateTime()))); - SSEMessageDTO messageDTO = new SSEMessageDTO() - .setType(1) - .setData(messageVO); - StrUtil.split(handle, ",").forEach(userId -> { - try { - sseManagerService.send(Integer.valueOf(userId), messageDTO); - } catch (IOException e) { - log.error("发送SSE消息出错", e); - } - }); + List handles=Arrays.stream(ticket.getHandle().split(",")) + .map(Integer::parseInt).collect(Collectors.toList()); + if (CollectionUtil.isNotEmpty(handles)){ + handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message)); } ticketEventPublisher.publishTicketReplyEvent(ticket, MultilingualUtil.getLanguage(), MultilingualUtil.getZone()); return ApiResult.success(); diff --git a/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/service/SsePushService.java b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/service/SsePushService.java new file mode 100644 index 00000000..4a87af94 --- /dev/null +++ b/nflg-mobilebroken-cfs-app/src/main/java/com/nflg/mobilebroken/cfs/service/SsePushService.java @@ -0,0 +1,71 @@ +package com.nflg.mobilebroken.cfs.service; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.json.JSONUtil; +import com.nflg.mobilebroken.common.pojo.ApiResult; +import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO; +import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; +import com.nflg.mobilebroken.common.pojo.request.PushRequest; +import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO; +import com.nflg.mobilebroken.common.util.MultilingualUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Objects; + +@Slf4j +@Component +@RefreshScope +public class SsePushService { + + @Value("${sse.url}") + private String sseUrl; + + public void sendTicketMessageToAdmin(Integer userId, ChatMessageDTO message){ + try { + PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message)); + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity response = restTemplate.postForEntity(sseUrl+"/sse/admin/push",request, ApiResult.class); + log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody())); + } catch (Exception e) { + log.error("发送消息出错", e); + } + } + + private SSEMessageDTO buildMessage(ChatMessageDTO message){ + String zone = MultilingualUtil.getZone(); + ZoneId zoneId = ZoneId.of(zone); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId); + ChatMessageVO messageVO = new ChatMessageVO() + .setId(message.getId()) + .setFrom(message.getFrom()) + .setSenderId(message.getSenderId()) + .setSenderName(message.getSenderName()) + .setSenderAvatar(message.getSenderAvatar()) + .setContent(message.getContent()) + .setTicketState(message.getTicketState()) + .setCreateTime(formatter.format(message.getCreateTime())) + .setImages(message.getImages()) + .setAttachments(message.getAttachments()) + .setQuote(Objects.isNull(message.getQuote()) ? null : new ChatMessageVO() + .setId(message.getQuote().getId()) + .setFrom(message.getQuote().getFrom()) + .setSenderId(message.getQuote().getSenderId()) + .setSenderName(message.getQuote().getSenderName()) + .setSenderAvatar(message.getQuote().getSenderAvatar()) + .setTicketState(message.getQuote().getTicketState()) + .setContent(message.getQuote().getContent()) + .setAttachments(message.getQuote().getAttachments()) + .setImages(message.getQuote().getImages()) + .setCreateTime(formatter.format(message.getQuote().getCreateTime()))); + return new SSEMessageDTO() + .setType(1) + .setData(messageVO); + } +} \ No newline at end of file diff --git a/nflg-mobilebroken-common/src/main/java/com/nflg/mobilebroken/common/pojo/dto/SSEMessageDTO.java b/nflg-mobilebroken-common/src/main/java/com/nflg/mobilebroken/common/pojo/dto/SSEMessageDTO.java index 12931b3e..b405ffc6 100644 --- a/nflg-mobilebroken-common/src/main/java/com/nflg/mobilebroken/common/pojo/dto/SSEMessageDTO.java +++ b/nflg-mobilebroken-common/src/main/java/com/nflg/mobilebroken/common/pojo/dto/SSEMessageDTO.java @@ -3,13 +3,17 @@ package com.nflg.mobilebroken.common.pojo.dto; import lombok.Data; import lombok.experimental.Accessors; +import javax.validation.constraints.NotNull; + @Data @Accessors(chain = true) public class SSEMessageDTO { //类型,1:工单会话消息,2:消息提醒 + @NotNull private int type; //消息内容 + @NotNull private Object data; } diff --git a/nflg-mobilebroken-common/src/main/java/com/nflg/mobilebroken/common/pojo/request/PushRequest.java b/nflg-mobilebroken-common/src/main/java/com/nflg/mobilebroken/common/pojo/request/PushRequest.java new file mode 100644 index 00000000..e27f3a19 --- /dev/null +++ b/nflg-mobilebroken-common/src/main/java/com/nflg/mobilebroken/common/pojo/request/PushRequest.java @@ -0,0 +1,22 @@ +package com.nflg.mobilebroken.common.pojo.request; + +import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; +import lombok.Data; +import lombok.experimental.Accessors; + +import javax.validation.Valid; +import javax.validation.constraints.NotNull; + +@Data +@Accessors(chain = true) +public class PushRequest { + + //用户id + @NotNull + private Integer userId; + + //消息 + @NotNull + @Valid + private SSEMessageDTO message; +} diff --git a/nflg-mobilebroken-push/pom.xml b/nflg-mobilebroken-push/pom.xml new file mode 100644 index 00000000..785fe915 --- /dev/null +++ b/nflg-mobilebroken-push/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + com.nflg + nflg-mobilebroken + 1.0.0-SNAPSHOT + + nflg-mobilebroken-push + 1.0.0-SNAPSHOT + 服务-SSE服务 + SSE消息推送功能接口 + jar + + + org.springframework.boot + spring-boot-starter + + + com.nflg + nflg-mobilebroken-common + + + org.springframework + spring-webmvc + + + cn.dev33 + sa-token-spring-boot-starter + + + cn.dev33 + sa-token-jwt + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + + \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/PushApplication.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/PushApplication.java new file mode 100644 index 00000000..80021eea --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/PushApplication.java @@ -0,0 +1,20 @@ +package com.nflg.mobilebroken.push; + +import cn.dev33.satoken.SaManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; + +@SpringBootApplication +@Slf4j +@EnableScheduling +@EnableAsync +public class PushApplication { + + public static void main(String[] args) { + SpringApplication.run(PushApplication.class, args); + log.info("启动成功,Sa-Token 配置如下:" + SaManager.getConfig()); + } +} diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/advice/GlobalRestControllerAdvice.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/advice/GlobalRestControllerAdvice.java new file mode 100644 index 00000000..9542d632 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/advice/GlobalRestControllerAdvice.java @@ -0,0 +1,67 @@ +package com.nflg.mobilebroken.push.advice; + +import cn.dev33.satoken.exception.NotLoginException; +import cn.hutool.core.util.StrUtil; +import com.nflg.mobilebroken.common.constant.STATE; +import com.nflg.mobilebroken.common.exception.NflgException; +import com.nflg.mobilebroken.common.pojo.ApiResult; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.FieldError; +import org.springframework.web.bind.MethodArgumentNotValidException; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +import javax.validation.ConstraintViolation; +import javax.validation.ConstraintViolationException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@RestControllerAdvice +@Slf4j +public class GlobalRestControllerAdvice { + + @ExceptionHandler(Exception.class) + public ApiResult handleAllExceptions(Exception ex) { + log.error("服务器内部错误: ", ex); + return ApiResult.error(STATE.BusinessError,"服务器内部错误: " + ex.getMessage()); + } + + @ExceptionHandler(NflgException.class) + public ResponseEntity handleNflgException(NflgException ex) { + if (ex.getState() == STATE.LoginError) { + log.error("登录失效: ", ex); + return ResponseEntity.status(HttpStatus.UNAUTHORIZED).body(ex.getMessage()); + } else { + log.error("业务错误: ", ex); + return ResponseEntity.ok().body(ApiResult.error(ex.getState(), ex.getMessage())); + } + } + + @ExceptionHandler(ConstraintViolationException.class) + public ApiResult handleConstraintViolationException(ConstraintViolationException ex) { + log.error("数据校验失败: ", ex); + return ApiResult.error(STATE.ParamErr, "数据校验失败: " + StrUtil.join(",", ex.getConstraintViolations().stream().map(ConstraintViolation::getMessage).collect(Collectors.toList()))); + } + + @ExceptionHandler(MethodArgumentNotValidException.class) + public ApiResult handleMethodArgumentNotValidException(MethodArgumentNotValidException ex) { + log.error("数据校验失败: ", ex); + List errors = new ArrayList<>(); + ex.getBindingResult().getAllErrors().forEach(error -> { + String fieldName = ((FieldError) error).getField(); + String errorMessage = error.getDefaultMessage(); + errors.add(fieldName + ":" + errorMessage); + }); + return ApiResult.error(STATE.ParamErr, "数据校验失败: " + StrUtil.join(",", errors)); + } + + @ExceptionHandler(NotLoginException.class) + @ResponseStatus(HttpStatus.UNAUTHORIZED) // 返回 401 状态码 + public String handleNotLoginException(NotLoginException e) { + return "请重新登录"; + } +} \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/config/CorsConfig.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/config/CorsConfig.java new file mode 100644 index 00000000..d423afb3 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/config/CorsConfig.java @@ -0,0 +1,25 @@ +package com.nflg.mobilebroken.push.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.servlet.config.annotation.CorsRegistry; +import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; + +@Configuration +public class CorsConfig { + + @Bean + public WebMvcConfigurer corsConfigurer() { + return new WebMvcConfigurer() { + @Override + public void addCorsMappings(CorsRegistry registry) { + registry.addMapping("/**") // 允许所有路径 + .allowedOrigins("*") // 允许所有来源 + .allowedMethods("GET", "POST", "PUT", "DELETE") // 允许的HTTP方法 + .allowedHeaders("*") // 允许所有请求头 + .allowCredentials(true) // 允许携带凭证(如cookies) + .maxAge(3600); // 预检请求的缓存时间(秒) + } + }; + } +} diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/config/SaTokenAnnotationConfigure.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/config/SaTokenAnnotationConfigure.java new file mode 100644 index 00000000..9e91fb2b --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/config/SaTokenAnnotationConfigure.java @@ -0,0 +1,23 @@ +package com.nflg.mobilebroken.push.config; + +import cn.dev33.satoken.jwt.StpLogicJwtForStateless; +import cn.dev33.satoken.strategy.SaAnnotationStrategy; +import com.nflg.mobilebroken.common.util.SaTokenAdminUtil; +import com.nflg.mobilebroken.common.util.SaTokenAppUtil; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.AnnotatedElementUtils; + +import javax.annotation.PostConstruct; + +@Configuration +public class SaTokenAnnotationConfigure { + + @PostConstruct + public void rewriteSaStrategy() { + // 重写Sa-Token的注解处理器,增加注解合并功能 + SaAnnotationStrategy.instance.getAnnotation = AnnotatedElementUtils::getMergedAnnotation; + //设置jwt模式 + SaTokenAppUtil.setStpLogic(new StpLogicJwtForStateless(SaTokenAppUtil.TYPE)); + SaTokenAdminUtil.setStpLogic(new StpLogicJwtForStateless(SaTokenAdminUtil.TYPE)); + } +} \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/SSEController.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/SSEController.java new file mode 100644 index 00000000..6a881b63 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/SSEController.java @@ -0,0 +1,76 @@ +package com.nflg.mobilebroken.push.controller; + +import com.nflg.mobilebroken.common.constant.STATE; +import com.nflg.mobilebroken.common.pojo.ApiResult; +import com.nflg.mobilebroken.common.pojo.request.PushRequest; +import com.nflg.mobilebroken.common.util.AdminUserUtil; +import com.nflg.mobilebroken.common.util.AppUserUtil; +import com.nflg.mobilebroken.push.service.impl.APPSSEManagerService; +import com.nflg.mobilebroken.push.service.impl.AdminSSEManagerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import javax.annotation.Resource; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import java.io.IOException; + +@RestController +@Slf4j +@RequestMapping("/sse") +public class SSEController { + + @Resource + private APPSSEManagerService appsseManagerService; + + @Resource + private AdminSSEManagerService adminSSEManagerService; + + /** + * 客户端账号建立sse连接 + */ + @GetMapping(value = "app/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter appConnect() { + return appsseManagerService.connect(AppUserUtil.getUserId()); + } + + /** + * 推送消息到客户端账号 + * @param request 请求参数 + */ + @PostMapping("app/push") + public ApiResult pushtToApp(@Valid @RequestBody @NotNull PushRequest request){ + try { + appsseManagerService.send(request.getUserId(),request.getMessage()); + return ApiResult.success(); + } catch (IOException e) { + log.error("发送SSE消息出错", e); + return ApiResult.error(STATE.BusinessError,e.getMessage()); + } + } + + /** + * 管理端账号建立sse连接 + */ + @GetMapping(value = "admin/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter adminConnect() { + return adminSSEManagerService.connect(AdminUserUtil.getUserId()); + } + + /** + * 推送消息到管理端账号 + * @param request 请求参数 + */ + @PostMapping("admin/push") + public ApiResult pushtToAdmin(@Valid @RequestBody @NotNull PushRequest request){ + try { + adminSSEManagerService.send(request.getUserId(),request.getMessage()); + return ApiResult.success(); + } catch (IOException e) { + log.error("发送SSE消息出错", e); + return ApiResult.error(STATE.BusinessError,e.getMessage()); + } + } +} diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java new file mode 100644 index 00000000..3ac0af89 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java @@ -0,0 +1,73 @@ +package com.nflg.mobilebroken.push.service; + +import com.nflg.mobilebroken.common.constant.STATE; +import com.nflg.mobilebroken.common.util.VUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +@Slf4j +public class SSEManagerBase { + + private static boolean IS_SHUTDOWN = false; + + protected void check(){ + VUtils.trueThrow(IS_SHUTDOWN).throwMessage(STATE.ServiceConnectRefused,"SSE服务已关闭"); + } + + protected static void shutdown(Map emitters) { + IS_SHUTDOWN=true; + log.warn("准备关闭SSE服务"); + emitters.forEach((k,v)->{ + try { + v.send("因SSE服务关闭,连接即将断开"); + }catch (Exception ex){ + log.error("SSE发送消息失败:"+k,ex); + } + v.complete(); + }); + log.warn("SSE服务已关闭"); + } + + protected static void close(SseEmitter emitter){ + emitter.complete(); + } + + protected SseEmitter connect(Integer userId, Map emitters) { + SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); + SseEmitter old=emitters.put(userId, emitter); + if (Objects.nonNull(old)){ + log.warn("停止旧连接:"+userId); + try { + old.send(SseEmitter.event().name("被踢下线").data("你已在其他地方连接")); + old.complete(); + } catch (Exception e) { + old.completeWithError(e); + } + } + emitter.onError((ex) -> { + emitters.remove(userId); + emitter.complete(); + log.error("SSE异常:"+userId, ex); + }); + emitter.onTimeout(() -> { + emitters.remove(userId); + emitter.complete(); + log.error("SSE超时:"+userId); + }); + emitter.onCompletion(() -> { + emitters.remove(userId); + emitter.complete(); + log.error("SSE完成:"+userId); + }); + try { + emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000)); + } catch (IOException e) { + log.error("sse发送数据出错", e); + } + return emitter; + } +} \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerService.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerService.java new file mode 100644 index 00000000..905dff94 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerService.java @@ -0,0 +1,20 @@ +package com.nflg.mobilebroken.push.service; + +import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.Collection; + +public interface SSEManagerService { + + SseEmitter connect(Integer userId); + + void send(Integer userId, SSEMessageDTO message) throws IOException; + + void close(Integer userId); + + void shutdown(); + + Collection getUserIds(); +} diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/APPSSEManagerService.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/APPSSEManagerService.java new file mode 100644 index 00000000..d9c8ad91 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/APPSSEManagerService.java @@ -0,0 +1,68 @@ +package com.nflg.mobilebroken.push.service.impl; + +import cn.hutool.core.util.StrUtil; +import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; +import com.nflg.mobilebroken.common.util.VUtils; +import com.nflg.mobilebroken.push.service.SSEManagerBase; +import com.nflg.mobilebroken.push.service.SSEManagerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +@Service +@Slf4j +public class APPSSEManagerService extends SSEManagerBase implements SSEManagerService { + + public static final Map EMITTERS = new ConcurrentHashMap<>(); + + @Override + public SseEmitter connect(Integer userId) { + check(); + log.info("APP端SSE已连接:"+userId); + return connect(userId,EMITTERS); + } + + @Override + public void send(Integer userId, SSEMessageDTO message) throws IOException { + log.info(StrUtil.format("APP端SSE发送消息,用户id: {},内容: {}", userId, message)); + SseEmitter emitter = EMITTERS.get(userId); + VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("用户未连接:"+userId); + emitter.send(message); + } + + @Override + public void close(Integer userId) { + log.info("APP端SSE连接主动关闭:"+userId); + close(EMITTERS.remove(userId)); + } + + @Override + public void shutdown() { + shutdown(EMITTERS); + } + + @Override + public Collection getUserIds() { + return EMITTERS.keySet(); + } + + @PreDestroy + public void cleanup() { + log.info("释放SSE连接"); + for (SseEmitter emitter : EMITTERS.values()) { + try { + emitter.complete(); + } catch (Exception e) { + emitter.completeWithError(e); + } + } + EMITTERS.clear(); + } +} \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/AdminSSEManagerService.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/AdminSSEManagerService.java new file mode 100644 index 00000000..33da938a --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/AdminSSEManagerService.java @@ -0,0 +1,68 @@ +package com.nflg.mobilebroken.push.service.impl; + +import cn.hutool.core.util.StrUtil; +import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO; +import com.nflg.mobilebroken.common.util.VUtils; +import com.nflg.mobilebroken.push.service.SSEManagerBase; +import com.nflg.mobilebroken.push.service.SSEManagerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +@Service +@Slf4j +public class AdminSSEManagerService extends SSEManagerBase implements SSEManagerService { + + public static final Map EMITTERS = new ConcurrentHashMap<>(); + + @Override + public SseEmitter connect(Integer userId) { + check(); + log.info("管理端SSE已连接:"+userId); + return connect(userId,EMITTERS); + } + + @Override + public void send(Integer userId, SSEMessageDTO message) throws IOException { + log.info(StrUtil.format("管理端SSE发送消息,用户id: {},内容: {}", userId, message)); + SseEmitter emitter = EMITTERS.get(userId); + VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("用户未连接:"+userId); + emitter.send(message); + } + + @Override + public void close(Integer userId) { + close(EMITTERS.remove(userId)); + log.info("管理端SSE连接主动关闭:"+userId); + } + + @Override + public void shutdown() { + shutdown(EMITTERS); + } + + @Override + public Collection getUserIds() { + return EMITTERS.keySet(); + } + + @PreDestroy + public void cleanup() { + log.info("释放SSE连接"); + for (SseEmitter emitter : EMITTERS.values()) { + try { + emitter.complete(); + } catch (Exception e) { + emitter.completeWithError(e); + } + } + EMITTERS.clear(); + } +} \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/task/SSEScheduledTasks.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/task/SSEScheduledTasks.java new file mode 100644 index 00000000..ee6f4992 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/task/SSEScheduledTasks.java @@ -0,0 +1,41 @@ +package com.nflg.mobilebroken.push.task; + +import com.nflg.mobilebroken.push.service.impl.APPSSEManagerService; +import com.nflg.mobilebroken.push.service.impl.AdminSSEManagerService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.Iterator; +import java.util.Map; + +@Component +@Slf4j +public class SSEScheduledTasks { + + /** + * 发送SSE心跳消息 + * 每分钟执行一次 + */ + @Scheduled(fixedDelay=60000) + public void sendHeart() { + log.info("发送SSE心跳消息开始"); + send(APPSSEManagerService.EMITTERS.entrySet().iterator()); + send(AdminSSEManagerService.EMITTERS.entrySet().iterator()); + log.info("发送SSE心跳消息结束"); + } + + private void send(Iterator iterator) { + while (iterator.hasNext()) { + SseEmitter emitter = ((Map.Entry) iterator.next()).getValue(); + try { + emitter.send(SseEmitter.event().data("心跳")); + } catch (Exception e) { + log.error("发送心跳消息失败", e); + emitter.complete(); + iterator.remove(); + } + } + } +} diff --git a/nflg-mobilebroken-push/src/main/resources/application-dev.properties b/nflg-mobilebroken-push/src/main/resources/application-dev.properties new file mode 100644 index 00000000..3bfe5d16 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/resources/application-dev.properties @@ -0,0 +1 @@ +logging.config=classpath:logback-sit.xml \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/resources/application.properties b/nflg-mobilebroken-push/src/main/resources/application.properties new file mode 100644 index 00000000..5cc7c6e0 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/resources/application.properties @@ -0,0 +1,14 @@ +spring.application.name=push +spring.profiles.active=dev +server.port=8084 + +# sa-token?? +sa-token.is-log=true +sa-token.token-name=authorization +# token ????????? ??1??-1 ?????? +sa-token.timeout=86400 +sa-token.is-read-header=true +sa-token.is-read-cookie=false +sa-token.is-read-body=false +sa-token.is-share=true +sa-token.jwt-secret-key=sdadewr23DEWR342D3242c \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/resources/logback-sit.xml b/nflg-mobilebroken-push/src/main/resources/logback-sit.xml new file mode 100644 index 00000000..c5eed4d6 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/resources/logback-sit.xml @@ -0,0 +1,71 @@ + + + + + + + + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %X{traceId} [%thread] %-5level %logger{10} %msg%n + + + + + + + debug + + ${logDir}/mobilebroken-push.log + + + ${logDir}/%d{yyyy-MM-dd}.%i.log + + 15 + + true + + ${splitSize} + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %X{traceId} [%thread] %-5level %logger{50} %msg%n + + + + + + + error + + ${logDir}/error/mobilebroken-push.log + + + ${logDir}/error/%d{yyyy-MM-dd}.%i.error.log + + 30 + + true + + ${splitSize} + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %X{traceId} [%thread] %-5level %logger{50} %msg%n + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/nflg-mobilebroken-repository/src/main/resources/mapper/TBaseDepartmentMapper.xml b/nflg-mobilebroken-repository/src/main/resources/mapper/TBaseDepartmentMapper.xml index c5beb7ac..af8b6f03 100644 --- a/nflg-mobilebroken-repository/src/main/resources/mapper/TBaseDepartmentMapper.xml +++ b/nflg-mobilebroken-repository/src/main/resources/mapper/TBaseDepartmentMapper.xml @@ -13,6 +13,9 @@ select * from t_base_department where data_valid_status=1 + + and dept_parent_id=0 + diff --git a/pom.xml b/pom.xml index 9ba9b7b3..0e806969 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,7 @@ nflg-mobilebroken-gateway nflg-mobilebroken-repository nflg-mobilebroken-auth + nflg-mobilebroken-push 11