程序员子龙(Java面试 + Java学习) 程序员子龙(Java面试 + Java学习)
首页
学习指南
工具
开源项目
技术书籍

程序员子龙

Java 开发从业者
首页
学习指南
工具
开源项目
技术书籍
  • 基础

  • JVM

  • Spring

  • 并发编程

  • Mybatis

  • 网络编程

    • Netty 入门
    • Netty中的Option和ChildOption参数解析
    • Netty ByteBuf介绍
    • Netty 心跳机制
    • Kryo 的序列化和序列化
    • Netty粘包拆包
    • Netty 编解码器
    • 网络编程IO模式
    • Netty TCP长连接集群方案
      • 序列化和反序列化
      • 使用 UDP 的 Socket API 实现服务端
      • Netty向客户端发送及接收16进制数据
      • Spring Boot与Netty的完美结合:打造高性能网络通信
    • 数据库

    • 缓存

    • 设计模式

    • 分布式

    • 高并发

    • SpringBoot

    • SpringCloudAlibaba

    • Nginx

    • 面试

    • 生产问题

    • 系统设计

    • 消息中间件

    • Java
    • 网络编程
    程序员子龙
    2024-01-29
    目录

    Netty TCP长连接集群方案

    使用 Netty 自定义协议连接物联网设备,业务增大之后,势必需要使用集群方案。

    # nginx负载均衡

    Nginx 1.9 已经支持 TCP 代理和负载均衡,并可以通过一致性哈希算法将连接均匀的分配到所有的服务器上。

    对于已经安装nginx的,检查是否编译时带with-stream参数,#有with-stream参数,可以代理tcp协议

    nginx -V |grep with-stream

    修改配置文件

    http{
      ...
    }
    
    stream{
    	  upstream cloudsocket {
    
            hash $remote_addr consistent;
    
            server 127.0.0.1:3000 weight=5 max_fails=3 fail_timeout=30s;
    
            server 27.196.3.228:4000 weight=5 max_fails=3 fail_timeout=30s; 
    
         }
    	 
    	  server {
            listen       8080;
    		proxy_pass cloudsocket;
    	 }
    
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

    注意:stream和http是平级的。

    #重启
    ./nginx -s reload
    #检查配置文件语法是否正确
    ./nginx -t
    #停止
    ./nginx -s stop
    
    1
    2
    3
    4
    5
    6

    经过测试可以发现,设备上报的数据分配到不同服务器上。

    window 10,nginx配置后,本地可以访问,局域网机器其他访问不了

    1、防火墙问题

    打开防火墙,允许nginx (opens new window),并且的专用和公用的网络都允许访问。

    # 长连接处理

    在物联网中,设备和服务器之间是可以互相通信的,也就是说设备可以向服务器上报数据,服务器也可以向设备下发指令。由于设备和服务网之间是长连接,下发指令和接收设备上传数据的服务器只能是同一台服务器,因为只有它们之间建立了连接通道。

    我们可以使用map保存设备和ChannelHandlerContext映射关系。

    
    	/**
    	 * 用来保存对应的设备-channel
    	 */
    	private  static Map<String, ChannelHandlerContext> channelMap = new ConcurrentHashMap<>();
    
    	/**
    	 * 用来标记channel当连接断开时要清除channelMap中的记录
    	 */
    	private static Map<ChannelHandlerContext, String> mark = new ConcurrentHashMap<>();
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    在设备连接、断开时候更新channelMap。

    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    	boolean containsKey = ServerHandler.channelMap.containsKey(deviceId);
    
    			// 设备id和通道建立关系
    			if (!containsKey) {
    				ServerHandler.channelMap.put(deviceId, ctx);
    				ServerHandler.mark.put(ctx, deviceId);
    			}
    
    }
    
    
    	/**
    	 * 客户端与服务端断开连接时调用
    	 */
    	@Override
    	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    		boolean containsKey = ServerHandler.mark.containsKey(ctx);
    		if (containsKey) {
    			String code = ServerHandler.mark.get(ctx);
    			ServerHandler.channelMap.remove(code, ctx);
    			ServerHandler.mark.remove(ctx);
    		}
    	}
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

    # 指令下发消息处理

    可以通过redis发布/订阅模式实现。将消息 pub 到 redis 集群中,而所有集群中的服务器都 sub 这个 redis 集群,一旦有消息,所有的服务器都会消费消息,保持连接的服务器会处理消息。

    
    	/**
    	 * 向设备发送消息
    	 *
    	 * @param deviceId 设备id
    	 * @param msg  信息
    	 */
    	public static void send(String deviceId, Object msg) {
    
    		if (ServerHandler.channelMap.containsKey(deviceId)) {
    			ChannelHandlerContext handlerContext = ServerHandler.channelMap.get(deviceId);
    			if(handlerContext.channel().isActive()){
    				ChannelFuture channelFuture = handlerContext.writeAndFlush(msg);
    				//操作完成后通知注册一个 ChannelFutureListener
    				channelFuture.addListener((future) -> {
    					if (channelFuture.isSuccess()) {
    						//发送消息操作成功
    						log.info("指令下发成功");
    					} else {
    						//发送消息操作异常
    						Throwable cause = channelFuture.cause();
    						log.error("sendMSG "+msg+" err:",cause);
    						throw new BaseException(cause.getMessage());
    					}
    				});
    			}else {
    				ServerHandler.channelMap.remove(deviceId);
    			}
    
    		} else {
    			log.error("-------设备 {} 已经断开连接-------",deviceId);
    			throw new BaseException(deviceId + "设备已经断开连接");
    		}
    	}
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    上次更新: 2024/03/30, 14:20:04
    网络编程IO模式
    序列化和反序列化

    ← 网络编程IO模式 序列化和反序列化→

    最近更新
    01
    一个注解,优雅的实现接口幂等性
    11-17
    02
    MySQL事务(超详细!!!)
    10-14
    03
    阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
    10-09
    更多文章>
    Theme by Vdoing | Copyright © 2024-2024

        辽ICP备2023001503号-2

    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式