feat: sse服务优化

This commit is contained in:
曹鹏飞 2025-02-26 20:22:19 +08:00
parent 02667a58be
commit 4dfe32bff8
25 changed files with 262 additions and 275 deletions

View File

@ -210,7 +210,7 @@ public class TicketController extends ControllerBase {
.setCreateTime(Instant.now());
ticketChatService.addMessage(ticket.getId(), message);
//推送消息
ssePushService.sendTicketMessageToApp(ticket.getUserId(),message);
ssePushService.sendTicketMessageToApp(ticket.getId(),message);
return ApiResult.success();
}
@ -226,15 +226,20 @@ public class TicketController extends ControllerBase {
}
/**
* 添加工单处理人
* 添加/删除工单处理人
* @param request 请求参数
*/
@PostMapping("addTicketHandle")
@MethodInfoMark(value = "添加工单处理人", menuName = "工单管理")
@ApiMark(moduleName = "工单管理", apiName = "添加工单处理人")
public ApiResult<Void> addTicketHandle(@Valid @RequestBody TicketHandleAddRequest request){
Ticket ticket=ticketService.addTicketHandle(request);
ticketEventPublisher.publishTicketAssignedEvent(ticket,request.getUserIds());
Ticket ticket=ticketService.getById(request.getTicketId());
List<Integer> handleIds= Arrays.stream(ticket.getHandle().split(",")).map(Integer::parseInt).collect(Collectors.toList());
Ticket ticket1=ticketService.addTicketHandle(request);
request.getUserIds().removeAll(handleIds);
if(CollectionUtil.isNotEmpty(request.getUserIds())) {
ticketEventPublisher.publishTicketAssignedEvent(ticket1, request.getUserIds());
}
return ApiResult.success();
}
@ -259,13 +264,7 @@ public class TicketController extends ControllerBase {
.setCreateTime(Instant.now());
ticketChatService.addMessage(id, message);
//推送消息
ssePushService.sendTicketMessageToApp(ticket.getUserId(),message);
List<Integer> handles = Arrays.stream(ticket.getHandle().split(","))
.map(Integer::parseInt).collect(Collectors.toList());
handles.remove(AdminUserUtil.getUserId());
if (CollectionUtil.isNotEmpty(handles)){
handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message));
}
ssePushService.sendTicketMessageToAdmin(id,message);
}
return ApiResult.success();
}
@ -290,13 +289,7 @@ public class TicketController extends ControllerBase {
.setCreateTime(Instant.now());
ticketChatService.addMessage(request.getTicketId(), message);
//推送消息
ssePushService.sendTicketMessageToApp(ticket.getUserId(),message);
List<Integer> handles = Arrays.stream(ticket.getHandle().split(","))
.map(Integer::parseInt).collect(Collectors.toList());
handles.remove(AdminUserUtil.getUserId());
if (CollectionUtil.isNotEmpty(handles)){
handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message));
}
ssePushService.sendTicketMessageToAdmin(ticket.getId(),message);
return ApiResult.success();
}
@ -556,11 +549,7 @@ public class TicketController extends ControllerBase {
}
ticketChatService.addMessage(request.getTicketId(), message);
//推送消息
ssePushService.sendTicketMessageToApp(ticket.getUserId(),message);
handles.remove(AdminUserUtil.getUserId());
if (CollectionUtil.isNotEmpty(handles)){
handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message));
}
ssePushService.sendTicketMessageToAdmin(request.getTicketId(),message);
ticketEventPublisher.publishTicketReplyEvent(ticket);
return ApiResult.success();
}

View File

@ -4,7 +4,6 @@ import cn.hutool.core.date.DatePattern;
import cn.hutool.json.JSONUtil;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.request.PushRequest;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import com.nflg.mobilebroken.common.util.MultilingualUtil;
@ -27,9 +26,9 @@ public class SsePushService {
@Value("${sse.url}")
private String sseUrl;
public void sendTicketMessageToAdmin(Integer userId, ChatMessageDTO message){
public void sendTicketMessageToAdmin(Integer ticketId, ChatMessageDTO message){
try {
PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message));
PushRequest request=new PushRequest().setTicketId(ticketId).setMessage(buildMessage(message));
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<ApiResult> response = restTemplate.postForEntity(sseUrl+"/sse/admin/push",request, ApiResult.class);
log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody()));
@ -38,9 +37,9 @@ public class SsePushService {
}
}
public void sendTicketMessageToApp(Integer userId, ChatMessageDTO message){
public void sendTicketMessageToApp(Integer ticketId, ChatMessageDTO message){
try {
PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message));
PushRequest request=new PushRequest().setTicketId(ticketId).setMessage(buildMessage(message));
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<ApiResult> response = restTemplate.postForEntity(sseUrl+"/sse/app/push",request, ApiResult.class);
log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody()));
@ -49,11 +48,11 @@ public class SsePushService {
}
}
private SSEMessageDTO buildMessage(ChatMessageDTO message){
private ChatMessageVO buildMessage(ChatMessageDTO message){
String zone = MultilingualUtil.getZone();
ZoneId zoneId = ZoneId.of(zone);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId);
ChatMessageVO messageVO = new ChatMessageVO()
return new ChatMessageVO()
.setId(message.getId())
.setFrom(message.getFrom())
.setSenderId(message.getSenderId())
@ -75,8 +74,5 @@ public class SsePushService {
.setAttachments(message.getQuote().getAttachments())
.setImages(message.getQuote().getImages())
.setCreateTime(formatter.format(message.getQuote().getCreateTime())));
return new SSEMessageDTO()
.setType(1)
.setData(messageVO);
}
}

View File

@ -1,8 +1,6 @@
package com.nflg.mobilebroken.admin.service.impl;
import com.nflg.mobilebroken.common.constant.Constant;
import com.nflg.mobilebroken.common.pojo.dto.NotifyDTO;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.UserDTO;
import com.nflg.mobilebroken.starter.service.INotifyPushService;
import com.nflg.mobilebroken.starter.service.SSEManagerService;
@ -12,7 +10,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Objects;
@Service
@ -28,14 +25,14 @@ public class SSEINotifyPushService implements INotifyPushService {
@Override
public void push(UserDTO user, String subject, String content) {
try {
SSEMessageDTO message = new SSEMessageDTO()
.setType(2)
.setData(new NotifyDTO().setSubject(subject).setContent(content));
sseManagerService.send(user.getId(), message);
} catch (IOException e) {
log.error("发送SSE失败", e);
}
// try {
// SSEMessageDTO message = new SSEMessageDTO()
// .setType(2)
// .setData(new NotifyDTO().setSubject(subject).setContent(content));
// sseManagerService.send(user.getId(), message);
// } catch (IOException e) {
// log.error("发送SSE失败", e);
// }
}
@Override

View File

@ -2,8 +2,6 @@ package com.nflg.mobilebroken.cfs.controller;
import com.nflg.mobilebroken.cfs.publisher.TicketEventPublisher;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import com.nflg.mobilebroken.common.pojo.dto.NotifyDTO;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.util.MultilingualUtil;
import com.nflg.mobilebroken.repository.entity.Ticket;
import com.nflg.mobilebroken.repository.service.ITicketService;
@ -11,11 +9,9 @@ import com.nflg.mobilebroken.starter.service.impl.APPSSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
@RestController
@Slf4j
@ -31,14 +27,14 @@ public class TestController extends ControllerBase {
@Resource
private ITicketService ticketService;
@GetMapping("sse/send")
public ApiResult<Void> sendSse(@RequestParam String userId, @RequestParam String message) throws IOException {
SSEMessageDTO messageDTO = new SSEMessageDTO()
.setType(2)
.setData(new NotifyDTO().setSubject("消息测试").setContent("消息内容"));
sseManagerService.send(Integer.valueOf(userId), messageDTO);
return ApiResult.success();
}
// @GetMapping("sse/send")
// public ApiResult<Void> sendSse(@RequestParam String userId, @RequestParam String message) throws IOException {
// SSEMessageDTO messageDTO = new SSEMessageDTO()
// .setType(2)
// .setData(new NotifyDTO().setSubject("消息测试").setContent("消息内容"));
// sseManagerService.send(Integer.valueOf(userId), messageDTO);
// return ApiResult.success();
// }
@GetMapping("sss")
public ApiResult sss(){

View File

@ -331,11 +331,7 @@ public class TiketController extends ControllerBase {
}
ticketChatService.addMessage(request.getTicketId(), message);
//推送消息
List<Integer> handles=Arrays.stream(ticket.getHandle().split(","))
.map(Integer::parseInt).collect(Collectors.toList());
if (CollectionUtil.isNotEmpty(handles)){
handles.forEach(uid->ssePushService.sendTicketMessageToAdmin(uid,message));
}
ssePushService.sendTicketMessageToAdmin(ticket.getId(),message);
ticketEventPublisher.publishTicketReplyEvent(ticket, MultilingualUtil.getLanguage(), MultilingualUtil.getZone());
return ApiResult.success();
}

View File

@ -4,7 +4,6 @@ import cn.hutool.core.date.DatePattern;
import cn.hutool.json.JSONUtil;
import com.nflg.mobilebroken.common.pojo.ApiResult;
import com.nflg.mobilebroken.common.pojo.dto.ChatMessageDTO;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.request.PushRequest;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import com.nflg.mobilebroken.common.util.MultilingualUtil;
@ -27,9 +26,9 @@ public class SsePushService {
@Value("${sse.url}")
private String sseUrl;
public void sendTicketMessageToAdmin(Integer userId, ChatMessageDTO message){
public void sendTicketMessageToAdmin(Integer ticketId, ChatMessageDTO message){
try {
PushRequest request=new PushRequest().setUserId(userId).setMessage(buildMessage(message));
PushRequest request=new PushRequest().setTicketId(ticketId).setMessage(buildMessage(message));
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<ApiResult> response = restTemplate.postForEntity(sseUrl+"/sse/admin/push",request, ApiResult.class);
log.debug("发送消息结果:{}", JSONUtil.toJsonStr(response.getBody()));
@ -38,11 +37,11 @@ public class SsePushService {
}
}
private SSEMessageDTO buildMessage(ChatMessageDTO message){
private ChatMessageVO buildMessage(ChatMessageDTO message){
String zone = MultilingualUtil.getZone();
ZoneId zoneId = ZoneId.of(zone);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DatePattern.NORM_DATETIME_PATTERN).withZone(zoneId);
ChatMessageVO messageVO = new ChatMessageVO()
return new ChatMessageVO()
.setId(message.getId())
.setFrom(message.getFrom())
.setSenderId(message.getSenderId())
@ -64,8 +63,5 @@ public class SsePushService {
.setAttachments(message.getQuote().getAttachments())
.setImages(message.getQuote().getImages())
.setCreateTime(formatter.format(message.getQuote().getCreateTime())));
return new SSEMessageDTO()
.setType(1)
.setData(messageVO);
}
}

View File

@ -1,5 +1,7 @@
package com.nflg.mobilebroken.common.constant;
import java.util.List;
public class Constant {
public static final String DEFAULT_LANGUAGE_CODE="cn";
@ -101,4 +103,6 @@ public class Constant {
public static final String DICTIONARY_ITEM_ACCOUNT_HAS_EXPIRED_PRIMARY="AccountHasExpiredPrimary";
public static final String DICTIONARY_ITEM_ACCOUNT_HAS_EXPIRED="AccountHasExpired";
public static final List<String> ROLE_CODE_TICKET_MANAGERS = List.of(TITLE_DIRECTOROF_BUSINESS_UNIT,TITLE_TECHNICAL_MANAGER,TITLE_SALES_MANAGER,TITLE_TEST_MANAGER,TITLE_QUALITY_MANAGER,DICTIONARY_TYPE_TITLE_CQM);
}

View File

@ -9,10 +9,6 @@ import javax.validation.constraints.NotNull;
@Accessors(chain = true)
public class SSEMessageDTO {
//类型1工单会话消息2消息提醒
@NotNull
private int type;
//消息内容
@NotNull
private Object data;

View File

@ -1,6 +1,6 @@
package com.nflg.mobilebroken.common.pojo.request;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import lombok.Data;
import lombok.experimental.Accessors;
@ -11,12 +11,12 @@ import javax.validation.constraints.NotNull;
@Accessors(chain = true)
public class PushRequest {
//用户id
//工单id
@NotNull
private Integer userId;
private Integer ticketId;
//消息
@NotNull
@Valid
private SSEMessageDTO message;
private ChatMessageVO message;
}

View File

@ -27,9 +27,9 @@
<artifactId>spring-webmvc</artifactId>
</dependency>
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-spring-boot-starter</artifactId>
</dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>cn.dev33</groupId>
<artifactId>sa-token-jwt</artifactId>

View File

@ -0,0 +1,20 @@
package com.nflg.mobilebroken.push.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("SseHeartbeat-");
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,55 @@
package com.nflg.mobilebroken.push.controller;
import com.nflg.mobilebroken.push.service.impl.APPSSEManagerService;
import com.nflg.mobilebroken.push.service.impl.AdminSSEManagerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 统计分析相关
*/
@RestController
@RequestMapping("/analysis")
public class AnalysisController{
@Resource
private APPSSEManagerService appSSEManagerService;
@Resource
private AdminSSEManagerService adminSSEManagerService;
/**
* 获取当前已连接SSE的客户端列表
* @return 当前已连接SSE的客户端列表
*/
@GetMapping("getAppSSEConnects")
public Map<Integer,Integer> getAppSSEConnects(){
Map<Integer, List<SseEmitter>> map= appSSEManagerService.getMap();
Map<Integer,Integer> countMap=new HashMap<>();
map.forEach((k,v)->{
countMap.put(k,v.size());
});
return countMap;
}
/**
* 获取当前已连接SSE的管理端列表
* @return 当前已连接SSE的管理端列表
*/
@GetMapping("getAdminSSEConnects")
public Map<Integer,Integer> getAdminSSEConnects(){
Map<Integer, List<SseEmitter>> map= adminSSEManagerService.getMap();
Map<Integer,Integer> countMap=new HashMap<>();
map.forEach((k,v)->{
countMap.put(k,v.size());
});
return countMap;
}
}

View File

@ -32,8 +32,8 @@ public class SSEController {
* 客户端账号建立sse连接
*/
@GetMapping(value = "app/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter appConnect() {
return appsseManagerService.connect(AppUserUtil.getUserId());
public SseEmitter appConnect(@Valid @RequestParam @NotNull Integer ticketId) {
return appsseManagerService.connect(ticketId,AppUserUtil.getUserId());
}
/**
@ -43,7 +43,7 @@ public class SSEController {
@PostMapping("app/push")
public ApiResult<Void> pushtToApp(@Valid @RequestBody @NotNull PushRequest request){
try {
appsseManagerService.send(request.getUserId(),request.getMessage());
appsseManagerService.send(request.getTicketId(),request.getMessage());
return ApiResult.success();
} catch (IOException e) {
log.error("发送SSE消息出错", e);
@ -55,8 +55,8 @@ public class SSEController {
* 管理端账号建立sse连接
*/
@GetMapping(value = "admin/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter adminConnect() {
return adminSSEManagerService.connect(AdminUserUtil.getUserId());
public SseEmitter adminConnect(@Valid @RequestParam @NotNull Integer ticketId) {
return adminSSEManagerService.connect(ticketId,AdminUserUtil.getUserId());
}
/**
@ -66,7 +66,7 @@ public class SSEController {
@PostMapping("admin/push")
public ApiResult<Void> pushtToAdmin(@Valid @RequestBody @NotNull PushRequest request){
try {
adminSSEManagerService.send(request.getUserId(),request.getMessage());
adminSSEManagerService.send(request.getTicketId(),request.getMessage());
return ApiResult.success();
} catch (IOException e) {
log.error("发送SSE消息出错", e);

View File

@ -1,73 +1,114 @@
package com.nflg.mobilebroken.push.service;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.nflg.mobilebroken.common.constant.STATE;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import com.nflg.mobilebroken.common.util.VUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
@Slf4j
public class SSEManagerBase {
@Resource
private TaskScheduler taskScheduler;
private static boolean IS_SHUTDOWN = false;
@Getter
protected final Map<Integer, List<SseEmitter>> map = new ConcurrentHashMap<>();
protected String from;
protected void check(){
VUtils.trueThrow(IS_SHUTDOWN).throwMessage(STATE.ServiceConnectRefused,"SSE服务已关闭");
}
protected static void shutdown(Map<Integer, SseEmitter> emitters) {
@PreDestroy
protected void shutdown() {
IS_SHUTDOWN=true;
log.warn("准备关闭SSE服务");
emitters.forEach((k,v)->{
map.values().stream().flatMap(List::stream).collect(Collectors.toList()).forEach(emitter->{
try {
v.send("因SSE服务关闭,连接即将断开");
emitter.send("因SSE服务关闭,连接即将断开");
emitter.complete();
}catch (Exception ex){
log.error("SSE发送消息失败:"+k,ex);
log.error("SSE发送消息失败",ex);
emitter.completeWithError(ex);
}
v.complete();
});
log.warn("SSE服务已关闭");
}
protected static void close(SseEmitter emitter){
emitter.complete();
}
protected SseEmitter connect(Integer userId, Map<Integer, SseEmitter> emitters) {
protected SseEmitter connect(Integer ticketId,Integer userId) {
check();
log.info(from+"SSE连接:工单id:"+ticketId+",用户id:"+userId);
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
SseEmitter old=emitters.put(userId, emitter);
if (Objects.nonNull(old)){
log.warn("停止旧连接:"+userId);
try {
old.send(SseEmitter.event().name("被踢下线").data("你已在其他地方连接"));
old.complete();
} catch (Exception e) {
old.completeWithError(e);
}
}
List<SseEmitter> emitters=map.getOrDefault(userId, Collections.synchronizedList(new ArrayList<>()));
emitters.add(emitter);
ScheduledFuture<?> heartbeatTask = startHeartbeat(emitter);
emitter.onError((ex) -> {
emitters.remove(userId);
emitter.complete();
remove(userId, emitters, emitter,heartbeatTask);
log.error("SSE异常:"+userId, ex);
});
emitter.onTimeout(() -> {
emitters.remove(userId);
emitter.complete();
remove(userId, emitters, emitter,heartbeatTask);
log.error("SSE超时:"+userId);
});
emitter.onCompletion(() -> {
emitters.remove(userId);
emitter.complete();
remove(userId, emitters, emitter,heartbeatTask);
log.error("SSE完成:"+userId);
});
try {
emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000));
emitter.send(SseEmitter.event().comment("已连接").reconnectTime(5000));
} catch (IOException e) {
log.error("sse发送数据出错", e);
emitter.completeWithError(e);
}
return emitter;
}
protected void send(Integer ticketId, ChatMessageVO message) throws IOException {
log.info(StrUtil.format(from+"SSE发送消息,工单id: {},内容: {}", ticketId, message));
List<SseEmitter> emitters = map.get(ticketId);
VUtils.trueThrowBusinessError(Objects.isNull(emitters)).throwMessage("没有用户连接工单:"+ticketId);
emitters.forEach(emitter-> {
try {
emitter.send(SseEmitter.event().name("ticketMessage").data(message));
} catch (IOException e) {
log.error("sse发送数据出错", e);
emitter.completeWithError(e);
}
});
}
private void remove(Integer userId,List<SseEmitter> emitters,SseEmitter emitter,ScheduledFuture<?> heartbeatTask){
heartbeatTask.cancel(false);
emitters.remove(emitter);
if (CollectionUtil.isEmpty(emitters)){
map.remove(userId);
}
emitter.complete();
}
private ScheduledFuture<?> startHeartbeat(SseEmitter emitter) {
return taskScheduler.scheduleAtFixedRate(() -> {
try {
emitter.send(SseEmitter.event().comment("ping"));
} catch (IOException e) {
emitter.completeWithError(e);
}
}, 60_000);
}
}

View File

@ -1,20 +1,13 @@
package com.nflg.mobilebroken.push.service;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Collection;
public interface SSEManagerService {
SseEmitter connect(Integer userId);
SseEmitter connect(Integer ticketId,Integer userId);
void send(Integer userId, SSEMessageDTO message) throws IOException;
void close(Integer userId);
void shutdown();
Collection<Integer> getUserIds();
void send(Integer ticketId, ChatMessageVO message) throws IOException;
}

View File

@ -1,68 +1,31 @@
package com.nflg.mobilebroken.push.service.impl;
import cn.hutool.core.util.StrUtil;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.util.VUtils;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import com.nflg.mobilebroken.push.service.SSEManagerBase;
import com.nflg.mobilebroken.push.service.SSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PreDestroy;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class APPSSEManagerService extends SSEManagerBase implements SSEManagerService {
public static final Map<Integer, SseEmitter> EMITTERS = new ConcurrentHashMap<>();
@Override
public SseEmitter connect(Integer userId) {
check();
log.info("APP端SSE已连接:"+userId);
return connect(userId,EMITTERS);
@PostConstruct
public void init() {
from="APP端";
}
@Override
public void send(Integer userId, SSEMessageDTO message) throws IOException {
log.info(StrUtil.format("APP端SSE发送消息,用户id: {},内容: {}", userId, message));
SseEmitter emitter = EMITTERS.get(userId);
VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("用户未连接:"+userId);
emitter.send(message);
public SseEmitter connect(Integer ticketId,Integer userId) {
return super.connect(ticketId,userId);
}
@Override
public void close(Integer userId) {
log.info("APP端SSE连接主动关闭:"+userId);
close(EMITTERS.remove(userId));
}
@Override
public void shutdown() {
shutdown(EMITTERS);
}
@Override
public Collection<Integer> getUserIds() {
return EMITTERS.keySet();
}
@PreDestroy
public void cleanup() {
log.info("释放SSE连接");
for (SseEmitter emitter : EMITTERS.values()) {
try {
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}
EMITTERS.clear();
public void send(Integer ticketId, ChatMessageVO message) throws IOException {
super.send(ticketId, message);
}
}

View File

@ -1,68 +1,31 @@
package com.nflg.mobilebroken.push.service.impl;
import cn.hutool.core.util.StrUtil;
import com.nflg.mobilebroken.common.pojo.dto.SSEMessageDTO;
import com.nflg.mobilebroken.common.util.VUtils;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import com.nflg.mobilebroken.push.service.SSEManagerBase;
import com.nflg.mobilebroken.push.service.SSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PreDestroy;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
public class AdminSSEManagerService extends SSEManagerBase implements SSEManagerService {
public static final Map<Integer, SseEmitter> EMITTERS = new ConcurrentHashMap<>();
@Override
public SseEmitter connect(Integer userId) {
check();
log.info("管理端SSE已连接:"+userId);
return connect(userId,EMITTERS);
@PostConstruct
public void init() {
from="管理端";
}
@Override
public void send(Integer userId, SSEMessageDTO message) throws IOException {
log.info(StrUtil.format("管理端SSE发送消息,用户id: {},内容: {}", userId, message));
SseEmitter emitter = EMITTERS.get(userId);
VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("用户未连接:"+userId);
emitter.send(message);
public SseEmitter connect(Integer ticketId,Integer userId) {
return super.connect(ticketId,userId);
}
@Override
public void close(Integer userId) {
close(EMITTERS.remove(userId));
log.info("管理端SSE连接主动关闭:"+userId);
}
@Override
public void shutdown() {
shutdown(EMITTERS);
}
@Override
public Collection<Integer> getUserIds() {
return EMITTERS.keySet();
}
@PreDestroy
public void cleanup() {
log.info("释放SSE连接");
for (SseEmitter emitter : EMITTERS.values()) {
try {
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
}
EMITTERS.clear();
public void send(Integer ticketId, ChatMessageVO message) throws IOException {
super.send(ticketId, message);
}
}

View File

@ -1,41 +0,0 @@
package com.nflg.mobilebroken.push.task;
import com.nflg.mobilebroken.push.service.impl.APPSSEManagerService;
import com.nflg.mobilebroken.push.service.impl.AdminSSEManagerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Iterator;
import java.util.Map;
@Component
@Slf4j
public class SSEScheduledTasks {
/**
* 发送SSE心跳消息
* 每分钟执行一次
*/
@Scheduled(fixedDelay=60000)
public void sendHeart() {
log.info("发送SSE心跳消息开始");
send(APPSSEManagerService.EMITTERS.entrySet().iterator());
send(AdminSSEManagerService.EMITTERS.entrySet().iterator());
log.info("发送SSE心跳消息结束");
}
private void send(Iterator<?> iterator) {
while (iterator.hasNext()) {
SseEmitter emitter = ((Map.Entry<String, SseEmitter>) iterator.next()).getValue();
try {
emitter.send(SseEmitter.event().data("心跳"));
} catch (Exception e) {
log.error("发送心跳消息失败", e);
emitter.complete();
iterator.remove();
}
}
}
}

View File

@ -2,6 +2,9 @@ spring.application.name=push
spring.profiles.active=dev
server.port=8084
server.tomcat.threads.max=1000
server.tomcat.max-connections=1000
# sa-token??
sa-token.is-log=true
sa-token.token-name=authorization

View File

@ -19,4 +19,6 @@ public interface AdminUserMapper extends BaseMapper<AdminUser> {
List<AdminUser> getByRoleCode(String roleCode);
List<AdminUserSimpleVO> getSimples(List<Integer> userIds);
List<Integer> getTickerMangagers(List<String> titleCodes);
}

View File

@ -50,4 +50,6 @@ public interface IAdminUserService extends IService<AdminUser> {
List<AdminUserSimpleVO> getSimples(List<Integer> userIds);
void deleteAccount(Integer id);
List<Integer> getTickerMangagers();
}

View File

@ -301,6 +301,11 @@ public class AdminUserServiceImpl extends ServiceImpl<AdminUserMapper, AdminUser
.update();
}
@Override
public List<Integer> getTickerMangagers() {
return baseMapper.getTickerMangagers(Constant.ROLE_CODE_TICKET_MANAGERS);
}
private String getDepartmentName(Long departmentId) {
TBaseDepartment department = departmentService.lambdaQuery()
.eq(TBaseDepartment::getId, departmentId)

View File

@ -113,9 +113,9 @@ public class TicketServiceImpl extends ServiceImpl<TicketMapper, Ticket> impleme
Ticket ticket = getById(request.getTicketId());
VUtils.trueThrowBusinessError(Objects.isNull(ticket)).throwMessage("未找到工单");
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getState(), TicketState.PendingProcessing.getState())).throwMessage("工单状态异常");
VUtils.trueThrowBusinessError(adminUserService.getCQM().stream()
.noneMatch(u -> Objects.equals(u.getId(), AdminUserUtil.getUserId())))
.throwMessage("不是CQM无权分派工单");
List<Integer> tickerMangagers = adminUserService.getTickerMangagers();
VUtils.trueThrowBusinessError(tickerMangagers.stream().noneMatch(uid -> Objects.equals(uid, AdminUserUtil.getUserId())))
.throwMessage("无权分派工单");
ticket.setUrgency(TicketUrgency.findByValue(request.getUrgency()).getState());
ticket.setQuestion(request.getQuestion());
ticket.setState(TicketState.Processing.getState());
@ -166,9 +166,10 @@ public class TicketServiceImpl extends ServiceImpl<TicketMapper, Ticket> impleme
Ticket ticket=getById(id);
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getState(), TicketState.Processing.getState()))
.throwMessage("工单状态不允许完成");
VUtils.trueThrowBusinessError(Arrays.stream(ticket.getHandle().split(","))
.noneMatch(uid->StrUtil.equals(uid, AdminUserUtil.getUserId().toString())))
.throwMessage("你无权操作该工单");
List<Integer> tickerMangagers = adminUserService.getTickerMangagers();
tickerMangagers.addAll(Arrays.stream(ticket.getHandle().split(",")).map(Integer::parseInt).collect(Collectors.toList()));
VUtils.trueThrowBusinessError(tickerMangagers.stream().noneMatch(uid -> Objects.equals(uid, AdminUserUtil.getUserId())))
.throwMessage("你无权添加处理人");
ticket.setState(TicketState.ProcessingCompleted.getState());
ticket.setCurrentHandle(AdminUserUtil.getUserId());
ticket.setUpdateTime(LocalDateTime.now());
@ -186,9 +187,10 @@ public class TicketServiceImpl extends ServiceImpl<TicketMapper, Ticket> impleme
.eq(TicketEvaluate::getTicketId, request.getTicketId())
.exists())
.throwMessage("工单尚未评价,不能关闭");
VUtils.trueThrowBusinessError(adminUserService.getCQM().stream()
.noneMatch(u -> Objects.equals(u.getId(), AdminUserUtil.getUserId())))
.throwMessage("你不是CQM无权关闭工单");
List<Integer> tickerMangagers = adminUserService.getTickerMangagers();
tickerMangagers.addAll(Arrays.stream(ticket.getHandle().split(",")).map(Integer::parseInt).collect(Collectors.toList()));
VUtils.trueThrowBusinessError(tickerMangagers.stream().noneMatch(uid -> Objects.equals(uid, AdminUserUtil.getUserId())))
.throwMessage("你无权关闭工单");
ticket.setState(TicketState.Closed.getState());
ticket.setSolution(request.getSolution());
ticket.setSolutionAttachments(StrUtil.join(",", request.getAttachments()));
@ -268,13 +270,12 @@ public class TicketServiceImpl extends ServiceImpl<TicketMapper, Ticket> impleme
Ticket ticket = getById(request.getTicketId());
VUtils.trueThrowBusinessError(Objects.isNull(ticket)).throwMessage("未找到工单");
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getState(), TicketState.Processing.getState())).throwMessage("工单状态异常");
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getCqm(), AdminUserUtil.getUserId())
&& Arrays.stream(ticket.getHandle().split(","))
.map(Integer::parseInt)
.noneMatch(uid -> Objects.equals(uid, AdminUserUtil.getUserId())))
List<Integer> tickerMangagers = adminUserService.getTickerMangagers();
tickerMangagers.addAll(Arrays.stream(ticket.getHandle().split(",")).map(Integer::parseInt).collect(Collectors.toList()));
VUtils.trueThrowBusinessError(tickerMangagers.stream().noneMatch(uid -> Objects.equals(uid, AdminUserUtil.getUserId())))
.throwMessage("你无权添加处理人");
ticket.setHandle(ticket.getHandle().concat(",").concat(StrUtil.join(",", request.getUserIds())));
ticket.setHandleName(ticket.getHandleName().concat(",").concat(StrUtil.join(",", adminUserService.listByIds(request.getUserIds()).stream().map(AdminUser::getUserName).collect(Collectors.toList()))));
ticket.setHandle(StrUtil.join(",", request.getUserIds()));
ticket.setHandleName(StrUtil.join(",", adminUserService.listByIds(request.getUserIds()).stream().map(AdminUser::getUserName).collect(Collectors.toList())));
ticket.setUpdateTime(LocalDateTime.now());
updateById(ticket);
return ticket;

View File

@ -19,4 +19,14 @@
#{userId}
</foreach>
</select>
<select id="getTickerMangagers" resultType="java.lang.Integer">
SELECT au.id
FROM t_base_position p
INNER JOIN admin_user au ON au.title_id=p.id
WHERE p.position_code IN
<foreach collection="titleCodes" item="titleCode" separator="," open="(" close=")">
#{titleCode}
</foreach>
</select>
</mapper>

View File

@ -93,7 +93,7 @@
LEFT JOIN app_user u ON t.user_id=u.id
LEFT JOIN t_base_area a1 ON u.area_id=a1.id
LEFT JOIN app_area a2 ON u.area_id=a2.id
LEFT JOIN ticket_follow tf ON t.id=tf.ticket_id AND tf.from=0
INNER JOIN ticket_follow tf ON t.id=tf.ticket_id AND tf.from=0
WHERE tf.user_id=#{userId} AND t.state!=4
<include refid="searchWhereCondition"/>
ORDER BY t.id DESC
@ -106,7 +106,7 @@
LEFT JOIN app_user u ON t.user_id=u.id
LEFT JOIN t_base_area a1 ON u.area_id=a1.id
LEFT JOIN app_area a2 ON u.area_id=a2.id
LEFT JOIN ticket_follow tf ON t.id=tf.ticket_id AND tf.from=0
LEFT JOIN ticket_follow tf ON t.id=tf.ticket_id AND tf.user_id=#{userId} AND tf.from=0
WHERE t.state!=4 AND u.company_id IN
<foreach collection="companyIds" item="companyId" open="(" separator="," close=")">
#{companyId}