Redis 发布订阅模式
前言
项目中新增在线咨询功能,使用netty框架实现。代码开发完毕在本地调试没有问题,放到sit环境测试中出现部分消息无法接收到的情况,排查后定位到问题是由于后端项目集群部署导致,记录一下处理过程。
原因分析
netty框架中客户端与服务端建立的websocket链接后会在当前服务器产生一个channel通道,后续的发送消息以及接收消息都是通过这个通道来实现,出现消息发送失败问题的原因是后端项目集群部署会有多个应用服务器,建立websocket链接的应用服务器可能和发送消息的应用服务器不是同一个。比如用户A与用户B与应用服务器A建立了websocket链接,应用服务器A中存在两个用户的链接通道,但是当用户A给用户B发送消息的时候,这次发送请求被负载均衡转发到了应用服务器B上面,应用服务器B中并没有存在用户B的通道,就没有办法给用户B推送消息。
解决方案
通过redis的发布订阅模式解决(MQ同理),当用户A给用户B发送消息时,接收到发送请求的应用服务器向redis的指定消息队列中添加参数,无需处理发送。同时所有应用服务器监听此队列,只要队列发生了变化,监听器就能获取到队列中的参数,获取到消息后在监听器中判断当前服务器中是否存在用户B的通道,如果存在了则使用该通道给用户B推送消息,否则不处理即可。只要用户B在线,则其只可能与一台应用服务器建立了通道,也就是redis中的队列消息必定会被与用户B建立了链接的应用服务器监听到,消息自然就能推送出去了。如果都不存在则说明用户B不在线,再进行其他业务逻辑处理即可。
代码实现
-
建立两个集合,用来存在channel通道以及用户与通道的对应关系。
/** * 存储每个客户端接入进来时的channel对象 * 主要用于使用writeAndFlush方法广播信息 */ public static ChannelGroup imsChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 用于客户端和服务端握手时存储用户id和netty ChannelId对应关系 */ public static Map<String, ChannelId> channelMap = new ConcurrentHashMap<>();
-
建立链接时,将相关信息存入集合中。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 省略部分代码... Channel channel = ctx.channel(); imsChannelGroup.add(channel); // 这里的userid为自定义,标记消息接收方用户唯一即可 channelMap.put(userid, channel.id()); }
-
发送消息时,向redis的指定通道队列中添加参数(使用redisTemplate)。
@Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) { // 省略部分代码... // 组装放入redis队列中的参数,根据业务逻辑自定义 JSONObject jsonObject = new JSONObject(); // 将唯一标识放入参数中,确保其他应用服务器能够通过用户唯一标识获取到消息接收方的channel jsonObject.putOpt("userid", userid); // 其他业务参数 jsonObject.putOpt("xxx", xxx); // 第一个参数为redis的消息队列名称 redisTemplate.convertAndSend("imstopic", jsonObject); }
-
添加监听器,监听队列消息,进行逻辑处理(可以根据传入的其他业务参数进行业务逻辑处理)。
@Component public class ImsListenerAdapter implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { // 获取到队列中的消息 String jsonStr = JSONUtil.formatJsonStr(new String(message.getBody())); JSONObject jsonObject = JSONUtil.parseObj(jsonStr); // 根据userid判断当前应用服务器是否存在消息接收方的channel String userid = jsonObject.getStr("userid"); ChannelId channelId = channelMap.get(userid); if (channelId == null || imsChannelGroup.find(channelId) == null) { // 不存在不做处理 return; } // 存在,根据消息接收方的channel发送消息 Channel toUserChannel = imsChannelGroup.find(channelId); // TextWebSocketFrame的构造参数为字符串,这里因为业务需要所以发送了json格式消息,由前端将json中的文本拿出来显示,如果不需要也可以直接在构造中传入消息文本字符串 toUserChannel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(xxx))); } }
-
添加redis配置,配置监听器监听redis消息队列。
@Configuration public class RedisCacheConfig { /** * 订阅消息队列配置 */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter imsMessageListenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); // 可以添加多个messageListener,配置不同的交换机 container.addMessageListener(imsMessageListenerAdapter, new PatternTopic("imstopic")); return container; } /** * 实例化监听器 */ @Bean MessageListenerAdapter imsMessageListenerAdapter() { return new MessageListenerAdapter(new ImsListenerAdapter()); } }
扩展
消息发送中其实经常存在对方不在线的情况,为了防止消息丢失,在逻辑上做了以下处理:
- 在第3步往redis消息队列插入数据之前,先将这条消息存入数据库中,状态置为未发送,将表中消息id一起放入队列参数中。
- 在第5步判断当前应用服务器是否存在消息接收方channel时,从队列参数中获取消息id,根据id从数据库中查出消息内容,然后调用writeAndFlush方法发送消息。消息发送成功后,将表中该条消息置为已发送。
- 如果对方不在线,设置一个定时任务查询未发送的消息,通过其他方式提醒给用户,如微信公众号消息提醒等。这个查询时记得设置时间间隔,比如当前时间10s前的数据,防止将刚插入库的、监听器还未处理完的消息查询出来。
在做上述逻辑处理过程中还遇到了另外一个问题,在监听器中查询消息以及更新消息时,需要使用dao层方法,使用@Autowired注入dao层对象时发现dao层对象为null。百度查了一下发现在监听器中无法注入bean,于是找了其他办法来解决这个问题,后面会单独写一篇文章来记录。