feat: 产品中心

This commit is contained in:
曹鹏飞 2025-05-26 13:36:47 +08:00
parent 1d493b3b2f
commit ecce586407
2 changed files with 32 additions and 23 deletions

View File

@ -1,14 +1,18 @@
package com.nflg.mobilebroken.push; package com.nflg.mobilebroken.push;
import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
@Slf4j
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Data @Data
@AllArgsConstructor
@Accessors(chain = true) @Accessors(chain = true)
public class UserSseEmitter extends SseEmitter { public class UserSseEmitter extends SseEmitter {
@ -18,11 +22,31 @@ public class UserSseEmitter extends SseEmitter {
private Integer ticketId; private Integer ticketId;
private ScheduledFuture<?> heartbeatFuture;
public UserSseEmitter(){ public UserSseEmitter(){
super(0L); super(0L);
} }
public UserSseEmitter(String from, Integer userId, Integer ticketId) {
this.from = from;
this.userId = userId;
this.ticketId = ticketId;
}
public String getUser(){ public String getUser(){
return from + "-" + userId; return from + "-" + userId;
} }
public void startHeartbeat(TaskScheduler taskScheduler) {
heartbeatFuture= taskScheduler.scheduleAtFixedRate(() -> {
try {
send(SseEmitter.event().data("ping"));
} catch (IOException e) {
heartbeatFuture.cancel(true);
log.error("sse发送ping数据出错({}):{}", getUser(),e.getMessage());
completeWithError(e);
}
}, 30_000);
}
} }

View File

@ -19,7 +19,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@ -59,18 +58,18 @@ public class SSEManagerBase {
check(); check();
log.info(from + "SSE连接:用户id:" + userId + ",工单id:" + ticketId); log.info(from + "SSE连接:用户id:" + userId + ",工单id:" + ticketId);
UserSseEmitter emitter = new UserSseEmitter(from, userId, ticketId); UserSseEmitter emitter = new UserSseEmitter(from, userId, ticketId);
emitter.startHeartbeat(taskScheduler);
SSE_EMITTERS.add(emitter); SSE_EMITTERS.add(emitter);
ScheduledFuture<?> heartbeatTask = startHeartbeat(emitter);
emitter.onError((ex) -> { emitter.onError((ex) -> {
remove(emitter,heartbeatTask); remove(emitter);
log.error("SSE异常({}):{}", ex.getMessage(),emitter.getUser()); log.error("SSE异常({}):{}", ex.getMessage(),emitter.getUser());
}); });
emitter.onTimeout(() -> { emitter.onTimeout(() -> {
remove(emitter,heartbeatTask); remove(emitter);
log.error("SSE超时:"+userId); log.error("SSE超时:"+userId);
}); });
emitter.onCompletion(() -> { emitter.onCompletion(() -> {
remove(emitter,heartbeatTask); remove(emitter);
log.error("SSE完成:"+userId); log.error("SSE完成:"+userId);
}); });
try { try {
@ -131,23 +130,9 @@ public class SSEManagerBase {
}); });
} }
private void remove(UserSseEmitter emitter,ScheduledFuture<?> heartbeatTask){ private void remove(UserSseEmitter emitter){
heartbeatTask.cancel(true);
SSE_EMITTERS.remove(emitter); SSE_EMITTERS.remove(emitter);
emitter.complete(); emitter.complete();
emitter=null; emitter=null;
} }
private ScheduledFuture<?> startHeartbeat(UserSseEmitter emitter) {
return taskScheduler.scheduleAtFixedRate(() -> {
try {
if (Objects.nonNull(emitter)) {
emitter.send(SseEmitter.event().data("ping"));
}
} catch (IOException e) {
log.error("sse发送ping数据出错({}):{}", emitter.getUser(),e.getMessage());
emitter.completeWithError(e);
}
}, 30_000);
}
} }