Merge branch 'feature/bug-528' into develop
This commit is contained in:
commit
9ea8a7a167
|
|
@ -49,10 +49,10 @@ public class TestController extends ControllerBase{
|
|||
}
|
||||
}
|
||||
|
||||
@GetMapping("test")
|
||||
public ApiResult<Boolean> test(@RequestParam Integer userId){
|
||||
return ApiResult.success(ticketCallService.isInCall(userId));
|
||||
}
|
||||
// @GetMapping("test")
|
||||
// public ApiResult<Boolean> test(@RequestParam Integer userId){
|
||||
// return ApiResult.success(ticketCallService.isInCall(userId));
|
||||
// }
|
||||
|
||||
/**
|
||||
* 翻译为日语
|
||||
|
|
|
|||
|
|
@ -745,6 +745,9 @@ public class TicketController extends ControllerBase {
|
|||
.throwMessage("当前工单状态不允许发送消息");
|
||||
List<Integer> adminUsers=adminUserService.getTickerMangagers();
|
||||
adminUsers.addAll(StrUtil.split(ticket.getHandle(),",").stream().map(Integer::parseInt).collect(Collectors.toList()));
|
||||
if (StrUtil.equals(Constant.FROM_ADMIN,ticket.getUserPlatform())){
|
||||
adminUsers.add(ticket.getUserId());
|
||||
}
|
||||
VUtils.trueThrowBusinessError(adminUsers.stream()
|
||||
.noneMatch(uid -> Objects.equals(uid, AdminUserUtil.getUserId())))
|
||||
.throwMessage("你无权发送消息");
|
||||
|
|
@ -1246,13 +1249,33 @@ public class TicketController extends ControllerBase {
|
|||
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getState(), TicketState.Processing.getState()))
|
||||
.throwMessage("当前工单状态不允许请求通话");
|
||||
Integer handlerUserId = Arrays.stream(ticket.getHandle().split(",")).map(Integer::parseInt).findFirst().get();
|
||||
if (StrUtil.equals(Constant.FROM_APP, ticket.getUserPlatform())) {
|
||||
VUtils.trueThrowBusinessError(!Objects.equals(AdminUserUtil.getUserId(), handlerUserId))
|
||||
.throwMessage("不是工单主负责人无权限呼叫");
|
||||
VUtils.trueThrowBusinessError(ticketCallService.isInCall(ticket.getUserId())).throwMessage("对方正在通话中");
|
||||
AdminUser adminUser = adminUserService.getById(handlerUserId);
|
||||
} else {
|
||||
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getUserId(), AdminUserUtil.getUserId()) && !Objects.equals(AdminUserUtil.getUserId(), handlerUserId))
|
||||
.throwMessage("无权限呼叫");
|
||||
}
|
||||
boolean userIdCreate = StrUtil.equals(ticket.getUserPlatform(), Constant.FROM_ADMIN) && Objects.equals(ticket.getUserId(), AdminUserUtil.getUserId());
|
||||
Integer sendUserId = 0, receiveUserId = 0;
|
||||
String sendUserFrom, receiveUserFrom;
|
||||
if (userIdCreate) {
|
||||
sendUserId = ticket.getUserId();
|
||||
receiveUserId = handlerUserId;
|
||||
sendUserFrom = Constant.FROM_ADMIN;
|
||||
receiveUserFrom = Constant.FROM_ADMIN;
|
||||
} else {
|
||||
sendUserId = handlerUserId;
|
||||
receiveUserId = ticket.getUserId();
|
||||
sendUserFrom = Constant.FROM_ADMIN;
|
||||
receiveUserFrom = Constant.FROM_APP;
|
||||
}
|
||||
VUtils.trueThrowBusinessError(ticketCallJoinService.isInCall(ticket.getId(), receiveUserFrom, receiveUserId)).throwMessage("对方正在通话中");
|
||||
ticketCallService.add(ticketId, sendUserId, sendUserFrom, receiveUserId, receiveUserFrom);
|
||||
AdminUser adminUser = adminUserService.getById(sendUserId);
|
||||
uniPushService.send(new UniPushMessage()
|
||||
.setSenderId("admin-uid-" + handlerUserId)
|
||||
.setReceiverId("app-uid-" + ticket.getUserId())
|
||||
.setSenderId("admin-uid-" + sendUserId)
|
||||
.setReceiverId(receiveUserFrom + "-uid-" + receiveUserId)
|
||||
.setSendData(new UniPushMessageBody()
|
||||
.setTitle("视频通话")
|
||||
.setContent(adminUser.getUserName() + "请求与您视频通话")
|
||||
|
|
@ -1266,8 +1289,11 @@ public class TicketController extends ControllerBase {
|
|||
)
|
||||
)
|
||||
);
|
||||
if (StrUtil.equals(Constant.FROM_APP, ticket.getUserPlatform())) {
|
||||
ssePushService.sendTicketCallToApp(adminUser, ticket.getUserId(), ticketId);
|
||||
ticketCallService.add(ticketId, handlerUserId, ticket.getUserId(), Constant.FROM_ADMIN);
|
||||
} else {
|
||||
ssePushService.sendTicketCallToAdmin(adminUser, receiveUserId, ticketId);
|
||||
}
|
||||
ticketEventPublisher.publishTicketCallBeginEvent(ticketId, adminUser.getUserName());
|
||||
return ApiResult.success();
|
||||
}
|
||||
|
|
@ -1321,12 +1347,12 @@ public class TicketController extends ControllerBase {
|
|||
VUtils.trueThrowBusinessError(Objects.isNull(ticket)).throwMessage("工单不存在");
|
||||
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getState(), TicketState.Processing.getState()))
|
||||
.throwMessage("当前工单状态不允许请求通话");
|
||||
VUtils.trueThrowBusinessError(ticketCallService.isInCall(AdminUserUtil.getUserId()))
|
||||
VUtils.trueThrowBusinessError(ticketCallJoinService.isInCall(ticket.getId(), Constant.FROM_ADMIN, AdminUserUtil.getUserId()))
|
||||
.throwMessage("您已加入别的通话中");
|
||||
ticketCallJoinService.join(ticketId, AdminUserUtil.getUserId(),Constant.FROM_ADMIN);
|
||||
ssePushService.sendTicketCallJoinedToAdmin(AdminUserUtil.getUserId(), ticketId);
|
||||
uniPushService.send(new UniPushMessage()
|
||||
.setSenderId("admin-uid-" + AdminUserUtil.getUserId())
|
||||
.setSenderId("admin-uid-" + AdminUserUtil.getUserId())//不重要
|
||||
.setReceiverId("admin-uid-" + AdminUserUtil.getUserId())
|
||||
.setSendData(new UniPushMessageBody()
|
||||
.setTitle("视频通话")
|
||||
|
|
@ -1348,11 +1374,8 @@ public class TicketController extends ControllerBase {
|
|||
*/
|
||||
@PostMapping("call/hangUp")
|
||||
public ApiResult<Void> hangUp(@Valid @RequestBody TicketCallHangUpRequest request) {
|
||||
TicketCall ticketCall = ticketCallService.lambdaQuery()
|
||||
.eq(TicketCall::getTicketId, request.getTicketId())
|
||||
.ne(TicketCall::getState, 2)
|
||||
.one();
|
||||
boolean flag=false;
|
||||
TicketCall ticketCall = ticketCallService.getLast(request.getTicketId());
|
||||
if (Objects.isNull(ticketCall)) return ApiResult.success();
|
||||
if (request.getReject()) {
|
||||
AdminUser adminUser = adminUserService.getById(AdminUserUtil.getUserId());
|
||||
if (StrUtil.equals(request.getFrom(), "app")) {
|
||||
|
|
@ -1373,7 +1396,6 @@ public class TicketController extends ControllerBase {
|
|||
)
|
||||
);
|
||||
ssePushService.sendTicketCallHangUpToApp(request.getTicketId(), request.getFromUserId(), adminUser);
|
||||
flag=ticketCallJoinService.hangUp(request.getTicketId(), AdminUserUtil.getUserId(), Constant.FROM_ADMIN, true);
|
||||
}else if (StrUtil.equals(request.getFrom(), "admin")) {
|
||||
if (Objects.equals(request.getFromUserId(), AdminUserUtil.getUserId())){
|
||||
Ticket ticket = ticketService.getById(request.getTicketId());
|
||||
|
|
@ -1395,7 +1417,6 @@ public class TicketController extends ControllerBase {
|
|||
)
|
||||
);
|
||||
ssePushService.sendTicketCallCancelToAdmin(request.getTicketId(), handlerId, adminUser);
|
||||
flag=ticketCallService.hangUp(request.getTicketId(), AdminUserUtil.getUserId(), Constant.FROM_ADMIN, true);
|
||||
}else {
|
||||
uniPushService.send(new UniPushMessage()
|
||||
.setSenderId("admin-uid-" + adminUser.getId())
|
||||
|
|
@ -1414,17 +1435,10 @@ public class TicketController extends ControllerBase {
|
|||
)
|
||||
);
|
||||
ssePushService.sendTicketCallHangUpToAdmin(request.getTicketId(), request.getFromUserId(), adminUser);
|
||||
flag=ticketCallJoinService.hangUp(ticketCall.getId(), AdminUserUtil.getUserId(), Constant.FROM_ADMIN, true);
|
||||
}
|
||||
}
|
||||
}else {
|
||||
if (StrUtil.equals(request.getFrom(), "app") || Objects.equals(request.getFromUserId(), AdminUserUtil.getUserId())) {
|
||||
flag=ticketCallService.hangUp(request.getTicketId(), AdminUserUtil.getUserId(), Constant.FROM_ADMIN, false);
|
||||
}else {
|
||||
flag=ticketCallJoinService.hangUp(ticketCall.getId(), AdminUserUtil.getUserId(), Constant.FROM_ADMIN, false);
|
||||
}
|
||||
}
|
||||
if (flag){
|
||||
if (ticketCallJoinService.hangUp(ticketCall.getId(), AdminUserUtil.getUserId(), Constant.FROM_ADMIN, request.getReject())) {
|
||||
ticketEventPublisher.publishTicketCallEndEvent(request.getTicketId());
|
||||
}
|
||||
return ApiResult.success();
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import com.nflg.mobilebroken.repository.service.ITicketCallService;
|
|||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.MDC;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
|
@ -23,6 +24,7 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ConditionalOnProperty(name = "shengwang.sync.enable", havingValue = "true", matchIfMissing = true)
|
||||
@Component
|
||||
@Slf4j
|
||||
public class ShengWangScheduledTasks {
|
||||
|
|
@ -62,7 +64,7 @@ public class ShengWangScheduledTasks {
|
|||
ticketCallService.updateById(ticketCall);
|
||||
ticketCallJoinService.lambdaUpdate()
|
||||
.set(TicketCallJoin::getState, 2)
|
||||
.set(TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
// .set(TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
.eq(TicketCallJoin::getCallId, ticketCall.getId())
|
||||
.update();
|
||||
});
|
||||
|
|
@ -80,7 +82,7 @@ public class ShengWangScheduledTasks {
|
|||
if (CollectionUtil.isEmpty(channelUsers)) {
|
||||
ticketCallJoinService.lambdaUpdate()
|
||||
.set(TicketCallJoin::getState, 2)
|
||||
.set(TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
// .set(TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
.eq(TicketCallJoin::getCallId, ticketCall.getId())
|
||||
.update();
|
||||
} else {
|
||||
|
|
@ -94,7 +96,7 @@ public class ShengWangScheduledTasks {
|
|||
if (CollectionUtil.isNotEmpty(userIds)) {
|
||||
ticketCallJoinService.lambdaUpdate()
|
||||
.set(TicketCallJoin::getState, 2)
|
||||
.set(TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
// .set(TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
.eq(TicketCallJoin::getCallId, ticketCall.getId())
|
||||
.eq(TicketCallJoin::getFrom, Constant.FROM_APP)
|
||||
.notIn(TicketCallJoin::getUserId, userIds)
|
||||
|
|
@ -110,7 +112,7 @@ public class ShengWangScheduledTasks {
|
|||
if (CollectionUtil.isNotEmpty(userIds)) {
|
||||
ticketCallJoinService.lambdaUpdate()
|
||||
.set(TicketCallJoin::getState, 2)
|
||||
.set(TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
// .set(TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
.eq(TicketCallJoin::getCallId, ticketCall.getId())
|
||||
.eq(TicketCallJoin::getFrom, Constant.FROM_ADMIN)
|
||||
.notIn(TicketCallJoin::getUserId, userIds)
|
||||
|
|
|
|||
|
|
@ -504,10 +504,10 @@ public class TicketController extends ControllerBase {
|
|||
VUtils.trueThrowBusinessError(!Objects.equals(AppUserUtil.getUserId(), ticket.getUserId()))
|
||||
.throwMessage("不是工单创建人无权限呼叫");
|
||||
Integer handlerUserId = Arrays.stream(ticket.getHandle().split(",")).map(Integer::parseInt).findFirst().get();
|
||||
VUtils.trueThrowBusinessError(ticketCallService.isInCall(handlerUserId)).throwMessage("对方正在通话中");
|
||||
VUtils.trueThrowBusinessError(ticketCallJoinService.isInCall(ticket.getId(), Constant.FROM_ADMIN, handlerUserId)).throwMessage("对方正在通话中");
|
||||
AppUser appUser = appUserService.getById(ticket.getUserId());
|
||||
uniPushService.send(new UniPushMessage()
|
||||
.setSenderId("app-uid-" + ticket.getUserId())
|
||||
.setSenderId(ticket.getUserPlatform()+"-uid-" + ticket.getUserId())
|
||||
.setReceiverId("admin-uid-" + handlerUserId)
|
||||
.setSendData(new UniPushMessageBody()
|
||||
.setTitle("视频通话")
|
||||
|
|
@ -523,7 +523,7 @@ public class TicketController extends ControllerBase {
|
|||
)
|
||||
);
|
||||
ssePushService.sendTicketCallToAdmin(appUser, handlerUserId, ticketId);
|
||||
ticketCallService.add(ticketId, AppUserUtil.getUserId(), handlerUserId, Constant.FROM_APP);
|
||||
ticketCallService.add(ticketId, AppUserUtil.getUserId(),Constant.FROM_APP, handlerUserId, Constant.FROM_ADMIN);
|
||||
ticketEventPublisher.publishTicketCallBeginEvent(ticketId,appUser.getName());
|
||||
return ApiResult.success();
|
||||
}
|
||||
|
|
@ -538,13 +538,17 @@ public class TicketController extends ControllerBase {
|
|||
VUtils.trueThrowBusinessError(Objects.isNull(ticket)).throwMessage("工单不存在");
|
||||
VUtils.trueThrowBusinessError(!Objects.equals(ticket.getState(), TicketState.Processing.getState()))
|
||||
.throwMessage("当前工单状态不允许通话");
|
||||
VUtils.trueThrowBusinessError(ticketCallService.isInCall(AppUserUtil.getUserId()))
|
||||
VUtils.trueThrowBusinessError(!StrUtil.equals(ticket.getUserPlatform(), AppUserUtil.getFrom())
|
||||
|| !Objects.equals(AppUserUtil.getUserId(), ticket.getUserId()))
|
||||
.throwMessage("不是创建人无法加入通话");
|
||||
VUtils.trueThrowBusinessError(ticketCallJoinService.isInCall(ticket.getId(), Constant.FROM_APP, AppUserUtil.getUserId()))
|
||||
.throwMessage("您已加入别的通话中");
|
||||
ticketCallJoinService.join(ticketId, AppUserUtil.getUserId(), Constant.FROM_APP);
|
||||
ssePushService.sendTicketCallJoinedToApp(AppUserUtil.getUserId(), ticketId);
|
||||
Integer handlerUserId = Arrays.stream(ticket.getHandle().split(",")).map(Integer::parseInt).findFirst().get();
|
||||
uniPushService.send(new UniPushMessage()
|
||||
.setSenderId("app-uid-" + AppUserUtil.getUserId())
|
||||
.setReceiverId("app-uid-" + AppUserUtil.getUserId())
|
||||
.setSenderId(ticket.getUserPlatform()+"-uid-" + ticket.getUserId())
|
||||
.setReceiverId("admin-uid-" + handlerUserId)
|
||||
.setSendData(new UniPushMessageBody()
|
||||
.setTitle("视频通话")
|
||||
.setContent("您已加入别的通话中")
|
||||
|
|
@ -565,14 +569,15 @@ public class TicketController extends ControllerBase {
|
|||
*/
|
||||
@PostMapping("call/hangUp")
|
||||
public ApiResult<Void> hangUp(@Valid @RequestBody TicketCallHangUpRequest request) {
|
||||
boolean flag=false;
|
||||
TicketCall ticketCall = ticketCallService.getLast(request.getTicketId());
|
||||
if (request.getReject()) {
|
||||
if (Objects.isNull(ticketCall)) return ApiResult.success();
|
||||
if (StrUtil.equals(request.getFrom(), Constant.FROM_APP) && Objects.equals(AppUserUtil.getUserId(), request.getFromUserId())) {
|
||||
AppUser appUser = appUserService.getById(AppUserUtil.getUserId());
|
||||
Ticket ticket = ticketService.getById(request.getTicketId());
|
||||
int handlerId= Integer.parseInt(StrUtil.split(ticket.getHandle(), ",").stream().findFirst().get());
|
||||
int handlerId = Integer.parseInt(StrUtil.split(ticket.getHandle(), ",").stream().findFirst().get());
|
||||
uniPushService.send(new UniPushMessage()
|
||||
.setSenderId("app-uid-" + appUser.getId())
|
||||
.setSenderId(ticket.getUserPlatform() + "-uid-" + appUser.getId())
|
||||
.setReceiverId("admin-uid-" + handlerId)
|
||||
.setSendData(new UniPushMessageBody()
|
||||
.setTitle("挂断视频通话")
|
||||
|
|
@ -588,11 +593,11 @@ public class TicketController extends ControllerBase {
|
|||
)
|
||||
);
|
||||
ssePushService.sendTicketCallCancelToAdmin(request.getTicketId(), handlerId, appUser);
|
||||
flag=ticketCallService.hangUp(request.getTicketId(), AppUserUtil.getUserId(), Constant.FROM_APP, true);
|
||||
}else {
|
||||
} else {
|
||||
AppUser appUser = appUserService.getById(AppUserUtil.getUserId());
|
||||
Ticket ticket = ticketService.getById(request.getTicketId());
|
||||
uniPushService.send(new UniPushMessage()
|
||||
.setSenderId("app-uid-" + appUser.getId())
|
||||
.setSenderId(ticket.getUserPlatform() + "-uid-" + appUser.getId())
|
||||
.setReceiverId("admin-uid-" + request.getFromUserId())
|
||||
.setSendData(new UniPushMessageBody()
|
||||
.setTitle("拒绝视频通话")
|
||||
|
|
@ -608,10 +613,9 @@ public class TicketController extends ControllerBase {
|
|||
)
|
||||
);
|
||||
ssePushService.sendTicketCallHangUpToAdmin(request.getTicketId(), request.getFromUserId(), appUser);
|
||||
flag=ticketCallJoinService.hangUp(request.getTicketId(), AppUserUtil.getUserId(), Constant.FROM_APP, true);
|
||||
}
|
||||
}
|
||||
if (flag){
|
||||
if (ticketCallJoinService.hangUp(ticketCall.getId(), AppUserUtil.getUserId(), Constant.FROM_APP, request.getReject())) {
|
||||
ticketEventPublisher.publishTicketCallEndEvent(request.getTicketId());
|
||||
}
|
||||
return ApiResult.success();
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.nflg.mobilebroken.push;
|
||||
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.experimental.Accessors;
|
||||
|
|
@ -22,6 +23,8 @@ public class UserSseEmitter extends SseEmitter {
|
|||
|
||||
private Integer ticketId;
|
||||
|
||||
private String id = IdUtil.getSnowflakeNextIdStr();
|
||||
|
||||
private ScheduledFuture<?> heartbeatFuture;
|
||||
|
||||
public UserSseEmitter(){
|
||||
|
|
@ -34,8 +37,8 @@ public class UserSseEmitter extends SseEmitter {
|
|||
this.ticketId = ticketId;
|
||||
}
|
||||
|
||||
public String getUser(){
|
||||
return from + "-" + userId;
|
||||
public String getLable() {
|
||||
return from + "-" + userId + ",连接id:" + id;
|
||||
}
|
||||
|
||||
public void startHeartbeat(TaskScheduler taskScheduler) {
|
||||
|
|
@ -44,7 +47,7 @@ public class UserSseEmitter extends SseEmitter {
|
|||
send(SseEmitter.event().data("ping"));
|
||||
} catch (IOException e) {
|
||||
heartbeatFuture.cancel(true);
|
||||
log.error("sse发送ping数据出错({}):{}", getUser(),e.getMessage());
|
||||
log.error("sse发送ping数据出错({}):{}", getLable(),e.getMessage());
|
||||
completeWithError(e);
|
||||
}
|
||||
}, 30_000);
|
||||
|
|
|
|||
|
|
@ -34,53 +34,53 @@ public class SSEManagerBase {
|
|||
|
||||
protected String from;
|
||||
|
||||
protected void check(){
|
||||
VUtils.trueThrow(IS_SHUTDOWN).throwMessage(STATE.ServiceConnectRefused,"SSE服务已关闭");
|
||||
protected void check() {
|
||||
VUtils.trueThrow(IS_SHUTDOWN).throwMessage(STATE.ServiceConnectRefused, "SSE服务已关闭");
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
protected void shutdown() {
|
||||
IS_SHUTDOWN=true;
|
||||
IS_SHUTDOWN = true;
|
||||
log.warn("准备关闭SSE服务");
|
||||
SSE_EMITTERS.forEach(emitter->{
|
||||
SSE_EMITTERS.forEach(emitter -> {
|
||||
try {
|
||||
emitter.send("因SSE服务关闭,连接即将断开");
|
||||
emitter.complete();
|
||||
}catch (Exception ex){
|
||||
log.error("SSE发送消息失败",ex);
|
||||
} catch (Exception ex) {
|
||||
log.error("SSE发送消息失败", ex);
|
||||
emitter.completeWithError(ex);
|
||||
}
|
||||
});
|
||||
log.warn("SSE服务已关闭");
|
||||
}
|
||||
|
||||
protected SseEmitter connect(Integer ticketId,Integer userId) {
|
||||
protected SseEmitter connect(Integer ticketId, Integer userId) {
|
||||
check();
|
||||
log.info(from + "SSE连接:用户id:" + userId + ",工单id:" + ticketId);
|
||||
UserSseEmitter emitter = new UserSseEmitter(from, userId, ticketId);
|
||||
log.info(from + "SSE连接:用户id:" + userId + ",工单id:" + ticketId + ",连接id:" + emitter.getId());
|
||||
emitter.startHeartbeat(taskScheduler);
|
||||
SSE_EMITTERS.add(emitter);
|
||||
emitter.onError((ex) -> {
|
||||
remove(emitter);
|
||||
log.error("SSE异常({}):{}", ex.getMessage(),emitter.getUser());
|
||||
log.error("SSE异常({}):{}", ex.getMessage(), emitter.getLable());
|
||||
});
|
||||
emitter.onTimeout(() -> {
|
||||
remove(emitter);
|
||||
log.error("SSE超时:"+userId);
|
||||
log.error("SSE超时:" + userId);
|
||||
});
|
||||
emitter.onCompletion(() -> {
|
||||
remove(emitter);
|
||||
log.error("SSE完成:"+userId);
|
||||
log.error("SSE完成:" + userId);
|
||||
});
|
||||
try {
|
||||
log.info("发送连接成功信息:" + emitter.getLable());
|
||||
emitter.send(SseEmitter.event().data("已连接").reconnectTime(5000));
|
||||
}catch (ClientAbortException e){
|
||||
log.error("客户端断开连接:{}", userId);
|
||||
emitter.completeWithError(e);
|
||||
}
|
||||
catch (IOException e) {
|
||||
} catch (ClientAbortException e) {
|
||||
log.error("断开已连接:{}", e.getMessage());
|
||||
emitter.complete();
|
||||
} catch (IOException e) {
|
||||
log.error("sse发送数据出错:{}", e.getMessage());
|
||||
emitter.completeWithError(e);
|
||||
emitter.complete();
|
||||
}
|
||||
return emitter;
|
||||
}
|
||||
|
|
@ -90,19 +90,21 @@ public class SSEManagerBase {
|
|||
List<UserSseEmitter> emitters = SSE_EMITTERS.stream()
|
||||
.filter(s -> Objects.equals(s.getTicketId(), ticketId))
|
||||
.collect(Collectors.toList());
|
||||
if (CollectionUtil.isEmpty(emitters)){
|
||||
if (CollectionUtil.isEmpty(emitters)) {
|
||||
log.info(StrUtil.format(from + "没有用户连接工单:{}", ticketId));
|
||||
return;
|
||||
}
|
||||
emitters.forEach(emitter -> {
|
||||
try {
|
||||
log.info("发送给用户:" + emitter.getLable());
|
||||
emitter.send(SseEmitter.event().name(dto.getType()).data(dto.getData()));
|
||||
log.info("发送成功");
|
||||
} catch (ClientAbortException e) {
|
||||
log.error("客户端断开连接:{}", emitter.getUser());
|
||||
emitter.completeWithError(e);
|
||||
log.error("发送失败,{}", e.getMessage());
|
||||
emitter.complete();
|
||||
} catch (IOException e) {
|
||||
log.error("sse发送数据出错:{}", e.getMessage());
|
||||
emitter.completeWithError(e);
|
||||
log.error("发送失败,sse发送数据出错:{}", e.getMessage());
|
||||
emitter.complete();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
@ -118,21 +120,22 @@ public class SSEManagerBase {
|
|||
}
|
||||
emitters.forEach(emitter -> {
|
||||
try {
|
||||
log.info("发送给用户:" + emitter.getLable());
|
||||
emitter.send(SseEmitter.event().name(dto.getType()).data(dto.getData()));
|
||||
log.info("发送成功");
|
||||
} catch (ClientAbortException e) {
|
||||
log.error("客户端断开连接:{}", emitter.getUser());
|
||||
emitter.completeWithError(e);
|
||||
}catch (IOException e) {
|
||||
log.error("sse发送数据出错:{}", e.getMessage());
|
||||
emitter.completeWithError(e);
|
||||
log.error("发送失败,{}", e.getMessage());
|
||||
emitter.complete();
|
||||
} catch (IOException e) {
|
||||
log.error("发送失败,sse发送数据出错:{}", e.getMessage());
|
||||
emitter.complete();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void remove(UserSseEmitter emitter){
|
||||
private void remove(UserSseEmitter emitter) {
|
||||
SSE_EMITTERS.remove(emitter);
|
||||
emitter.complete();
|
||||
emitter=null;
|
||||
emitter = null;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
package com.nflg.mobilebroken.repository.entity;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Getter;
|
||||
|
|
@ -35,16 +34,16 @@ public class TicketCall implements Serializable {
|
|||
*/
|
||||
private Integer ticketId;
|
||||
|
||||
/**
|
||||
* 呼叫来源,app或者admin
|
||||
*/
|
||||
@TableField("`from`")
|
||||
private String from;
|
||||
|
||||
/**
|
||||
* 呼叫人id
|
||||
*/
|
||||
private Integer callerUserId;
|
||||
// /**
|
||||
// * 呼叫来源,app或者admin
|
||||
// */
|
||||
// @TableField("`from`")
|
||||
// private String from;
|
||||
//
|
||||
// /**
|
||||
// * 呼叫人id
|
||||
// */
|
||||
// private Integer callerUserId;
|
||||
|
||||
/**
|
||||
* 状态,0-呼叫中;1-通话中;2-已结束
|
||||
|
|
|
|||
|
|
@ -13,5 +13,5 @@ import com.nflg.mobilebroken.repository.entity.TicketCallJoin;
|
|||
*/
|
||||
public interface TicketCallJoinMapper extends BaseMapper<TicketCallJoin> {
|
||||
|
||||
boolean isInCall(Integer userId);
|
||||
boolean isInCall(Integer ticketId, String from, Integer userId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import javax.validation.constraints.NotNull;
|
|||
*/
|
||||
public interface ITicketCallJoinService extends IService<TicketCallJoin> {
|
||||
|
||||
boolean isInCall(Integer userId);
|
||||
boolean isInCall(Integer ticketId, String from, Integer userId);
|
||||
|
||||
void join(@Valid @NotNull Integer ticketId, Integer userId, String from);
|
||||
|
||||
|
|
@ -25,6 +25,4 @@ public interface ITicketCallJoinService extends IService<TicketCallJoin> {
|
|||
boolean hangUp(Integer callId, Integer userId, String from, boolean reject);
|
||||
|
||||
void add(Integer callId, Integer userId, String from);
|
||||
|
||||
void hangUpAll(Integer id);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,9 +15,11 @@ import javax.validation.constraints.NotNull;
|
|||
*/
|
||||
public interface ITicketCallService extends IService<TicketCall> {
|
||||
|
||||
boolean isInCall(Integer userId);
|
||||
// boolean isInCall(Integer userId);
|
||||
|
||||
void add(Integer ticketId, Integer callerUserId, Integer calledUserId, String from);
|
||||
void add(Integer ticketId, Integer callerUserId,String callerUserFrom, Integer calledUserId, String calledUserFrom);
|
||||
|
||||
boolean hangUp(@NotNull Integer ticketId, Integer userId, String from, boolean reject);
|
||||
// boolean hangUp(@NotNull Integer ticketId, Integer userId, String from, boolean reject);
|
||||
|
||||
TicketCall getLast(@NotNull Integer ticketId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,17 +31,14 @@ public class TicketCallJoinServiceImpl extends ServiceImpl<TicketCallJoinMapper,
|
|||
private ITicketCallService ticketCallService;
|
||||
|
||||
@Override
|
||||
public boolean isInCall(Integer userId) {
|
||||
return baseMapper.isInCall(userId);
|
||||
public boolean isInCall(Integer ticketId, String from, Integer userId) {
|
||||
return baseMapper.isInCall(ticketId, from, userId);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public void join(Integer ticketId, Integer userId, String from) {
|
||||
TicketCall ticketCall = ticketCallService.lambdaQuery()
|
||||
.eq(TicketCall::getTicketId, ticketId)
|
||||
.ne(TicketCall::getState, 2)
|
||||
.one();
|
||||
TicketCall ticketCall = ticketCallService.getLast(ticketId);
|
||||
VUtils.trueThrowBusinessError(Objects.isNull(ticketCall)).throwMessage("加入失败,通话已结束");
|
||||
TicketCallJoin ticketCallJoin = lambdaQuery()
|
||||
.eq(TicketCallJoin::getCallId, ticketCall.getId())
|
||||
|
|
@ -72,20 +69,29 @@ public class TicketCallJoinServiceImpl extends ServiceImpl<TicketCallJoinMapper,
|
|||
.ne(TicketCallJoin::getState, 2)
|
||||
.eq(TicketCallJoin::getUserId, userId)
|
||||
.eq(TicketCallJoin::getFrom, from)
|
||||
.orderByDesc(TicketCallJoin::getId)
|
||||
.last("limit 1")
|
||||
.one();
|
||||
boolean flag = false;
|
||||
if (Objects.nonNull(ticketCallJoin)) {
|
||||
ticketCallJoin.setState(2);
|
||||
if (!reject) {
|
||||
if (reject) {
|
||||
ticketCallJoin.setHangupTime(LocalDateTime.now());
|
||||
}
|
||||
updateById(ticketCallJoin);
|
||||
if (StrUtil.equals(from, Constant.FROM_APP)) {
|
||||
lambdaUpdate()
|
||||
.set(TicketCallJoin::getState, 2)
|
||||
.set(!reject, TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
.set(reject, TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
.eq(TicketCallJoin::getCallId, callId)
|
||||
.ne(TicketCallJoin::getFrom, from)
|
||||
.update();
|
||||
} else {
|
||||
lambdaUpdate()
|
||||
.set(TicketCallJoin::getState, 2)
|
||||
.set(reject, TicketCallJoin::getHangupTime, LocalDateTime.now())
|
||||
.eq(TicketCallJoin::getCallId, callId)
|
||||
.eq(TicketCallJoin::getUserId, userId)
|
||||
.eq(TicketCallJoin::getFrom, from)
|
||||
.update();
|
||||
}
|
||||
if (!lambdaQuery()
|
||||
|
|
@ -94,7 +100,7 @@ public class TicketCallJoinServiceImpl extends ServiceImpl<TicketCallJoinMapper,
|
|||
.exists()){
|
||||
ticketCallService.lambdaUpdate()
|
||||
.set(TicketCall::getState, 2)
|
||||
.set(!reject,TicketCall::getHangupTime, LocalDateTime.now())
|
||||
.set(reject,TicketCall::getHangupTime, LocalDateTime.now())
|
||||
.eq(TicketCall::getId, callId)
|
||||
.update();
|
||||
flag = true;
|
||||
|
|
@ -120,12 +126,4 @@ public class TicketCallJoinServiceImpl extends ServiceImpl<TicketCallJoinMapper,
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hangUpAll(Integer id) {
|
||||
lambdaUpdate()
|
||||
.set(TicketCallJoin::getState, 2)
|
||||
.eq(TicketCallJoin::getCallId, id)
|
||||
.update();
|
||||
}
|
||||
}
|
||||
|
|
@ -1,8 +1,6 @@
|
|||
package com.nflg.mobilebroken.repository.service.impl;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.nflg.mobilebroken.common.constant.Constant;
|
||||
import com.nflg.mobilebroken.repository.entity.TicketCall;
|
||||
import com.nflg.mobilebroken.repository.entity.TicketCallJoin;
|
||||
import com.nflg.mobilebroken.repository.mapper.TicketCallMapper;
|
||||
|
|
@ -13,7 +11,6 @@ import org.springframework.transaction.annotation.Transactional;
|
|||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
|
@ -29,54 +26,68 @@ public class TicketCallServiceImpl extends ServiceImpl<TicketCallMapper, TicketC
|
|||
@Resource
|
||||
private ITicketCallJoinService ticketCallJoinService;
|
||||
|
||||
@Override
|
||||
public boolean isInCall(Integer userId) {
|
||||
return lambdaQuery()
|
||||
.eq(TicketCall::getState, 1)
|
||||
.eq(TicketCall::getCallerUserId, userId)
|
||||
.exists()
|
||||
||
|
||||
ticketCallJoinService.isInCall(userId);
|
||||
}
|
||||
// @Override
|
||||
// public boolean isInCall(Integer userId) {
|
||||
// return lambdaQuery()
|
||||
// .eq(TicketCall::getState, 1)
|
||||
// .eq(TicketCall::getCallerUserId, userId)
|
||||
// .exists()
|
||||
// ||
|
||||
// ticketCallJoinService.isInCall(userId);
|
||||
// }
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public void add(Integer ticketId, Integer callerUserId, Integer calledUserId, String from) {
|
||||
public void add(Integer ticketId, Integer callerUserId,String callerUserFrom, Integer calledUserId, String calledUserFrom) {
|
||||
TicketCall ticketCall = new TicketCall()
|
||||
.setTicketId(ticketId)
|
||||
.setCallerUserId(callerUserId)
|
||||
// .setCallerUserId(callerUserId)
|
||||
.setState(0)
|
||||
.setFrom(from)
|
||||
// .setFrom(callerUserFrom)
|
||||
.setCreateTime(LocalDateTime.now());
|
||||
save(ticketCall);
|
||||
ticketCallJoinService.save(new TicketCallJoin()
|
||||
.setCallId(ticketCall.getId())
|
||||
.setFrom(StrUtil.equals(from, Constant.FROM_APP) ? Constant.FROM_ADMIN : Constant.FROM_APP)
|
||||
.setFrom(callerUserFrom)
|
||||
.setUserId(callerUserId)
|
||||
.setState(0)
|
||||
.setCreateTime(LocalDateTime.now())
|
||||
);
|
||||
ticketCallJoinService.save(new TicketCallJoin()
|
||||
.setCallId(ticketCall.getId())
|
||||
.setFrom(calledUserFrom)
|
||||
.setUserId(calledUserId)
|
||||
.setState(0)
|
||||
.setCreateTime(LocalDateTime.now())
|
||||
);
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public boolean hangUp(Integer ticketId, Integer userId, String from, boolean reject) {
|
||||
// TicketCall ticketCall = getLast(ticketId);
|
||||
// boolean flag = false;
|
||||
// if (Objects.nonNull(ticketCall)){
|
||||
// if ((Objects.equals(ticketCall.getCallerUserId(), userId) && StrUtil.equals(ticketCall.getFrom(), Constant.FROM_APP))
|
||||
// || ticketCallJoinService.allIsHangUp(ticketCall.getId())){
|
||||
// ticketCall.setState(2);
|
||||
// ticketCallJoinService.hangUpAll(ticketCall.getId());
|
||||
// if (reject) {
|
||||
// ticketCall.setHangupTime(LocalDateTime.now());
|
||||
// }
|
||||
// updateById(ticketCall);
|
||||
// }
|
||||
// flag=ticketCallJoinService.hangUp(ticketCall.getId(), userId, from, reject);
|
||||
// }
|
||||
// return flag;
|
||||
// }
|
||||
|
||||
@Override
|
||||
public boolean hangUp(Integer ticketId, Integer userId, String from, boolean reject) {
|
||||
TicketCall ticketCall = lambdaQuery()
|
||||
public TicketCall getLast(Integer ticketId) {
|
||||
return lambdaQuery()
|
||||
.eq(TicketCall::getTicketId, ticketId)
|
||||
.ne(TicketCall::getState, 2)
|
||||
.orderByDesc(TicketCall::getId)
|
||||
.last("limit 1")
|
||||
.one();
|
||||
boolean flag = false;
|
||||
if (Objects.nonNull(ticketCall)){
|
||||
if ((Objects.equals(ticketCall.getCallerUserId(), userId) && StrUtil.equals(ticketCall.getFrom(), Constant.FROM_APP))
|
||||
|| ticketCallJoinService.allIsHangUp(ticketCall.getId())){
|
||||
ticketCall.setState(2);
|
||||
ticketCallJoinService.hangUpAll(ticketCall.getId());
|
||||
if (!reject) {
|
||||
ticketCall.setHangupTime(LocalDateTime.now());
|
||||
}
|
||||
updateById(ticketCall);
|
||||
}
|
||||
flag=ticketCallJoinService.hangUp(ticketCall.getId(), userId, from, reject);
|
||||
}
|
||||
return flag;
|
||||
}
|
||||
}
|
||||
|
|
@ -7,6 +7,6 @@
|
|||
INNER JOIN ticket_call_join tcj ON tc.id = tcj.call_id
|
||||
WHERE tc.state = 1
|
||||
AND tcj.state = 1
|
||||
AND tcj.user_id = #{userId}
|
||||
AND tcj.user_id = #{userId} and tcj.from = #{from} and tc.ticket_id = #{ticketId}
|
||||
</select>
|
||||
</mapper>
|
||||
|
|
|
|||
Loading…
Reference in New Issue