perf: 优化sse推送

This commit is contained in:
曹鹏飞 2025-02-03 16:13:44 +08:00
parent 233cd58630
commit 00fc420f96
4 changed files with 29 additions and 6 deletions

View File

@ -2,21 +2,31 @@ package com.nflg.mobilebroken.admin.service.impl;
import com.nflg.mobilebroken.common.pojo.dto.UserDTO; import com.nflg.mobilebroken.common.pojo.dto.UserDTO;
import com.nflg.mobilebroken.starter.service.INotifyPushService; import com.nflg.mobilebroken.starter.service.INotifyPushService;
import com.nflg.mobilebroken.starter.service.SSEManagerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
@Service @Service
@Slf4j @Slf4j
public class SSEINotifyPushService implements INotifyPushService { public class SSEINotifyPushService implements INotifyPushService {
@Resource
private SSEManagerService sseManagerService;
@Override @Override
public void push(UserDTO user, String subject, String content) { public void push(UserDTO user, String subject, String content) {
//TODO try {
log.error("尚未实现"); sseManagerService.send(user.getId().toString(), subject, content);
} catch (IOException e) {
log.error("发送SSE失败", e);
}
} }
@Override @Override
public boolean check(UserDTO user) { public boolean check(UserDTO user) {
return false; return true;
} }
} }

View File

@ -61,6 +61,7 @@ public class SSEManagerBase {
public void send(String subject, String message,SseEmitter emitter) throws IOException { public void send(String subject, String message,SseEmitter emitter) throws IOException {
VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("没有找到sse"); VUtils.trueThrowBusinessError(Objects.isNull(emitter)).throwMessage("没有找到sse");
log.error("没有找到sse: ");
emitter.send(subject+":"+message); emitter.send(subject+":"+message);
} }
} }

View File

@ -9,13 +9,14 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@Service @Service
@Slf4j @Slf4j
public class APPSSEManagerService extends SSEManagerBase implements SSEManagerService { public class APPSSEManagerService extends SSEManagerBase implements SSEManagerService {
protected static final Map<String, SseEmitter> EMITTERS = new ConcurrentHashMap<>(); public static final Map<String, SseEmitter> EMITTERS = new ConcurrentHashMap<>();
@Override @Override
public SseEmitter connect(String userId) { public SseEmitter connect(String userId) {
@ -27,7 +28,12 @@ public class APPSSEManagerService extends SSEManagerBase implements SSEManagerSe
@Override @Override
public void send(String userId, String subject, String message) throws IOException { public void send(String userId, String subject, String message) throws IOException {
log.info(StrUtil.format("APP端SSE发送消息,用户id: {},标题: {},内容: {}", userId, subject, message)); log.info(StrUtil.format("APP端SSE发送消息,用户id: {},标题: {},内容: {}", userId, subject, message));
send(subject,message,EMITTERS.get(userId)); SseEmitter emitter = EMITTERS.get(userId);
if (Objects.isNull(emitter)) {
log.error("用户未连接SSE: " + userId);
} else {
send(subject, message, emitter);
}
} }
@Override @Override

View File

@ -9,6 +9,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@Service @Service
@ -27,7 +28,12 @@ public class AdminSSEManagerService extends SSEManagerBase implements SSEManager
@Override @Override
public void send(String userId, String subject, String message) throws IOException { public void send(String userId, String subject, String message) throws IOException {
log.info(StrUtil.format("管理端端SSE发送消息,用户id: {},标题: {},内容: {}", userId, subject, message)); log.info(StrUtil.format("管理端端SSE发送消息,用户id: {},标题: {},内容: {}", userId, subject, message));
send(subject,message,EMITTERS.get(userId)); SseEmitter emitter = EMITTERS.get(userId);
if (Objects.isNull(emitter)) {
log.error("用户未连接SSE: " + userId);
} else {
send(subject, message, emitter);
}
} }
@Override @Override