feat: 优化sse客户端断开连接提示

This commit is contained in:
曹鹏飞 2025-05-22 15:03:45 +08:00
parent 0444cf564b
commit 1c3554a35e
1 changed files with 15 additions and 4 deletions

View File

@ -8,6 +8,7 @@ import com.nflg.mobilebroken.common.util.VUtils;
import com.nflg.mobilebroken.push.UserSseEmitter; import com.nflg.mobilebroken.push.UserSseEmitter;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.connector.ClientAbortException;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@ -74,7 +75,11 @@ public class SSEManagerBase {
}); });
try { try {
emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000)); emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000));
} catch (IOException e) { }catch (ClientAbortException e){
log.error("客户端断开连接:{}", userId);
emitter.complete();
}
catch (IOException e) {
log.error("sse发送数据出错", e); log.error("sse发送数据出错", e);
emitter.completeWithError(e); emitter.completeWithError(e);
} }
@ -83,7 +88,7 @@ public class SSEManagerBase {
protected void sendByTicket(Integer ticketId, PushMessageDTO dto) { protected void sendByTicket(Integer ticketId, PushMessageDTO dto) {
log.info(StrUtil.format(from + "SSE发送消息,工单id: {},内容: {}", ticketId, dto)); log.info(StrUtil.format(from + "SSE发送消息,工单id: {},内容: {}", ticketId, dto));
List<SseEmitter> emitters = SSE_EMITTERS.stream() List<UserSseEmitter> emitters = SSE_EMITTERS.stream()
.filter(s -> Objects.equals(s.getTicketId(), ticketId)) .filter(s -> Objects.equals(s.getTicketId(), ticketId))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (CollectionUtil.isEmpty(emitters)){ if (CollectionUtil.isEmpty(emitters)){
@ -93,6 +98,9 @@ public class SSEManagerBase {
emitters.forEach(emitter -> { emitters.forEach(emitter -> {
try { try {
emitter.send(SseEmitter.event().name(dto.getType()).data(dto.getData())); emitter.send(SseEmitter.event().name(dto.getType()).data(dto.getData()));
} catch (ClientAbortException e) {
log.error("客户端断开连接:{}", emitter.getUserId());
emitter.complete();
} catch (IOException e) { } catch (IOException e) {
log.error("sse发送数据出错", e); log.error("sse发送数据出错", e);
emitter.completeWithError(e); emitter.completeWithError(e);
@ -102,7 +110,7 @@ public class SSEManagerBase {
protected void sendByUser(Integer userId, PushMessageDTO dto) { protected void sendByUser(Integer userId, PushMessageDTO dto) {
log.info(StrUtil.format(from + "SSE发送消息,用户id: {},内容: {}", userId, dto)); log.info(StrUtil.format(from + "SSE发送消息,用户id: {},内容: {}", userId, dto));
List<SseEmitter> emitters = SSE_EMITTERS.stream() List<UserSseEmitter> emitters = SSE_EMITTERS.stream()
.filter(s -> Objects.equals(s.getUserId(), userId)) .filter(s -> Objects.equals(s.getUserId(), userId))
.collect(Collectors.toList()); .collect(Collectors.toList());
if (CollectionUtil.isEmpty(emitters)) { if (CollectionUtil.isEmpty(emitters)) {
@ -113,7 +121,10 @@ public class SSEManagerBase {
try { try {
emitter.send(SseEmitter.event().name(dto.getType()).data(dto.getData())); emitter.send(SseEmitter.event().name(dto.getType()).data(dto.getData()));
log.info("发送成功"); log.info("发送成功");
} catch (IOException e) { } catch (ClientAbortException e) {
log.error("客户端断开连接:{}", emitter.getUserId());
emitter.complete();
}catch (IOException e) {
log.error("sse发送数据出错", e); log.error("sse发送数据出错", e);
emitter.completeWithError(e); emitter.completeWithError(e);
} }