程序员开发实例大全宝库

网站首页 > 编程文章 正文

springboot集成websocket点对点推送、广播推送

zazugpt 2024-09-06 01:52:44 编程文章 28 ℃ 0 评论

一、什么都不用说,导入个依赖先


<!-- Spring Boot starter that brings in WebSocket support; no version is given in this snippet. -->
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

二、推送到前端的消息实体类


import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
 
 
/**
 * Message envelope that is pushed to the browser.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString}, a builder and both the
 * no-argument and all-arguments constructors are generated by the Lombok annotations below.
 *
 * @param <T> type of the payload held in {@code data}
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class NotifyBean<T> implements Serializable {
 
 private static final long serialVersionUID = 1L;
 
 /** Numeric message category; the meaning of individual values is not defined in this class. */
 private int type;
 /** Human-readable message text. */
 private String message;
 /** Payload of type {@code T} carried with the message. */
 private T data;
 
}

三、因为要实现点对点的推送,所以需要创建一个监听器来获取到websocket的session,如下:

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.web.socket.messaging.SessionConnectEvent;
 
 
 
/**
 * Remembers which WebSocket session belongs to which user every time a STOMP CONNECT frame
 * arrives, so that a point-to-point push can later look the session up by user id.
 *
 * <p>The mapping is stored in Redis under the key {@code "websocket:" + userId}. This class only
 * writes the mapping; nothing here removes it when the session ends.
 */
public class STOMPConnectEventListener implements ApplicationListener<SessionConnectEvent> {

    /** Prefix of the Redis key that maps a user id to its current WebSocket session id. */
    private static final String SESSION_KEY_PREFIX = "websocket:";

    @Autowired
    private RedisHelper redisHelper;

    /**
     * Stores {@code userId -> sessionId} for the connecting client.
     *
     * <p>The user id is taken from the native {@code userid} header; when that header is absent
     * the standard STOMP {@code login} header is used instead, because browser clients commonly
     * pass the user key that way. Frames that carry neither header (or no session id) are ignored.
     *
     * @param event the CONNECT event published by the STOMP sub-protocol handler
     */
    @Override
    public void onApplicationEvent(SessionConnectEvent event) {
        StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());

        String userid = sha.getFirstNativeHeader("userid");
        if (userid == null) {
            // Fall back to the STOMP "login" header sent by the browser.
            userid = sha.getLogin();
        }

        String sessionId = sha.getSessionId();
        if (userid == null || userid.isEmpty() || sessionId == null) {
            // Nothing usable to index by; leave any previously stored mapping untouched.
            return;
        }

        redisHelper.redisTemplate.opsForValue().set(SESSION_KEY_PREFIX + userid, sessionId);
    }
}

四、最重要的配置类


import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.socket.config.annotation.*;
 
 
@Configuration
@EnableWebSocketMessageBroker
@Slf4j
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
 
 //STOMP监听类
 @Bean
 public STOMPConnectEventListener applicationStartListener(){
 return new STOMPConnectEventListener();
 }
 
 
 @Override
 public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
 //建立连接端点,注册一个STOMP的协议节点,并指定使用SockJS协议
 stompEndpointRegistry.addEndpoint("/nmpSocketWeb")
 .setAllowedOrigins("*")
 .withSockJS();
 }
 
 
 @Override
 public void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry) {
 //配置消息代理(MessageBroker)。
 messageBrokerRegistry.enableSimpleBroker("/topic");// 推送消息前缀
 messageBrokerRegistry.setApplicationDestinationPrefixes("/app");// 应用请求前缀,前端发过来的消息将会带有“/app”前缀。
 }
 
 @Override
 public void configureClientInboundChannel(ChannelRegistration registration) {
 //token认证
 registration.setInterceptors(new ChannelInterceptorAdapter() {
 @Override
 public Message<?> preSend(Message<?> message, MessageChannel channel) {
 StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
 if (StompCommand.CONNECT.equals(accessor.getCommand()) || StompCommand.SEND.equals(accessor.getCommand())) {
 String token = accessor.getFirstNativeHeader("token");
 try {
 tokenValidate(token);
 } catch (Exception e) {
 log.error(e.toString());
 return null;
 }
 }
 return message;
 }
 });
 }
 
 
 public boolean tokenValidate(String token) throws Exception {
 if (token == null || token.isEmpty()) {
 throw new Exception("webSocket:token为空!");
 }
 if (JwtUtil.validateToken(token)==null) {
 throw new Exception("webSoc:token无效!");
 }
 return true;
 }
 
}

代码中有详细的解释,认真看可以看明白的。

五、controller


import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
 
 
 
/**
 * Demo controller showing three ways of pushing a {@link NotifyBean} to browsers: a broadcast
 * triggered over HTTP, a broadcast triggered by a STOMP message, and a point-to-point push
 * triggered by a STOMP message.
 */
@Api(tags="WebSocket控制器",description="WebSocket控制器")
@Controller
@RequestMapping(value = "/webSocket")
@Slf4j
public class WebSocketController extends BaseController {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Autowired
    private RedisHelper redisHelper;

    /**
     * Broadcasts a fixed message to {@code WebConstant.WEB_SC_TOPIC_NOTIFY}.
     *
     * <p>NOTE(review): the Swagger metadata advertises GET, but the mapping itself does not
     * restrict the HTTP method.
     */
    @ApiOperation(value = "测试主动发送消息", notes = "测试主动发送消息", httpMethod = "GET")
    @RequestMapping(value = "/sendMsg")
    @ResponseBody
    public void sendMsg() {
        log.info("测试主动发送消息");
        NotifyBean<Object> notifyBean =
                NotifyBean.<Object>builder().message("服务器给你发消息啦!").build();
        simpMessagingTemplate.convertAndSend(WebConstant.WEB_SC_TOPIC_NOTIFY, notifyBean);
    }

    /**
     * Handles STOMP messages sent to the {@code /test} mapping (the messaging counterpart of
     * {@code @RequestMapping}) and broadcasts the reply to everyone subscribed to the
     * {@code @SendTo} destination.
     *
     * @param userVo sender data; only the name is used
     * @return the greeting that is broadcast
     */
    @MessageMapping("/test")
    @SendTo(WebConstant.WEB_SC_TOPIC_NOTIFY)
    public NotifyBean test(UserVo userVo) {
        try {
            // Artificial one-second delay kept from the original demo.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while delaying the reply", e);
        }
        NotifyBean<Object> notifyBean =
                NotifyBean.<Object>builder().message("welcome!" + userVo.getName()).build();
        return notifyBean;
    }

    /**
     * Point-to-point demo: sends a greeting only to the session stored for the given user.
     *
     * <p>The session id is read from Redis under {@code "websocket:" + userVo.getId()}. When no
     * session is stored for that user the message is skipped and a warning is logged, instead
     * of passing {@code null} to {@code convertAndSendToUser}.
     *
     * @param userVo target user; id selects the session, name is echoed in the greeting
     * @throws Exception propagated from Redis access or message conversion
     */
    @MessageMapping("/test/toOne")
    public void toOne(UserVo userVo) throws Exception {
        String sessionId =
                (String) redisHelper.redisTemplate.opsForValue().get("websocket:" + userVo.getId());
        if (sessionId == null) {
            log.warn("No WebSocket session stored for user {}; message not sent", userVo.getId());
            return;
        }
        NotifyBean<Object> notifyBean =
                NotifyBean.<Object>builder().message("welcome!" + userVo.getName()).build();
        // convertAndSendToUser prefixes the destination with "/user", so the browser has to
        // subscribe to "/user" + WEB_SC_TOPIC_NOTIFY (presumably "/user/topic/notify" - confirm
        // against WebConstant).
        simpMessagingTemplate.convertAndSendToUser(
                sessionId, WebConstant.WEB_SC_TOPIC_NOTIFY, notifyBean, createHeaders(sessionId));
    }

    /**
     * Builds MESSAGE headers that carry the target session id and stay mutable.
     *
     * @param sessionId WebSocket session id of the recipient
     * @return headers to pass along with a user-destination send
     */
    private MessageHeaders createHeaders(String sessionId) {
        SimpMessageHeaderAccessor headerAccessor =
                SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);
        return headerAccessor.getMessageHeaders();
    }

}

六、前端页面

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
 <!-- Metadata belongs inside <head>; the stylesheet link used to sit in front of it. -->
 <link href="/webjars/bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet"></link>
 <!-- Client libraries used by the inline script: SockJS transport, STOMP client, jQuery. -->
 <script th:src="@{sockjs.min.js}"></script>
 <script th:src="@{stomp.min.js}"></script>
 <script th:src="@{jquery-1.11.3.min.js}"></script>
</head>
<body>
<blockquote class="layui-elem-quote">/user/topic-message</blockquote>
 
<!-- Demo UI. The inline script binds to the element ids below; form submission is suppressed there. -->
<div id="main-content" class="container">
 <div class="row">
 <div class="col-md-6">
 <form class="form-inline">
 <div class="form-group">
 <label for="connect">WebSocket connection:</label>
 <!-- #connect calls connect(), #disconnect calls disconnect(); setConnected() toggles which one is enabled. -->
 <button id="connect" class="btn btn-default" type="submit">Connect</button>
 <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect
 </button>
 
 </div>
 </form>
 </div>
 <div class="col-md-6">
 <form class="form-inline">
 <div class="form-group">
 <label for="name">What is your name?</label>
 <!-- Value of #name is sent as the "name" field by sendName(). -->
 <input type="text" id="name" class="form-control" placeholder="Your name here..."></input>
 </div>
 <!-- #send calls sendName(). -->
 <button id="send" class="btn btn-default" type="submit">Send</button>
 </form>
 </div>
 </div>
 <div class="row">
 <div class="col-md-12">
 <!-- #conversation is shown or hidden by setConnected(); #greetings is cleared there and appended to by showGreeting(). -->
 <table id="conversation" class="table table-striped">
 <thead>
 <tr>
 <th>Greetings</th>
 </tr>
 </thead>
 <tbody id="greetings">
 </tbody>
 </table>
 </div>
 <!-- Pushed messages are appended here by setMessageInnerHTML(). -->
 <div id="message"></div>
 </div>
</div>
 
<script>
 // /msg/sendcommuser
 // STOMP client for the current connection; stays null until connect() assigns it.
 var stompClient = null;
 // User key passed to the server with the CONNECT frame.
 var login = "ricky";
 /**
  * Reflects the connection state in the UI: enables exactly one of the two
  * connection buttons, shows or hides the conversation table, and clears the
  * greetings list.
  *
  * @param connected truthy when a connection is established
  */
 function setConnected(connected) {
     var connectButton = $("#connect");
     var disconnectButton = $("#disconnect");
     var conversation = $("#conversation");

     connectButton.prop("disabled", connected);
     disconnectButton.prop("disabled", !connected);

     // Pick the jQuery method by name instead of branching.
     conversation[connected ? "show" : "hide"]();

     $("#greetings").html("");
 }
 
 /**
  * Opens a SockJS connection to /nmpSocketWeb, performs the STOMP CONNECT and
  * subscribes to this user's private notification destination.
  *
  * NOTE(review): the server-side inbound interceptor drops CONNECT frames that
  * lack a valid "token" header; add the token to `headers` once its source is known.
  */
 function connect() {
     var socket = new SockJS('/nmpSocketWeb');
     stompClient = Stomp.over(socket);

     // The user key is sent both as the standard STOMP "login" header and as the
     // "userid" header that the server-side connect listener reads.
     var headers = {login: login, userid: login};

     stompClient.connect(headers, function (frame) {
         setConnected(true);
         console.log('Connected: ' + frame);
         // The server pushes with convertAndSendToUser to its notify topic, which the
         // browser sees as "/user/topic/notify" (not ".../greetings").
         stompClient.subscribe('/user/topic/notify', function (greeting) {
             var text = JSON.parse(greeting.body).message;
             setMessageInnerHTML(text);
             console.log(text);
         });
     }, function (error) {
         // Connection refused or lost: put the UI back into the disconnected state.
         setConnected(false);
         console.log('Connection error: ' + error);
     });
 }
 
 /**
  * Disconnects the STOMP client when one has been created, then switches the
  * UI to the disconnected state and logs the fact.
  */
 function disconnect() {
     var client = stompClient;
     var hasClient = client !== null && client !== undefined;
     if (hasClient) {
         client.disconnect();
     }
     setConnected(false);
     console.log("Disconnected");
 }
 
 /**
  * Sends the entered name to the point-to-point demo mapping "/app/test/toOne".
  * The "id" field selects the recipient on the server and is the same user key
  * that was sent on CONNECT.
  *
  * NOTE(review): the server-side inbound interceptor also checks SEND frames for a
  * "token" header; pass it in the headers object once its source is known.
  */
 function sendName() {
     // Clicking "Send" before connecting used to throw on the null client.
     if (!stompClient || !stompClient.connected) {
         console.log("Not connected; message not sent");
         return;
     }
     var payload = {'name': $("#name").val(), 'id': login};
     stompClient.send("/app/test/toOne", {}, JSON.stringify(payload));
 }
 
 
 /**
  * Appends one row containing `message` to the greetings table body.
  * The message is inserted as text, so markup inside it is displayed literally
  * instead of being interpreted as HTML.
  *
  * @param message text to display
  */
 function showGreeting(message) {
     var cell = $("<td></td>").text(message);
     var row = $("<tr></tr>").append(cell);
     $("#greetings").append(row);
 }
 
 // Once the DOM is ready: stop the forms from submitting (the buttons are
 // type="submit") and wire the three buttons to their actions.
 $(document).ready(function () {
     $("form").submit(function (event) {
         event.preventDefault();
     });

     $("#connect").on("click", function () {
         connect();
     });
     $("#disconnect").on("click", function () {
         disconnect();
     });
     $("#send").on("click", function () {
         sendName();
     });
 });
 
 /**
  * Shows a message on the page by appending it, followed by a line break, to
  * the #message element. The value is added as a text node: pushed messages
  * embed a user-supplied name, so they must not be parsed as HTML.
  *
  * @param innerHTML message text to display
  */
 function setMessageInnerHTML(innerHTML){
     console.log(innerHTML);
     var box = document.getElementById('message');
     box.appendChild(document.createTextNode(innerHTML));
     box.appendChild(document.createElement('br'));
 }
</script>
</body>
</html>

最后,来试试点对点推送。

第一个页面:

第二个页面:

可以看到,后台推送的消息只有一个页面接收到,完事!

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表