diff --git a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/AnalysisController.java b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/AnalysisController.java index 7a6e44f7..7252c548 100644 --- a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/AnalysisController.java +++ b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/AnalysisController.java @@ -1,13 +1,8 @@ package com.nflg.mobilebroken.admin.controller; -import com.nflg.mobilebroken.starter.service.impl.APPSSEManagerService; -import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import javax.annotation.Resource; -import java.util.Collection; - /** * 统计分析相关 */ @@ -15,15 +10,15 @@ import java.util.Collection; @RequestMapping("/analysis") public class AnalysisController extends ControllerBase{ - @Resource - private APPSSEManagerService appSSEManagerService; - - /** - * 获取当前已连接SSE的客户端用户列表 - * @return 当前已连接SSE的客户端用户列表 - */ - @GetMapping("getSSEConnects") - public Collection getSSEConnects(){ - return appSSEManagerService.getUserIds(); - } +// @Resource +// private APPSSEManagerService appSSEManagerService; +// +// /** +// * 获取当前已连接SSE的客户端用户列表 +// * @return 当前已连接SSE的客户端用户列表 +// */ +// @GetMapping("getSSEConnects") +// public Collection getSSEConnects(){ +// return appSSEManagerService.getUserIds(); +// } } diff --git a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/TicketController.java b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/TicketController.java index e19a2e73..c778a334 100644 --- a/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/TicketController.java +++ b/nflg-mobilebroken-admin/src/main/java/com/nflg/mobilebroken/admin/controller/TicketController.java @@ -550,6 +550,7 @@ public class TicketController extends ControllerBase { ticketChatService.addMessage(request.getTicketId(), message); //推送消息 ssePushService.sendTicketMessageToAdmin(request.getTicketId(),message); + ssePushService.sendTicketMessageToApp(request.getTicketId(),message); ticketEventPublisher.publishTicketReplyEvent(ticket); return ApiResult.success(); } diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/UserSseEmitter.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/UserSseEmitter.java new file mode 100644 index 00000000..205001e5 --- /dev/null +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/UserSseEmitter.java @@ -0,0 +1,22 @@ +package com.nflg.mobilebroken.push; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +@EqualsAndHashCode(callSuper = true) +@Data +@AllArgsConstructor +@Accessors(chain = true) +public class UserSseEmitter extends SseEmitter { + + private Integer userId; + + private Integer ticketId; + + public UserSseEmitter(){ + super(0L); + } +} diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/AnalysisController.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/AnalysisController.java index a53c79f7..4353ecd5 100644 --- a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/AnalysisController.java +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/AnalysisController.java @@ -1,16 +1,16 @@ package com.nflg.mobilebroken.push.controller; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import com.nflg.mobilebroken.push.UserSseEmitter; import com.nflg.mobilebroken.push.service.impl.APPSSEManagerService; import com.nflg.mobilebroken.push.service.impl.AdminSSEManagerService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.annotation.Resource; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * 统计分析相关 @@ -30,13 +30,16 @@ public class AnalysisController{ * @return 当前已连接SSE的客户端列表 */ @GetMapping("getAppSSEConnects") - public Map getAppSSEConnects(){ - Map> map= appSSEManagerService.getMap(); - Map countMap=new HashMap<>(); - map.forEach((k,v)->{ - countMap.put(k,v.size()); + public JSONArray getAppSSEConnects(){ + List map= appSSEManagerService.getSSE_EMITTERS(); + JSONArray array=new JSONArray(); + map.forEach(sseEmitter->{ + JSONObject jo=new JSONObject(); + jo.putOpt("userId",sseEmitter.getUserId()); + jo.putOpt("ticketId",sseEmitter.getTicketId()); + array.add(jo); }); - return countMap; + return array; } /** @@ -44,12 +47,15 @@ public class AnalysisController{ * @return 当前已连接SSE的管理端列表 */ @GetMapping("getAdminSSEConnects") - public Map getAdminSSEConnects(){ - Map> map= adminSSEManagerService.getMap(); - Map countMap=new HashMap<>(); - map.forEach((k,v)->{ - countMap.put(k,v.size()); + public JSONArray getAdminSSEConnects(){ + List map= adminSSEManagerService.getSSE_EMITTERS(); + JSONArray array=new JSONArray(); + map.forEach(sseEmitter->{ + JSONObject jo=new JSONObject(); + jo.putOpt("userId",sseEmitter.getUserId()); + jo.putOpt("ticketId",sseEmitter.getTicketId()); + array.add(jo); }); - return countMap; + return array; } } \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/SSEController.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/SSEController.java index 6d9c5cc6..631e7a24 100644 --- a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/SSEController.java +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/controller/SSEController.java @@ -32,7 +32,7 @@ public class SSEController { * 客户端账号建立sse连接 */ @GetMapping(value = "app/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public SseEmitter appConnect(@Valid @RequestParam @NotNull Integer ticketId) { + public SseEmitter appConnect(@RequestParam Integer ticketId) { return appsseManagerService.connect(ticketId,AppUserUtil.getUserId()); } @@ -43,7 +43,7 @@ public class SSEController { @PostMapping("app/push") public ApiResult pushtToApp(@Valid @RequestBody @NotNull PushRequest request){ try { - appsseManagerService.send(request.getTicketId(),request.getMessage()); + appsseManagerService.sendTicketMessage(request.getTicketId(),request.getMessage()); return ApiResult.success(); } catch (IOException e) { log.error("发送SSE消息出错", e); @@ -55,7 +55,7 @@ public class SSEController { * 管理端账号建立sse连接 */ @GetMapping(value = "admin/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public SseEmitter adminConnect(@Valid @RequestParam @NotNull Integer ticketId) { + public SseEmitter adminConnect(@RequestParam Integer ticketId) { return adminSSEManagerService.connect(ticketId,AdminUserUtil.getUserId()); } @@ -66,7 +66,7 @@ public class SSEController { @PostMapping("admin/push") public ApiResult pushtToAdmin(@Valid @RequestBody @NotNull PushRequest request){ try { - adminSSEManagerService.send(request.getTicketId(),request.getMessage()); + adminSSEManagerService.sendTicketMessage(request.getTicketId(),request.getMessage()); return ApiResult.success(); } catch (IOException e) { log.error("发送SSE消息出错", e); diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java index 38b559bc..b5044634 100644 --- a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java @@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil; import com.nflg.mobilebroken.common.constant.STATE; import com.nflg.mobilebroken.common.pojo.vo.ChatMessageVO; import com.nflg.mobilebroken.common.util.VUtils; +import com.nflg.mobilebroken.push.UserSseEmitter; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.TaskScheduler; @@ -13,8 +14,10 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; @@ -27,7 +30,7 @@ public class SSEManagerBase { private static boolean IS_SHUTDOWN = false; @Getter - protected final Map> map = new ConcurrentHashMap<>(); + protected final List SSE_EMITTERS = Collections.synchronizedList(new ArrayList<>()); protected String from; @@ -39,7 +42,7 @@ public class SSEManagerBase { protected void shutdown() { IS_SHUTDOWN=true; log.warn("准备关闭SSE服务"); - map.values().stream().flatMap(List::stream).collect(Collectors.toList()).forEach(emitter->{ + SSE_EMITTERS.forEach(emitter->{ try { emitter.send("因SSE服务关闭,连接即将断开"); emitter.complete(); @@ -53,25 +56,24 @@ public class SSEManagerBase { protected SseEmitter connect(Integer ticketId,Integer userId) { check(); - log.info(from+"SSE连接:工单id:"+ticketId+",用户id:"+userId); - SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); - List emitters=map.getOrDefault(userId, Collections.synchronizedList(new ArrayList<>())); - emitters.add(emitter); + log.info(from+"SSE连接:用户id:"+userId+",工单id:"+ticketId); + UserSseEmitter emitter = new UserSseEmitter(userId,ticketId); + SSE_EMITTERS.add(emitter); ScheduledFuture heartbeatTask = startHeartbeat(emitter); emitter.onError((ex) -> { - remove(userId, emitters, emitter,heartbeatTask); + remove(userId, emitter,heartbeatTask); log.error("SSE异常:"+userId, ex); }); emitter.onTimeout(() -> { - remove(userId, emitters, emitter,heartbeatTask); + remove(userId, emitter,heartbeatTask); log.error("SSE超时:"+userId); }); emitter.onCompletion(() -> { - remove(userId, emitters, emitter,heartbeatTask); + remove(userId, emitter,heartbeatTask); log.error("SSE完成:"+userId); }); try { - emitter.send(SseEmitter.event().comment("已连接").reconnectTime(5000)); + emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000)); } catch (IOException e) { log.error("sse发送数据出错", e); emitter.completeWithError(e); @@ -80,10 +82,15 @@ public class SSEManagerBase { } protected void send(Integer ticketId, ChatMessageVO message) throws IOException { - log.info(StrUtil.format(from+"SSE发送消息,工单id: {},内容: {}", ticketId, message)); - List emitters = map.get(ticketId); - VUtils.trueThrowBusinessError(Objects.isNull(emitters)).throwMessage("没有用户连接工单:"+ticketId); - emitters.forEach(emitter-> { + log.info(StrUtil.format(from + "SSE发送消息,工单id: {},内容: {}", ticketId, message)); + List emitters = SSE_EMITTERS.stream() + .filter(s -> Objects.equals(s.getTicketId(), ticketId)) + .collect(Collectors.toList()); + if (CollectionUtil.isEmpty(emitters)){ + log.info(StrUtil.format(from + "没有用户连接工单:{}", ticketId)); + return; + } + emitters.forEach(emitter -> { try { emitter.send(SseEmitter.event().name("ticketMessage").data(message)); } catch (IOException e) { @@ -93,22 +100,19 @@ public class SSEManagerBase { }); } - private void remove(Integer userId,List emitters,SseEmitter emitter,ScheduledFuture heartbeatTask){ + private void remove(Integer userId,UserSseEmitter emitter,ScheduledFuture heartbeatTask){ heartbeatTask.cancel(false); - emitters.remove(emitter); - if (CollectionUtil.isEmpty(emitters)){ - map.remove(userId); - } + SSE_EMITTERS.remove(emitter); emitter.complete(); } - private ScheduledFuture startHeartbeat(SseEmitter emitter) { + private ScheduledFuture startHeartbeat(UserSseEmitter emitter) { return taskScheduler.scheduleAtFixedRate(() -> { try { - emitter.send(SseEmitter.event().comment("ping")); + emitter.send(SseEmitter.event().data("ping")); } catch (IOException e) { emitter.completeWithError(e); } - }, 60_000); + }, 30_000); } } \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerService.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerService.java index 99928fea..6c330e87 100644 --- a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerService.java +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerService.java @@ -9,5 +9,5 @@ public interface SSEManagerService { SseEmitter connect(Integer ticketId,Integer userId); - void send(Integer ticketId, ChatMessageVO message) throws IOException; + void sendTicketMessage(Integer ticketId, ChatMessageVO message) throws IOException; } diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/APPSSEManagerService.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/APPSSEManagerService.java index 43aeaf16..f706b4cc 100644 --- a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/APPSSEManagerService.java +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/APPSSEManagerService.java @@ -25,7 +25,7 @@ public class APPSSEManagerService extends SSEManagerBase implements SSEManagerSe } @Override - public void send(Integer ticketId, ChatMessageVO message) throws IOException { + public void sendTicketMessage(Integer ticketId, ChatMessageVO message) throws IOException { super.send(ticketId, message); } } \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/AdminSSEManagerService.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/AdminSSEManagerService.java index 5b2eddd1..b0f2eed1 100644 --- a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/AdminSSEManagerService.java +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/impl/AdminSSEManagerService.java @@ -25,7 +25,7 @@ public class AdminSSEManagerService extends SSEManagerBase implements SSEManager } @Override - public void send(Integer ticketId, ChatMessageVO message) throws IOException { + public void sendTicketMessage(Integer ticketId, ChatMessageVO message) throws IOException { super.send(ticketId, message); } } \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/resources/application.properties b/nflg-mobilebroken-push/src/main/resources/application.properties index d84da407..ead8a866 100644 --- a/nflg-mobilebroken-push/src/main/resources/application.properties +++ b/nflg-mobilebroken-push/src/main/resources/application.properties @@ -1,9 +1,22 @@ spring.application.name=push spring.profiles.active=dev server.port=8084 +logging.level.root=info server.tomcat.threads.max=1000 server.tomcat.max-connections=1000 +# 设置最小空闲线程数 +server.tomcat.threads.min-spare=50 +# 设置接受队列大小 +server.tomcat.accept-count=1000 +# 调整异步请求超时时间(单位:毫秒) +spring.mvc.async.request-timeout=-1 +# 设置连接超时时间(单位:毫秒,设置为0表示无限制) +server.tomcat.connection-timeout=0 +# 设置最大空闲时间(毫秒,0表示无限制) +server.tomcat.keep-alive-timeout=0 +# 启用 keep-alive(默认已启用,但确保未被禁用) +server.tomcat.max-keep-alive-requests=1000 # sa-token?? sa-token.is-log=true diff --git a/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/mapper/TicketMapper.java b/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/mapper/TicketMapper.java index e752a12c..6feb5575 100644 --- a/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/mapper/TicketMapper.java +++ b/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/mapper/TicketMapper.java @@ -24,7 +24,7 @@ public interface TicketMapper extends BaseMapper { IPage searchFollow(IPage page, TicketSearchRequest request, Integer userId); - IPage searchArea(IPage page, TicketSearchRequest request, List companyIds); + IPage searchArea(IPage page, TicketSearchRequest request, List companyIds, Integer userId); IPage searchFromAdmin(AdminTicketSearchRequest request, Integer userId, IPage page); diff --git a/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/service/impl/AppUserServiceImpl.java b/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/service/impl/AppUserServiceImpl.java index c7fa691d..24e6bf0f 100644 --- a/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/service/impl/AppUserServiceImpl.java +++ b/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/service/impl/AppUserServiceImpl.java @@ -157,6 +157,7 @@ public class AppUserServiceImpl extends ServiceImpl impl AppUser user = new AppUser() .setLoginName(request.getLoginName()) .setName(request.getUserName()) + .setAvatar(request.getAvatar()) .setEmail(request.getEmail()) .setPhone(request.getPhone()) .setAreaId(request.getAreaId()) diff --git a/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/service/impl/TicketServiceImpl.java b/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/service/impl/TicketServiceImpl.java index b2c84c11..a4b1b3f9 100644 --- a/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/service/impl/TicketServiceImpl.java +++ b/nflg-mobilebroken-repository/src/main/java/com/nflg/mobilebroken/repository/service/impl/TicketServiceImpl.java @@ -90,7 +90,7 @@ public class TicketServiceImpl extends ServiceImpl impleme }else if (request.getType()==2){ return baseMapper.searchFollow(new Page<>(request.getPage(), request.getPageSize()), request, user.getId()); }else if (request.getType()==3) { - return baseMapper.searchArea(new Page<>(request.getPage(), request.getPageSize()), request, user.getCompanyIds()); + return baseMapper.searchArea(new Page<>(request.getPage(), request.getPageSize()), request, user.getCompanyIds(), user.getId()); } return null; }