feat: 产品中心

This commit is contained in:
曹鹏飞 2025-05-26 13:36:47 +08:00
parent 14535f8906
commit 022b3b88a3
2 changed files with 32 additions and 23 deletions

View File

@ -1,14 +1,18 @@
package com.nflg.mobilebroken.push;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
@Slf4j
@EqualsAndHashCode(callSuper = true)
@Data
@AllArgsConstructor
@Accessors(chain = true)
public class UserSseEmitter extends SseEmitter {
@ -18,11 +22,31 @@ public class UserSseEmitter extends SseEmitter {
private Integer ticketId;
private ScheduledFuture<?> heartbeatFuture;
public UserSseEmitter(){
super(0L);
}
public UserSseEmitter(String from, Integer userId, Integer ticketId) {
this.from = from;
this.userId = userId;
this.ticketId = ticketId;
}
public String getUser(){
return from + "-" + userId;
}
public void startHeartbeat(TaskScheduler taskScheduler) {
heartbeatFuture= taskScheduler.scheduleAtFixedRate(() -> {
try {
send(SseEmitter.event().data("ping"));
} catch (IOException e) {
heartbeatFuture.cancel(true);
log.error("sse发送ping数据出错({}):{}", getUser(),e.getMessage());
completeWithError(e);
}
}, 30_000);
}
}

View File

@ -19,7 +19,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
@Slf4j
@ -59,18 +58,18 @@ public class SSEManagerBase {
check();
log.info(from + "SSE连接:用户id:" + userId + ",工单id:" + ticketId);
UserSseEmitter emitter = new UserSseEmitter(from, userId, ticketId);
emitter.startHeartbeat(taskScheduler);
SSE_EMITTERS.add(emitter);
ScheduledFuture<?> heartbeatTask = startHeartbeat(emitter);
emitter.onError((ex) -> {
remove(emitter,heartbeatTask);
remove(emitter);
log.error("SSE异常({}):{}", ex.getMessage(),emitter.getUser());
});
emitter.onTimeout(() -> {
remove(emitter,heartbeatTask);
remove(emitter);
log.error("SSE超时:"+userId);
});
emitter.onCompletion(() -> {
remove(emitter,heartbeatTask);
remove(emitter);
log.error("SSE完成:"+userId);
});
try {
@ -131,23 +130,9 @@ public class SSEManagerBase {
});
}
private void remove(UserSseEmitter emitter,ScheduledFuture<?> heartbeatTask){
heartbeatTask.cancel(true);
private void remove(UserSseEmitter emitter){
SSE_EMITTERS.remove(emitter);
emitter.complete();
emitter=null;
}
private ScheduledFuture<?> startHeartbeat(UserSseEmitter emitter) {
return taskScheduler.scheduleAtFixedRate(() -> {
try {
if (Objects.nonNull(emitter)) {
emitter.send(SseEmitter.event().data("ping"));
}
} catch (IOException e) {
log.error("sse发送ping数据出错({}):{}", emitter.getUser(),e.getMessage());
emitter.completeWithError(e);
}
}, 30_000);
}
}