diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/UserSseEmitter.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/UserSseEmitter.java index 9780e9e5..55290553 100644 --- a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/UserSseEmitter.java +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/UserSseEmitter.java @@ -1,14 +1,18 @@ package com.nflg.mobilebroken.push; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.TaskScheduler; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.io.IOException; +import java.util.concurrent.ScheduledFuture; + +@Slf4j @EqualsAndHashCode(callSuper = true) @Data -@AllArgsConstructor @Accessors(chain = true) public class UserSseEmitter extends SseEmitter { @@ -18,11 +22,31 @@ public class UserSseEmitter extends SseEmitter { private Integer ticketId; + private ScheduledFuture heartbeatFuture; + public UserSseEmitter(){ super(0L); } + public UserSseEmitter(String from, Integer userId, Integer ticketId) { + this.from = from; + this.userId = userId; + this.ticketId = ticketId; + } + public String getUser(){ return from + "-" + userId; } -} + + public void startHeartbeat(TaskScheduler taskScheduler) { + heartbeatFuture= taskScheduler.scheduleAtFixedRate(() -> { + try { + send(SseEmitter.event().data("ping")); + } catch (IOException e) { + heartbeatFuture.cancel(true); + log.error("sse发送ping数据出错({}):{}", getUser(),e.getMessage()); + completeWithError(e); + } + }, 30_000); + } +} \ No newline at end of file diff --git a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java index 7118abaf..8b3c682e 100644 --- a/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java +++ b/nflg-mobilebroken-push/src/main/java/com/nflg/mobilebroken/push/service/SSEManagerBase.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; @Slf4j @@ -59,18 +58,18 @@ public class SSEManagerBase { check(); log.info(from + "SSE连接:用户id:" + userId + ",工单id:" + ticketId); UserSseEmitter emitter = new UserSseEmitter(from, userId, ticketId); + emitter.startHeartbeat(taskScheduler); SSE_EMITTERS.add(emitter); - ScheduledFuture heartbeatTask = startHeartbeat(emitter); emitter.onError((ex) -> { - remove(emitter,heartbeatTask); + remove(emitter); log.error("SSE异常({}):{}", ex.getMessage(),emitter.getUser()); }); emitter.onTimeout(() -> { - remove(emitter,heartbeatTask); + remove(emitter); log.error("SSE超时:"+userId); }); emitter.onCompletion(() -> { - remove(emitter,heartbeatTask); + remove(emitter); log.error("SSE完成:"+userId); }); try { @@ -131,23 +130,9 @@ public class SSEManagerBase { }); } - private void remove(UserSseEmitter emitter,ScheduledFuture heartbeatTask){ - heartbeatTask.cancel(true); + private void remove(UserSseEmitter emitter){ SSE_EMITTERS.remove(emitter); emitter.complete(); emitter=null; } - - private ScheduledFuture startHeartbeat(UserSseEmitter emitter) { - return taskScheduler.scheduleAtFixedRate(() -> { - try { - if (Objects.nonNull(emitter)) { - emitter.send(SseEmitter.event().data("ping")); - } - } catch (IOException e) { - log.error("sse发送ping数据出错({}):{}", emitter.getUser(),e.getMessage()); - emitter.completeWithError(e); - } - }, 30_000); - } } \ No newline at end of file