feat: sse独立出来

This commit is contained in:
曹鹏飞 2025-02-25 20:54:11 +08:00
parent d0e6a54bf0
commit 02667a58be
25 changed files with 865 additions and 142 deletions

View File

@ -119,8 +119,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>10</source>
<target>10</target>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>

View File

@ -8,12 +8,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@MapperScan("com.nflg.mobilebroken.repository.mapper")
@ComponentScan(basePackages = {"com.nflg.mobilebroken.repository.service", "com.nflg.mobilebroken.admin"
, "com.nflg.mobilebroken.starter"})
@EnableDiscoveryClient
@EnableScheduling
@Slf4j
@EnableAsync
public class AdminApplication {

View File

@ -1,28 +1,25 @@
package com.nflg.mobilebroken.admin.controller;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.util.StrUtil;
import com.itextpdf.html2pdf.ConverterProperties;
import com.itextpdf.html2pdf.HtmlConverter;
import com.itextpdf.layout.font.FontProvider;
import com.nflg.mobilebroken.admin.annotation.ApiMark;
import com.nflg.mobilebroken.admin.publisher.TicketEventPublisher;
import com.nflg.mobilebroken.admin.service.SsePushService;
import com.nflg.mobilebroken.common.constant.STATE;
import com.nflg.mobilebroken.common.constant.TicketState;
import com.nflg.mobilebroken.common.exception.NflgException;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import com.nflg.mobilebroken.common.pojo.PageData;
import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.TicketChatDTO;
import com.nflg.mobilebroken.common.pojo.request.*;
import com.nflg.mobilebroken.common.pojo.vo.*;
import com.nflg.mobilebroken.common.util.*;
import com.nflg.mobilebroken.repository.entity.*;
import com.nflg.mobilebroken.repository.service.*;
import com.nflg.mobilebroken.starter.annotation.MethodInfoMark;
import com.nflg.mobilebroken.starter.service.impl.APPSSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpHeaders;
@ -42,8 +39,6 @@ import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@ -77,7 +72,7 @@ public class TicketController extends ControllerBase {
private TicketChatService ticketChatService;
@Resource
private APPSSEManagerService sseManagerService;
private SsePushService ssePushService;
@Resource
private IAppAreaService appAreaService;
@ -204,25 +199,18 @@ public class TicketController extends ControllerBase {
@ApiMark(moduleName = "工单管理", apiName = "分派工单")
public ApiResult<Void> assignmentTicket(@Valid @RequestBody AssignmentTicketRequest request) {
Ticket ticket=ticketService.assignmentTicket(request);
TicketChatDTO message=new TicketChatDTO()
.setTicketId(ticket.getId())
.setMessages(Collections.singletonList(new ChatMessageDTO()
.setFrom("system")
.setTicketState(ticket.getState())
.setSenderId(0)
.setSenderName("服务助手")
.setCreateTime(Instant.now())
.setContent("<b>已为您分派工程师</b><br/>如需补充信息,请继续留言。")));
ticketChatService.add(message);
try {
SSEMessageDTO messageDTO = new SSEMessageDTO()
.setType(1)
.setData(message);
sseManagerService.send(ticket.getUserId(), messageDTO);
} catch (IOException e) {
log.error("发送SSE消息出错", e);
}
ticketEventPublisher.publishTicketAssignedEvent(ticket,request.getUserIds());
ChatMessageDTO message = new ChatMessageDTO()
.setId(cn.hutool.core.util.IdUtil.getSnowflakeNextIdStr())
.setFrom("system")
.setTicketState(ticket.getState())
.setSenderId(0)
.setSenderName("服务助手")
.setContent("<b>已为您分派工程师</b><br/>如需补充信息,请继续留言。")
.setCreateTime(Instant.now());
ticketChatService.addMessage(ticket.getId(), message);
//推送消息
ssePushService.sendTicketMessageToApp(ticket.getUserId(),message);
return ApiResult.success();
}
@ -261,23 +249,22 @@ public class TicketController extends ControllerBase {
for (Integer id : ids){
Ticket ticket=ticketService.completeTicket(id);
ticketEventPublisher.publishTicketCompleteEvent(ticket);
TicketChatDTO message=new TicketChatDTO()
.setTicketId(ticket.getId())
.setMessages(Collections.singletonList(new ChatMessageDTO()
.setFrom("system")
.setTicketState(ticket.getState())
.setSenderId(0)
.setSenderName("服务助手")
.setCreateTime(Instant.now())
.setContent("<b>工单已完成</b><br/>工单已完成,如需补充信息,请重新打开工单,继续留言。")));
ticketChatService.add(message);
try {
SSEMessageDTO messageDTO = new SSEMessageDTO()
.setType(1)
.setData(message);
sseManagerService.send(ticket.getUserId(), messageDTO);
} catch (IOException e) {
log.error("发送SSE消息出错", e);
ChatMessageDTO message = new ChatMessageDTO()
.setId(cn.hutool.core.util.IdUtil.getSnowflakeNextIdStr())
.setFrom("system")
.setTicketState(ticket.getState())
.setSenderId(0)
.setSenderName("服务助手")
.setContent("<b>工单已完成</b><br/>工单已完成,如需补充信息,请重新打开工单,继续留言。")
.setCreateTime(Instant.now());
ticketChatService.addMessage(id, message);
//推送消息
ssePushService.sendTicketMessageToApp(ticket.getUserId(),message);
List<Integer> handles = Arrays.stream(ticket.getHandle().split(","))
.map(Integer::parseInt).collect(Collectors.toList());
handles.remove(AdminUserUtil.getUserId());
if (CollectionUtil.isNotEmpty(handles)){
handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message));
}
}
return ApiResult.success();
@ -293,23 +280,22 @@ public class TicketController extends ControllerBase {
public ApiResult<Void> closeTicket(@Valid @RequestBody TicketCloseRequest request) {
Ticket ticket=ticketService.closeTicket(request);
ticketEventPublisher.publishTicketCloseEvent(ticket);
TicketChatDTO message=new TicketChatDTO()
.setTicketId(ticket.getId())
.setMessages(Collections.singletonList(new ChatMessageDTO()
.setFrom("system")
.setTicketState(ticket.getState())
.setSenderId(0)
.setSenderName("服务助手")
.setCreateTime(Instant.now())
.setContent("<b>工单已关闭</b><br/>感谢你的使用,如有问题,请重新提交新的工单。")));
ticketChatService.add(message);
try {
SSEMessageDTO messageDTO = new SSEMessageDTO()
.setType(1)
.setData(message);
sseManagerService.send(ticket.getUserId(), messageDTO);
} catch (IOException e) {
log.error("发送SSE消息出错", e);
ChatMessageDTO message = new ChatMessageDTO()
.setId(cn.hutool.core.util.IdUtil.getSnowflakeNextIdStr())
.setFrom("system")
.setTicketState(ticket.getState())
.setSenderId(0)
.setSenderName("服务助手")
.setContent("<b>工单已关闭</b><br/>感谢你的使用,如有问题,请重新提交新的工单。")
.setCreateTime(Instant.now());
ticketChatService.addMessage(request.getTicketId(), message);
//推送消息
ssePushService.sendTicketMessageToApp(ticket.getUserId(),message);
List<Integer> handles = Arrays.stream(ticket.getHandle().split(","))
.map(Integer::parseInt).collect(Collectors.toList());
handles.remove(AdminUserUtil.getUserId());
if (CollectionUtil.isNotEmpty(handles)){
handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message));
}
return ApiResult.success();
}
@ -545,8 +531,10 @@ public class TicketController extends ControllerBase {
VUtils.trueThrowBusinessError(Objects.isNull(ticket)).throwMessage("工单不存在");
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getState(), TicketState.Processing.getState()))
.throwMessage("当前工单状态不允许发送消息");
VUtils.trueThrowBusinessError(Arrays.stream(ticket.getHandle().split(","))
.noneMatch(uid -> StrUtil.equals(uid, AdminUserUtil.getUserId().toString())))
List<Integer> handles=Arrays.stream(ticket.getHandle().split(","))
.map(Integer::parseInt).collect(Collectors.toList());
VUtils.trueThrowBusinessError(handles.stream()
.noneMatch(uid -> Objects.equals(uid, AdminUserUtil.getUserId())))
.throwMessage("只有工单处理人能发送消息");
ticket.setCurrentHandle(AdminUserUtil.getUserId());
ticketService.updateById(ticket);
@ -568,38 +556,10 @@ public class TicketController extends ControllerBase {
}
ticketChatService.addMessage(request.getTicketId(), message);
//推送消息
try {
String zone = MultilingualUtil.getZone();
ZoneId zoneId = ZoneId.of(zone);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId);
ChatMessageVO messageVO = new ChatMessageVO()
.setId(message.getId())
.setFrom(message.getFrom())
.setSenderId(message.getSenderId())
.setSenderName(message.getSenderName())
.setSenderAvatar(message.getSenderAvatar())
.setContent(message.getContent())
.setTicketState(message.getTicketState())
.setCreateTime(formatter.format(message.getCreateTime()))
.setImages(message.getImages())
.setAttachments(message.getAttachments())
.setQuote(Objects.isNull(message.getQuote()) ? null : new ChatMessageVO()
.setId(message.getQuote().getId())
.setFrom(message.getQuote().getFrom())
.setSenderId(message.getQuote().getSenderId())
.setSenderName(message.getQuote().getSenderName())
.setSenderAvatar(message.getQuote().getSenderAvatar())
.setTicketState(message.getQuote().getTicketState())
.setContent(message.getQuote().getContent())
.setAttachments(message.getQuote().getAttachments())
.setImages(message.getQuote().getImages())
.setCreateTime(formatter.format(message.getQuote().getCreateTime())));
SSEMessageDTO messageDTO = new SSEMessageDTO()
.setType(1)
.setData(messageVO);
sseManagerService.send(ticket.getUserId(), messageDTO);
} catch (IOException e) {
log.error("发送SSE消息出错", e);
ssePushService.sendTicketMessageToApp(ticket.getUserId(),message);
handles.remove(AdminUserUtil.getUserId());
if (CollectionUtil.isNotEmpty(handles)){
handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message));
}
ticketEventPublisher.publishTicketReplyEvent(ticket);
return ApiResult.success();

View File

@ -0,0 +1,82 @@
package com.nflg.mobilebroken.admin.service;
import cn.hutool.core.date.DatePattern;
import cn.hutool.json.JSONUtil;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.request.PushRequest;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import com.nflg.mobilebroken.common.util.MultilingualUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
@Slf4j
@Component
@RefreshScope
public class SsePushService {
@Value("${sse.url}")
private String sseUrl;
public void sendTicketMessageToAdmin(Integer userId, ChatMessageDTO message){
try {
PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message));
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<ApiResult> response = restTemplate.postForEntity(sseUrl+"/sse/admin/push",request, ApiResult.class);
log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody()));
} catch (Exception e) {
log.error("发送消息出错", e);
}
}
public void sendTicketMessageToApp(Integer userId, ChatMessageDTO message){
try {
PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message));
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<ApiResult> response = restTemplate.postForEntity(sseUrl+"/sse/app/push",request, ApiResult.class);
log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody()));
} catch (Exception e) {
log.error("发送消息出错", e);
}
}
private SSEMessageDTO buildMessage(ChatMessageDTO message){
String zone = MultilingualUtil.getZone();
ZoneId zoneId = ZoneId.of(zone);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId);
ChatMessageVO messageVO = new ChatMessageVO()
.setId(message.getId())
.setFrom(message.getFrom())
.setSenderId(message.getSenderId())
.setSenderName(message.getSenderName())
.setSenderAvatar(message.getSenderAvatar())
.setContent(message.getContent())
.setTicketState(message.getTicketState())
.setCreateTime(formatter.format(message.getCreateTime()))
.setImages(message.getImages())
.setAttachments(message.getAttachments())
.setQuote(Objects.isNull(message.getQuote()) ? null : new ChatMessageVO()
.setId(message.getQuote().getId())
.setFrom(message.getQuote().getFrom())
.setSenderId(message.getQuote().getSenderId())
.setSenderName(message.getQuote().getSenderName())
.setSenderAvatar(message.getQuote().getSenderAvatar())
.setTicketState(message.getQuote().getTicketState())
.setContent(message.getQuote().getContent())
.setAttachments(message.getQuote().getAttachments())
.setImages(message.getQuote().getImages())
.setCreateTime(formatter.format(message.getQuote().getCreateTime())));
return new SSEMessageDTO()
.setType(1)
.setData(messageVO);
}
}

View File

@ -6,12 +6,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@MapperScan("com.nflg.mobilebroken.repository.mapper")
@ComponentScan(basePackages = {"com.nflg.mobilebroken.repository.service", "com.nflg.mobilebroken.cfs"
,"com.nflg.mobilebroken.starter"})
@EnableDiscoveryClient
@EnableScheduling
@EnableAsync
public class CfsApplication {

View File

@ -1,17 +1,16 @@
package com.nflg.mobilebroken.cfs.controller;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.nflg.mobilebroken.cfs.publisher.TicketEventPublisher;
import com.nflg.mobilebroken.cfs.service.SsePushService;
import com.nflg.mobilebroken.common.constant.Constant;
import com.nflg.mobilebroken.common.constant.TicketState;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import com.nflg.mobilebroken.common.pojo.PageData;
import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.TicketChatDTO;
import com.nflg.mobilebroken.common.pojo.request.*;
import com.nflg.mobilebroken.common.pojo.vo.*;
@ -21,7 +20,6 @@ import com.nflg.mobilebroken.common.util.PageUtil;
import com.nflg.mobilebroken.common.util.VUtils;
import com.nflg.mobilebroken.repository.entity.*;
import com.nflg.mobilebroken.repository.service.*;
import com.nflg.mobilebroken.starter.service.impl.AdminSSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.*;
@ -30,10 +28,7 @@ import javax.annotation.Resource;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@ -65,9 +60,6 @@ public class TiketController extends ControllerBase {
@Resource
private ITicketEvaluateService ticketEvaluateService;
@Resource
private IAdminMessageService adminMessageService;
@Resource
private IDictionaryItemTranslateService dictionaryItemTranslateService;
@ -84,7 +76,7 @@ public class TiketController extends ControllerBase {
private ITBaseCustomerService customerService;
@Resource
private AdminSSEManagerService sseManagerService;
private SsePushService ssePushService;
@Resource
private TicketEventPublisher ticketEventPublisher;
@ -339,43 +331,10 @@ public class TiketController extends ControllerBase {
}
ticketChatService.addMessage(request.getTicketId(), message);
//推送消息
String handle = ticket.getHandle();
if (StrUtil.isNotBlank(handle)) {
String zone = MultilingualUtil.getZone();
ZoneId zoneId = ZoneId.of(zone);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId);
ChatMessageVO messageVO = new ChatMessageVO()
.setId(message.getId())
.setFrom(message.getFrom())
.setSenderId(message.getSenderId())
.setSenderName(message.getSenderName())
.setSenderAvatar(message.getSenderAvatar())
.setContent(message.getContent())
.setTicketState(message.getTicketState())
.setCreateTime(formatter.format(message.getCreateTime()))
.setImages(message.getImages())
.setAttachments(message.getAttachments())
.setQuote(Objects.isNull(message.getQuote()) ? null : new ChatMessageVO()
.setId(message.getQuote().getId())
.setFrom(message.getQuote().getFrom())
.setSenderId(message.getQuote().getSenderId())
.setSenderName(message.getQuote().getSenderName())
.setSenderAvatar(message.getQuote().getSenderAvatar())
.setTicketState(message.getQuote().getTicketState())
.setContent(message.getQuote().getContent())
.setAttachments(message.getQuote().getAttachments())
.setImages(message.getQuote().getImages())
.setCreateTime(formatter.format(message.getQuote().getCreateTime())));
SSEMessageDTO messageDTO = new SSEMessageDTO()
.setType(1)
.setData(messageVO);
StrUtil.split(handle, ",").forEach(userId -> {
try {
sseManagerService.send(Integer.valueOf(userId), messageDTO);
} catch (IOException e) {
log.error("发送SSE消息出错", e);
}
});
List<Integer> handles=Arrays.stream(ticket.getHandle().split(","))
.map(Integer::parseInt).collect(Collectors.toList());
if (CollectionUtil.isNotEmpty(handles)){
handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message));
}
ticketEventPublisher.publishTicketReplyEvent(ticket, MultilingualUtil.getLanguage(), MultilingualUtil.getZone());
return ApiResult.success();

View File

@ -0,0 +1,71 @@
package com.nflg.mobilebroken.cfs.service;
import cn.hutool.core.date.DatePattern;
import cn.hutool.json.JSONUtil;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.request.PushRequest;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import com.nflg.mobilebroken.common.util.MultilingualUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
@Slf4j
@Component
@RefreshScope
public class SsePushService {
@Value("${sse.url}")
private String sseUrl;
public void sendTicketMessageToAdmin(Integer userId, ChatMessageDTO message){
try {
PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message));
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<ApiResult> response = restTemplate.postForEntity(sseUrl+"/sse/admin/push",request, ApiResult.class);
log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody()));
} catch (Exception e) {
log.error("发送消息出错", e);
}
}
private SSEMessageDTO buildMessage(ChatMessageDTO message){
String zone = MultilingualUtil.getZone();
ZoneId zoneId = ZoneId.of(zone);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId);
ChatMessageVO messageVO = new ChatMessageVO()
.setId(message.getId())
.setFrom(message.getFrom())
.setSenderId(message.getSenderId())
.setSenderName(message.getSenderName())
.setSenderAvatar(message.getSenderAvatar())
.setContent(message.getContent())
.setTicketState(message.getTicketState())
.setCreateTime(formatter.format(message.getCreateTime()))
.setImages(message.getImages())
.setAttachments(message.getAttachments())
.setQuote(Objects.isNull(message.getQuote()) ? null : new ChatMessageVO()
.setId(message.getQuote().getId())
.setFrom(message.getQuote().getFrom())
.setSenderId(message.getQuote().getSenderId())
.setSenderName(message.getQuote().getSenderName())
.setSenderAvatar(message.getQuote().getSenderAvatar())
.setTicketState(message.getQuote().getTicketState())
.setContent(message.getQuote().getContent())
.setAttachments(message.getQuote().getAttachments())
.setImages(message.getQuote().getImages())
.setCreateTime(formatter.format(message.getQuote().getCreateTime())));
return new SSEMessageDTO()
.setType(1)
.setData(messageVO);
}
}

View File

@ -3,13 +3,17 @@ package com.nflg.mobilebroken.common.pojo.dto;
import lombok.Data;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
@Data
@Accessors(chain = true)
public class SSEMessageDTO {
//类型1工单会话消息2消息提醒
@NotNull
private int type;
//消息内容
@NotNull
private Object data;
}

View File

@ -0,0 +1,22 @@
package com.nflg.mobilebroken.common.pojo.request;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import lombok.Data;
import lombok.experimental.Accessors;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
@Data
@Accessors(chain = true)
public class PushRequest {
//用户id
@NotNull
private Integer userId;
//消息
@NotNull
@Valid
private SSEMessageDTO message;
}

View File

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.nflg</groupId>
<artifactId>nflg-mobilebroken</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nflg-mobilebroken-push</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>服务-SSE服务</name>
<description>SSE消息推送功能接口</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.nflg</groupId>
<artifactId>nflg-mobilebroken-common</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-jwt</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,20 @@
package com.nflg.mobilebroken.push;
import cn.dev33.satoken.SaManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@Slf4j
@EnableScheduling
@EnableAsync
public class PushApplication {
public static void main(String[] args) {
SpringApplication.run(PushApplication.class, args);
log.info("启动成功Sa-Token 配置如下:" + SaManager.getConfig());
}
}

View File

@ -0,0 +1,67 @@
package com.nflg.mobilebroken.push.advice;
import cn.dev33.satoken.exception.NotLoginException;
import cn.hutool.core.util.StrUtil;
import com.nflg.mobilebroken.common.constant.STATE;
import com.nflg.mobilebroken.common.exception.NflgException;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@RestControllerAdvice
@Slf4j
public class GlobalRestControllerAdvice {
@ExceptionHandler(Exception.class)
public ApiResult<Void> handleAllExceptions(Exception ex) {
log.error("服务器内部错误: ", ex);
return ApiResult.error(STATE.BusinessError,"服务器内部错误: " + ex.getMessage());
}
@ExceptionHandler(NflgException.class)
public ResponseEntity<Object> handleNflgException(NflgException ex) {
if (ex.getState() == STATE.LoginError) {
log.error("登录失效: ", ex);
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).body(ex.getMessage());
} else {
log.error("业务错误: ", ex);
return ResponseEntity.ok().body(ApiResult.error(ex.getState(), ex.getMessage()));
}
}
@ExceptionHandler(ConstraintViolationException.class)
public ApiResult<Void> handleConstraintViolationException(ConstraintViolationException ex) {
log.error("数据校验失败: ", ex);
return ApiResult.error(STATE.ParamErr, "数据校验失败: " + StrUtil.join(",", ex.getConstraintViolations().stream().map(ConstraintViolation::getMessage).collect(Collectors.toList())));
}
@ExceptionHandler(MethodArgumentNotValidException.class)
public ApiResult<Void> handleMethodArgumentNotValidException(MethodArgumentNotValidException ex) {
log.error("数据校验失败: ", ex);
List<String> errors = new ArrayList<>();
ex.getBindingResult().getAllErrors().forEach(error -> {
String fieldName = ((FieldError) error).getField();
String errorMessage = error.getDefaultMessage();
errors.add(fieldName + ":" + errorMessage);
});
return ApiResult.error(STATE.ParamErr, "数据校验失败: " + StrUtil.join(",", errors));
}
@ExceptionHandler(NotLoginException.class)
@ResponseStatus(HttpStatus.UNAUTHORIZED) // 返回 401 状态码
public String handleNotLoginException(NotLoginException e) {
return "请重新登录";
}
}

View File

@ -0,0 +1,25 @@
package com.nflg.mobilebroken.push.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class CorsConfig {
@Bean
public WebMvcConfigurer corsConfigurer() {
return new WebMvcConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**") // 允许所有路径
.allowedOrigins("*") // 允许所有来源
.allowedMethods("GET", "POST", "PUT", "DELETE") // 允许的HTTP方法
.allowedHeaders("*") // 允许所有请求头
.allowCredentials(true) // 允许携带凭证如cookies
.maxAge(3600); // 预检请求的缓存时间
}
};
}
}

View File

@ -0,0 +1,23 @@
package com.nflg.mobilebroken.push.config;
import cn.dev33.satoken.jwt.StpLogicJwtForStateless;
import cn.dev33.satoken.strategy.SaAnnotationStrategy;
import com.nflg.mobilebroken.common.util.SaTokenAdminUtil;
import com.nflg.mobilebroken.common.util.SaTokenAppUtil;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.AnnotatedElementUtils;
import javax.annotation.PostConstruct;
@Configuration
public class SaTokenAnnotationConfigure {
@PostConstruct
public void rewriteSaStrategy() {
// 重写Sa-Token的注解处理器增加注解合并功能
SaAnnotationStrategy.instance.getAnnotation = AnnotatedElementUtils::getMergedAnnotation;
//设置jwt模式
SaTokenAppUtil.setStpLogic(new StpLogicJwtForStateless(SaTokenAppUtil.TYPE));
SaTokenAdminUtil.setStpLogic(new StpLogicJwtForStateless(SaTokenAdminUtil.TYPE));
}
}

View File

@ -0,0 +1,76 @@
package com.nflg.mobilebroken.push.controller;
import com.nflg.mobilebroken.common.constant.STATE;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import com.nflg.mobilebroken.common.pojo.request.PushRequest;
import com.nflg.mobilebroken.common.util.AdminUserUtil;
import com.nflg.mobilebroken.common.util.AppUserUtil;
import com.nflg.mobilebroken.push.service.impl.APPSSEManagerService;
import com.nflg.mobilebroken.push.service.impl.AdminSSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.io.IOException;
@RestController
@Slf4j
@RequestMapping("/sse")
public class SSEController {
@Resource
private APPSSEManagerService appsseManagerService;
@Resource
private AdminSSEManagerService adminSSEManagerService;
/**
* 客户端账号建立sse连接
*/
@GetMapping(value = "app/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter appConnect() {
return appsseManagerService.connect(AppUserUtil.getUserId());
}
/**
* 推送消息到客户端账号
* @param request 请求参数
*/
@PostMapping("app/push")
public ApiResult<Void> pushtToApp(@Valid @RequestBody @NotNull PushRequest request){
try {
appsseManagerService.send(request.getUserId(),request.getMessage());
return ApiResult.success();
} catch (IOException e) {
log.error("发送SSE消息出错", e);
return ApiResult.error(STATE.BusinessError,e.getMessage());
}
}
/**
* 管理端账号建立sse连接
*/
@GetMapping(value = "admin/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter adminConnect() {
return adminSSEManagerService.connect(AdminUserUtil.getUserId());
}
/**
* 推送消息到管理端账号
* @param request 请求参数
*/
@PostMapping("admin/push")
public ApiResult<Void> pushtToAdmin(@Valid @RequestBody @NotNull PushRequest request){
try {
adminSSEManagerService.send(request.getUserId(),request.getMessage());
return ApiResult.success();
} catch (IOException e) {
log.error("发送SSE消息出错", e);
return ApiResult.error(STATE.BusinessError,e.getMessage());
}
}
}

View File

@ -0,0 +1,73 @@
package com.nflg.mobilebroken.push.service;
import com.nflg.mobilebroken.common.constant.STATE;
import com.nflg.mobilebroken.common.util.VUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
@Slf4j
public class SSEManagerBase {
private static boolean IS_SHUTDOWN = false;
protected void check(){
VUtils.trueThrow(IS_SHUTDOWN).throwMessage(STATE.ServiceConnectRefused,"SSE服务已关闭");
}
protected static void shutdown(Map<Integer, SseEmitter> emitters) {
IS_SHUTDOWN=true;
log.warn("准备关闭SSE服务");
emitters.forEach((k,v)->{
try {
v.send("因SSE服务关闭,连接即将断开");
}catch (Exception ex){
log.error("SSE发送消息失败:"+k,ex);
}
v.complete();
});
log.warn("SSE服务已关闭");
}
protected static void close(SseEmitter emitter){
emitter.complete();
}
protected SseEmitter connect(Integer userId, Map<Integer, SseEmitter> emitters) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
SseEmitter old=emitters.put(userId, emitter);
if (Objects.nonNull(old)){
log.warn("停止旧连接:"+userId);
try {
old.send(SseEmitter.event().name("被踢下线").data("你已在其他地方连接"));
old.complete();
} catch (Exception e) {
old.completeWithError(e);
}
}
emitter.onError((ex) -> {
emitters.remove(userId);
emitter.complete();
log.error("SSE异常:"+userId, ex);
});
emitter.onTimeout(() -> {
emitters.remove(userId);
emitter.complete();
log.error("SSE超时:"+userId);
});
emitter.onCompletion(() -> {
emitters.remove(userId);
emitter.complete();
log.error("SSE完成:"+userId);
});
try {
emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000));
} catch (IOException e) {
log.error("sse发送数据出错", e);
}
return emitter;
}
}

View File

@ -0,0 +1,20 @@
package com.nflg.mobilebroken.push.service;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Collection;
public interface SSEManagerService {
SseEmitter connect(Integer userId);
void send(Integer userId, SSEMessageDTO message) throws IOException;
void close(Integer userId);
void shutdown();
Collection<Integer> getUserIds();
}

View File

@ -0,0 +1,68 @@
package com.nflg.mobilebroken.push.service.impl;
import cn.hutool.core.util.StrUtil;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.util.VUtils;
import com.nflg.mobilebroken.push.service.SSEManagerBase;
import com.nflg.mobilebroken.push.service.SSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class APPSSEManagerService extends SSEManagerBase implements SSEManagerService {
public static final Map<Integer, SseEmitter> EMITTERS = new ConcurrentHashMap<>();
@Override
public SseEmitter connect(Integer userId) {
check();
log.info("APP端SSE已连接:"+userId);
return connect(userId,EMITTERS);
}
@Override
public void send(Integer userId, SSEMessageDTO message) throws IOException {
log.info(StrUtil.format("APP端SSE发送消息,用户id: {},内容: {}", userId, message));
SseEmitter emitter = EMITTERS.get(userId);
VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("用户未连接:"+userId);
emitter.send(message);
}
@Override
public void close(Integer userId) {
log.info("APP端SSE连接主动关闭:"+userId);
close(EMITTERS.remove(userId));
}
@Override
public void shutdown() {
shutdown(EMITTERS);
}
@Override
public Collection<Integer> getUserIds() {
return EMITTERS.keySet();
}
@PreDestroy
public void cleanup() {
log.info("释放SSE连接");
for (SseEmitter emitter : EMITTERS.values()) {
try {
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}
EMITTERS.clear();
}
}

View File

@ -0,0 +1,68 @@
package com.nflg.mobilebroken.push.service.impl;
import cn.hutool.core.util.StrUtil;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.util.VUtils;
import com.nflg.mobilebroken.push.service.SSEManagerBase;
import com.nflg.mobilebroken.push.service.SSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class AdminSSEManagerService extends SSEManagerBase implements SSEManagerService {
public static final Map<Integer, SseEmitter> EMITTERS = new ConcurrentHashMap<>();
@Override
public SseEmitter connect(Integer userId) {
check();
log.info("管理端SSE已连接:"+userId);
return connect(userId,EMITTERS);
}
@Override
public void send(Integer userId, SSEMessageDTO message) throws IOException {
log.info(StrUtil.format("管理端SSE发送消息,用户id: {},内容: {}", userId, message));
SseEmitter emitter = EMITTERS.get(userId);
VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("用户未连接:"+userId);
emitter.send(message);
}
@Override
public void close(Integer userId) {
close(EMITTERS.remove(userId));
log.info("管理端SSE连接主动关闭:"+userId);
}
@Override
public void shutdown() {
shutdown(EMITTERS);
}
@Override
public Collection<Integer> getUserIds() {
return EMITTERS.keySet();
}
@PreDestroy
public void cleanup() {
log.info("释放SSE连接");
for (SseEmitter emitter : EMITTERS.values()) {
try {
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}
EMITTERS.clear();
}
}

View File

@ -0,0 +1,41 @@
package com.nflg.mobilebroken.push.task;
import com.nflg.mobilebroken.push.service.impl.APPSSEManagerService;
import com.nflg.mobilebroken.push.service.impl.AdminSSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Iterator;
import java.util.Map;
@Component
@Slf4j
public class SSEScheduledTasks {
/**
* 发送SSE心跳消息
* 每分钟执行一次
*/
@Scheduled(fixedDelay=60000)
public void sendHeart() {
log.info("发送SSE心跳消息开始");
send(APPSSEManagerService.EMITTERS.entrySet().iterator());
send(AdminSSEManagerService.EMITTERS.entrySet().iterator());
log.info("发送SSE心跳消息结束");
}
private void send(Iterator<?> iterator) {
while (iterator.hasNext()) {
SseEmitter emitter = ((Map.Entry<String, SseEmitter>) iterator.next()).getValue();
try {
emitter.send(SseEmitter.event().data("心跳"));
} catch (Exception e) {
log.error("发送心跳消息失败", e);
emitter.complete();
iterator.remove();
}
}
}
}

View File

@ -0,0 +1 @@
logging.config=classpath:logback-sit.xml

View File

@ -0,0 +1,14 @@
spring.application.name=push
spring.profiles.active=dev
server.port=8084
# sa-token??
sa-token.is-log=true
sa-token.token-name=authorization
# token ????????? ??1??-1 ??????
sa-token.timeout=86400
sa-token.is-read-header=true
sa-token.is-read-cookie=false
sa-token.is-read-body=false
sa-token.is-share=true
sa-token.jwt-secret-key=sdadewr23DEWR342D3242c

View File

@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true" scan="false" scanPeriod="30 seconds">
<!--定义日志文件的存储地址 -->
<Property name="logDir" value="./logs" />
<Property name="splitSize" value="50MB" />
<!-- 控制台日志, 控制台输出 -->
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder" charset="UTF-8">
<!--格式化输出:%d表示日期%thread表示线程名%-5level级别从左显示5个字符宽度,%logger:显示类名 %msg日志消息%n是换行符-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %X{traceId} [%thread] %-5level %logger{10} %msg%n</pattern>
</encoder>
</appender>
<!-- 固定窗口滚动策略的日志输出 -->
<appender name="RollingFileAll" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>debug</level>
</filter>
<file>${logDir}/mobilebroken-push.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- 按日期滚动,每天生成一个文件 -->
<fileNamePattern>${logDir}/%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<!-- 保留15天的日志 -->
<maxHistory>15</maxHistory>
<!-- 启动时,是否删除旧的日志文件 -->
<cleanHistoryOnStart>true</cleanHistoryOnStart>
<!-- 单天单个日志最大size -->
<maxFileSize>${splitSize}</maxFileSize>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder" charset="UTF-8">
<!--格式化输出:%d表示日期%thread表示线程名%-5level级别从左显示5个字符宽度%logger:显示类名 %msg日志消息%n是换行符-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %X{traceId} [%thread] %-5level %logger{50} %msg%n</pattern>
</encoder>
</appender>
<!-- error日志 -->
<appender name="ErrorFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>error</level>
</filter>
<file>${logDir}/error/mobilebroken-push.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- 按日期滚动,每天生成一个文件 -->
<fileNamePattern>${logDir}/error/%d{yyyy-MM-dd}.%i.error.log</fileNamePattern>
<!-- 保留15天的日志 -->
<maxHistory>30</maxHistory>
<!-- 启动时,是否删除旧的日志文件 -->
<cleanHistoryOnStart>true</cleanHistoryOnStart>
<!-- 单天单个日志最大size -->
<maxFileSize>${splitSize}</maxFileSize>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder" charset="UTF-8">
<!--格式化输出:%d表示日期%thread表示线程名%-5level级别从左显示5个字符宽度%logger:显示类名 %msg日志消息%n是换行符-->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %X{traceId} [%thread] %-5level %logger{50} %msg%n</pattern>
</encoder>
</appender>
<logger name="com.nflg" level="DEBUG"/>
<!-- 日志输出级别 -->
<root level="ERROR">
<appender-ref ref="ErrorFile" />
</root>
<root level="DEBUG">
<appender-ref ref="Console" />
<appender-ref ref="RollingFileAll" />
</root>
</configuration>

View File

@ -13,6 +13,9 @@
select *
from t_base_department
where data_valid_status=1
<if test="query.deptStatus==null and (query.deptCodeOrName==null || query.deptCodeOrName=='')">
and dept_parent_id=0
</if>
<include refid="whr"/>
</select>

View File

@ -20,6 +20,7 @@
<module>nflg-mobilebroken-gateway</module>
<module>nflg-mobilebroken-repository</module>
<module>nflg-mobilebroken-auth</module>
<module>nflg-mobilebroken-push</module>
</modules>
<properties>
<java.version>11</java.version>