feat: sse服务优化

This commit is contained in:
曹鹏飞 2025-02-28 14:27:10 +08:00
parent 4dfe32bff8
commit bf75099c8b
13 changed files with 106 additions and 64 deletions

View File

@ -1,13 +1,8 @@
package com.nflg.mobilebroken.admin.controller;
import com.nflg.mobilebroken.starter.service.impl.APPSSEManagerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Collection;
/**
* 统计分析相关
*/
@ -15,15 +10,15 @@ import java.util.Collection;
@RequestMapping("/analysis")
public class AnalysisController extends ControllerBase{
@Resource
private APPSSEManagerService appSSEManagerService;
/**
* 获取当前已连接SSE的客户端用户列表
* @return 当前已连接SSE的客户端用户列表
*/
@GetMapping("getSSEConnects")
public Collection<Integer> getSSEConnects(){
return appSSEManagerService.getUserIds();
}
// @Resource
// private APPSSEManagerService appSSEManagerService;
//
// /**
// * 获取当前已连接SSE的客户端用户列表
// * @return 当前已连接SSE的客户端用户列表
// */
// @GetMapping("getSSEConnects")
// public Collection<Integer> getSSEConnects(){
// return appSSEManagerService.getUserIds();
// }
}

View File

@ -550,6 +550,7 @@ public class TicketController extends ControllerBase {
ticketChatService.addMessage(request.getTicketId(), message);
//推送消息
ssePushService.sendTicketMessageToAdmin(request.getTicketId(),message);
ssePushService.sendTicketMessageToApp(request.getTicketId(),message);
ticketEventPublisher.publishTicketReplyEvent(ticket);
return ApiResult.success();
}

View File

@ -0,0 +1,22 @@
package com.nflg.mobilebroken.push;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@Accessors(chain = true)
public class UserSseEmitter extends SseEmitter {
private Integer userId;
private Integer ticketId;
public UserSseEmitter(){
super(0L);
}
}

View File

@ -1,16 +1,16 @@
package com.nflg.mobilebroken.push.controller;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import com.nflg.mobilebroken.push.UserSseEmitter;
import com.nflg.mobilebroken.push.service.impl.APPSSEManagerService;
import com.nflg.mobilebroken.push.service.impl.AdminSSEManagerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 统计分析相关
@ -30,13 +30,16 @@ public class AnalysisController{
* @return 当前已连接SSE的客户端列表
*/
@GetMapping("getAppSSEConnects")
public Map<Integer,Integer> getAppSSEConnects(){
Map<Integer, List<SseEmitter>> map= appSSEManagerService.getMap();
Map<Integer,Integer> countMap=new HashMap<>();
map.forEach((k,v)->{
countMap.put(k,v.size());
public JSONArray getAppSSEConnects(){
List<UserSseEmitter> map= appSSEManagerService.getSSE_EMITTERS();
JSONArray array=new JSONArray();
map.forEach(sseEmitter->{
JSONObject jo=new JSONObject();
jo.putOpt("userId",sseEmitter.getUserId());
jo.putOpt("ticketId",sseEmitter.getTicketId());
array.add(jo);
});
return countMap;
return array;
}
/**
@ -44,12 +47,15 @@ public class AnalysisController{
* @return 当前已连接SSE的管理端列表
*/
@GetMapping("getAdminSSEConnects")
public Map<Integer,Integer> getAdminSSEConnects(){
Map<Integer, List<SseEmitter>> map= adminSSEManagerService.getMap();
Map<Integer,Integer> countMap=new HashMap<>();
map.forEach((k,v)->{
countMap.put(k,v.size());
public JSONArray getAdminSSEConnects(){
List<UserSseEmitter> map= adminSSEManagerService.getSSE_EMITTERS();
JSONArray array=new JSONArray();
map.forEach(sseEmitter->{
JSONObject jo=new JSONObject();
jo.putOpt("userId",sseEmitter.getUserId());
jo.putOpt("ticketId",sseEmitter.getTicketId());
array.add(jo);
});
return countMap;
return array;
}
}

View File

@ -32,7 +32,7 @@ public class SSEController {
* 客户端账号建立sse连接
*/
@GetMapping(value = "app/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter appConnect(@Valid @RequestParam @NotNull Integer ticketId) {
public SseEmitter appConnect(@RequestParam Integer ticketId) {
return appsseManagerService.connect(ticketId,AppUserUtil.getUserId());
}
@ -43,7 +43,7 @@ public class SSEController {
@PostMapping("app/push")
public ApiResult<Void> pushtToApp(@Valid @RequestBody @NotNull PushRequest request){
try {
appsseManagerService.send(request.getTicketId(),request.getMessage());
appsseManagerService.sendTicketMessage(request.getTicketId(),request.getMessage());
return ApiResult.success();
} catch (IOException e) {
log.error("发送SSE消息出错", e);
@ -55,7 +55,7 @@ public class SSEController {
* 管理端账号建立sse连接
*/
@GetMapping(value = "admin/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter adminConnect(@Valid @RequestParam @NotNull Integer ticketId) {
public SseEmitter adminConnect(@RequestParam Integer ticketId) {
return adminSSEManagerService.connect(ticketId,AdminUserUtil.getUserId());
}
@ -66,7 +66,7 @@ public class SSEController {
@PostMapping("admin/push")
public ApiResult<Void> pushtToAdmin(@Valid @RequestBody @NotNull PushRequest request){
try {
adminSSEManagerService.send(request.getTicketId(),request.getMessage());
adminSSEManagerService.sendTicketMessage(request.getTicketId(),request.getMessage());
return ApiResult.success();
} catch (IOException e) {
log.error("发送SSE消息出错", e);

View File

@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil;
import com.nflg.mobilebroken.common.constant.STATE;
import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO;
import com.nflg.mobilebroken.common.util.VUtils;
import com.nflg.mobilebroken.push.UserSseEmitter;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
@ -13,8 +14,10 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
@ -27,7 +30,7 @@ public class SSEManagerBase {
private static boolean IS_SHUTDOWN = false;
@Getter
protected final Map<Integer, List<SseEmitter>> map = new ConcurrentHashMap<>();
protected final List<UserSseEmitter> SSE_EMITTERS = Collections.synchronizedList(new ArrayList<>());
protected String from;
@ -39,7 +42,7 @@ public class SSEManagerBase {
protected void shutdown() {
IS_SHUTDOWN=true;
log.warn("准备关闭SSE服务");
map.values().stream().flatMap(List::stream).collect(Collectors.toList()).forEach(emitter->{
SSE_EMITTERS.forEach(emitter->{
try {
emitter.send("因SSE服务关闭,连接即将断开");
emitter.complete();
@ -53,25 +56,24 @@ public class SSEManagerBase {
protected SseEmitter connect(Integer ticketId,Integer userId) {
check();
log.info(from+"SSE连接:工单id:"+ticketId+",用户id:"+userId);
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
List<SseEmitter> emitters=map.getOrDefault(userId, Collections.synchronizedList(new ArrayList<>()));
emitters.add(emitter);
log.info(from+"SSE连接:用户id:"+userId+",工单id:"+ticketId);
UserSseEmitter emitter = new UserSseEmitter(userId,ticketId);
SSE_EMITTERS.add(emitter);
ScheduledFuture<?> heartbeatTask = startHeartbeat(emitter);
emitter.onError((ex) -> {
remove(userId, emitters, emitter,heartbeatTask);
remove(userId, emitter,heartbeatTask);
log.error("SSE异常:"+userId, ex);
});
emitter.onTimeout(() -> {
remove(userId, emitters, emitter,heartbeatTask);
remove(userId, emitter,heartbeatTask);
log.error("SSE超时:"+userId);
});
emitter.onCompletion(() -> {
remove(userId, emitters, emitter,heartbeatTask);
remove(userId, emitter,heartbeatTask);
log.error("SSE完成:"+userId);
});
try {
emitter.send(SseEmitter.event().comment("已连接").reconnectTime(5000));
emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000));
} catch (IOException e) {
log.error("sse发送数据出错", e);
emitter.completeWithError(e);
@ -81,8 +83,13 @@ public class SSEManagerBase {
protected void send(Integer ticketId, ChatMessageVO message) throws IOException {
log.info(StrUtil.format(from + "SSE发送消息,工单id: {},内容: {}", ticketId, message));
List<SseEmitter> emitters = map.get(ticketId);
VUtils.trueThrowBusinessError(Objects.isNull(emitters)).throwMessage("没有用户连接工单:"+ticketId);
List<SseEmitter> emitters = SSE_EMITTERS.stream()
.filter(s -> Objects.equals(s.getTicketId(), ticketId))
.collect(Collectors.toList());
if (CollectionUtil.isEmpty(emitters)){
log.info(StrUtil.format(from + "没有用户连接工单:{}", ticketId));
return;
}
emitters.forEach(emitter -> {
try {
emitter.send(SseEmitter.event().name("ticketMessage").data(message));
@ -93,22 +100,19 @@ public class SSEManagerBase {
});
}
private void remove(Integer userId,List<SseEmitter> emitters,SseEmitter emitter,ScheduledFuture<?> heartbeatTask){
private void remove(Integer userId,UserSseEmitter emitter,ScheduledFuture<?> heartbeatTask){
heartbeatTask.cancel(false);
emitters.remove(emitter);
if (CollectionUtil.isEmpty(emitters)){
map.remove(userId);
}
SSE_EMITTERS.remove(emitter);
emitter.complete();
}
private ScheduledFuture<?> startHeartbeat(SseEmitter emitter) {
private ScheduledFuture<?> startHeartbeat(UserSseEmitter emitter) {
return taskScheduler.scheduleAtFixedRate(() -> {
try {
emitter.send(SseEmitter.event().comment("ping"));
emitter.send(SseEmitter.event().data("ping"));
} catch (IOException e) {
emitter.completeWithError(e);
}
}, 60_000);
}, 30_000);
}
}

View File

@ -9,5 +9,5 @@ public interface SSEManagerService {
SseEmitter connect(Integer ticketId,Integer userId);
void send(Integer ticketId, ChatMessageVO message) throws IOException;
void sendTicketMessage(Integer ticketId, ChatMessageVO message) throws IOException;
}

View File

@ -25,7 +25,7 @@ public class APPSSEManagerService extends SSEManagerBase implements SSEManagerSe
}
@Override
public void send(Integer ticketId, ChatMessageVO message) throws IOException {
public void sendTicketMessage(Integer ticketId, ChatMessageVO message) throws IOException {
super.send(ticketId, message);
}
}

View File

@ -25,7 +25,7 @@ public class AdminSSEManagerService extends SSEManagerBase implements SSEManager
}
@Override
public void send(Integer ticketId, ChatMessageVO message) throws IOException {
public void sendTicketMessage(Integer ticketId, ChatMessageVO message) throws IOException {
super.send(ticketId, message);
}
}

View File

@ -1,9 +1,22 @@
spring.application.name=push
spring.profiles.active=dev
server.port=8084
logging.level.root=info
server.tomcat.threads.max=1000
server.tomcat.max-connections=1000
# 设置最小空闲线程数
server.tomcat.threads.min-spare=50
# 设置接受队列大小
server.tomcat.accept-count=1000
# 调整异步请求超时时间(单位:毫秒)
spring.mvc.async.request-timeout=-1
# 设置连接超时时间单位毫秒设置为0表示无限制
server.tomcat.connection-timeout=0
# 设置最大空闲时间毫秒0表示无限制
server.tomcat.keep-alive-timeout=0
# 启用 keep-alive默认已启用但确保未被禁用
server.tomcat.max-keep-alive-requests=1000
# sa-token??
sa-token.is-log=true

View File

@ -24,7 +24,7 @@ public interface TicketMapper extends BaseMapper<Ticket> {
IPage<TicketVO> searchFollow(IPage<?> page, TicketSearchRequest request, Integer userId);
IPage<TicketVO> searchArea(IPage<?> page, TicketSearchRequest request, List<Integer> companyIds);
IPage<TicketVO> searchArea(IPage<?> page, TicketSearchRequest request, List<Integer> companyIds, Integer userId);
IPage<AdminTicketVO> searchFromAdmin(AdminTicketSearchRequest request, Integer userId, IPage<?> page);

View File

@ -157,6 +157,7 @@ public class AppUserServiceImpl extends ServiceImpl<AppUserMapper, AppUser> impl
AppUser user = new AppUser()
.setLoginName(request.getLoginName())
.setName(request.getUserName())
.setAvatar(request.getAvatar())
.setEmail(request.getEmail())
.setPhone(request.getPhone())
.setAreaId(request.getAreaId())

View File

@ -90,7 +90,7 @@ public class TicketServiceImpl extends ServiceImpl<TicketMapper, Ticket> impleme
}else if (request.getType()==2){
return baseMapper.searchFollow(new Page<>(request.getPage(), request.getPageSize()), request, user.getId());
}else if (request.getType()==3) {
return baseMapper.searchArea(new Page<>(request.getPage(), request.getPageSize()), request, user.getCompanyIds());
return baseMapper.searchArea(new Page<>(request.getPage(), request.getPageSize()), request, user.getCompanyIds(), user.getId());
}
return null;
}