程序员开发实例大全宝库

网站首页 > 编程文章 正文

如何构建支持 5 万设备在线的高性能 TCP 数据转发平台

zazugpt 2025-06-12 19:03:55 编程文章 11 ℃ 0 评论

《Netty 架构实战系列(一):如何构建支持 5 万设备在线的高性能 TCP 数据转发平台》

目标读者:后端开发工程师、中高级架构师 关键词:Netty、TCP转发、双向通信、高并发、设备连接管理、动态连接、Redis、Kafka


  1. 引言

在物联网与大规模设备管理系统中,设备数据实时上报与双向通信是整个系统的命脉。以电池设备为例,这类设备定时通过 TCP 协议上传数据到平台,而平台需要将数据分发到各个租户服务端,同时也要将租户服务端的下行指令返回给设备。这就形成了一个闭环的数据流转链路。

当设备数达到 5 万甚至更高时,如何保证数据传输的及时性、系统的高并发处理能力以及动态连接管理的高效性,就成为架构师必须解决的核心问题。本文将以实际案例出发,详细介绍如何利用 Netty 实现一个高性能、支持海量设备连接的 TCP 数据转发平台。整个方案既包含了上行数据转发,也涵盖了下行数据响应的完整闭环,且代码实现完整、功能覆盖,便于大家直接在项目中参考应用。


  1. 系统整体架构概览

整个系统主要分为以下几个关键模块:

  • 设备层:电池设备通过 TCP 协议上传数据,建立长连接进行数据传输。
  • 协议平台服务:基于 Netty 构建 TCP Server,负责接入设备数据,解析设备 ID(devId),并管理设备连接。业务层负责将设备数据转发给租户客户端,再通过租户客户端将数据发送至租户服务端;同时,租户服务端下行返回数据时,经过租户客户端再回传给设备。
  • 租户客户端:动态建立与租户服务端的 TCP 连接,保证数据在上行和下行中得到传递。
  • 异常与告警:在设备连接或数据转发过程中出现异常时,平台会立即触发告警(详细方案将在下一篇文章讨论)。
  • 后台系统:(后续文章中详细讨论)主要负责从 Kafka 中消费告警消息,并将其写入 TDengine,同时触发多渠道通知。

下图展示了整体数据流转架构(图中包括上行、下行闭环流程):(支持 Mermaid 渲染):
https://mermaid-live.nodejs.cn/

graph TD
    subgraph 设备层
        A[电池设备上传数据]
    end
    subgraph 协议平台服务
        A --> B[Netty TCP Server]
        B --> C[业务处理 & 数据转发]
        C --> D[Tenant客户端连接器]
        D --> E[租户服务端]
        E --> D
        D --> C
        C --> A
        B -- 异常/告警 --> F[Kafka告警消息]
    end

各模块职责说明如下:

  • Netty TCP Server:建立设备连接、进行心跳保活、解析设备上传数据。
  • 业务处理层(BatteryService):解析消息、注册设备、调用租户客户端进行数据转发。
  • 设备连接管理(BatterySessionManager):将设备的 Channel 存入内存,以便后续上行与下行数据匹配。
  • 租户客户端(TenantClientManager/Handler):根据设备对应租户信息动态创建连接,并通过该连接实现数据转发和下行回传。

  1. 关键模块详细代码与调用闭环

下面介绍每个模块的完整代码。

3.1 Netty TCP 服务与设备数据接入

3.1.1 BatteryNettyServer.java

负责启动 TCP 服务并初始化 Pipeline。

package com.example.battery.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class BatteryNettyServer {
    public void start(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) {
                             ChannelPipeline pipeline = ch.pipeline();
                             // 加入空闲检测,180秒内无数据则关闭连接
                             pipeline.addLast(new IdleStateHandler(180, 0, 0, TimeUnit.SECONDS));
                             pipeline.addLast(new HeartbeatHandler());
                             // 处理设备上报数据
                             pipeline.addLast(new BatteryDataHandler());
                         }
                     })
                     .option(ChannelOption.SO_BACKLOG, 128)
                     .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("Battery Netty Server started on port: " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3.1.2 HeartbeatHandler.java

用于检测空闲连接,以防长时间无数据占用资源。

package com.example.battery.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("Channel idle too long, closing: " + ctx.channel().remoteAddress());
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

3.1.3 BatteryDataHandler.java

实现设备数据接收、设备注册与数据转发的入口;在这里调用了业务层逻辑进行数据处理。

package com.example.battery.netty;
import com.example.battery.session.BatterySessionManager;
import com.example.battery.service.BatteryService;
import com.example.battery.alarm.util.AlarmUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class BatteryDataHandler extends SimpleChannelInboundHandler<String> {
    @Autowired
    private BatteryService batteryService;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Device connected: " + ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 假设上报数据为 JSON 格式,通过自定义工具解析出设备ID
        String devId = DeviceUtils.extractDevId(msg);
        ctx.channel().attr(AttributeKey.valueOf("devId")).set(devId);
        BatterySessionManager.addChannel(devId, ctx.channel());

        // 调用业务逻辑处理数据,形成上行闭环
        batteryService.processBatteryData(devId, msg);
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String devId = (String) ctx.channel().attr(AttributeKey.valueOf("devId")).get();
        if (devId != null) {
            System.out.println("Device disconnected: " + devId);
            BatterySessionManager.removeChannel(devId);
        }
        super.channelInactive(ctx);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        String devId = (String) ctx.channel().attr(AttributeKey.valueOf("devId")).get();
        if (devId == null) devId = "UNKNOWN";
        // 调用告警工具直接上报网络层异常
        AlarmUtils.sendNettyAlarm(devId, cause);
        ctx.close();
    }
}

3.1.4 BatterySessionManager.java

管理设备连接,保持一个全局的设备 ID → Channel 映射,确保数据转发闭环能正确匹配到设备。

package com.example.battery.session;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
public class BatterySessionManager {
    private static final ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    public static void addChannel(String devId, Channel channel) {
        channelMap.put(devId, channel);
    }
    public static Channel getChannel(String devId) {
        Channel ch = channelMap.get(devId);
        if (ch != null && ch.isActive()) {
            return ch;
        } else {
            channelMap.remove(devId);
            return null;
        }
    }
    public static void removeChannel(String devId) {
        channelMap.remove(devId);
    }
}

3.2 业务层数据处理 —— BatteryService.java

通过业务层调用租户客户端进行数据转发,实现上行数据闭环。

此处模拟从设备ID获取租户信息,并调用租户客户端连接器获取连接,进而发送数据。

package com.example.battery.service;
import com.example.battery.client.TenantClientManager;
import com.example.battery.model.TenantInfo;
import org.springframework.stereotype.Service;
@Service
public class BatteryService {
    public void processBatteryData(String devId, String data) {
        // 模拟根据设备ID获取租户信息,此处应从Redis或数据库中查询实际配置
        TenantInfo tenantInfo = getTenantInfoByDevId(devId);
        if (tenantInfo == null) {
            System.err.println("No tenant info for devId: " + devId);
            return;
        }
        // 通过租户客户端管理器创建或获取到与租户服务端的连接
        io.netty.channel.Channel tenantChannel = TenantClientManager.getOrCreateChannel(
            tenantInfo.getTenantId(), tenantInfo.getHost(), tenantInfo.getPort()
        );
        if (tenantChannel != null && tenantChannel.isActive()) {
            tenantChannel.writeAndFlush(data);
            System.out.println("Data forwarded to tenant server for devId: " + devId);
        } else {
            System.err.println("Failed to forward data for devId: " + devId);
        }
    }
    // 模拟根据设备ID查询租户信息
    private TenantInfo getTenantInfoByDevId(String devId) {
        // 实际情况下可从Redis或数据库中获取
        TenantInfo info = new TenantInfo();
        info.setTenantId("tenant1");
        info.setHost("127.0.0.1");
        info.setPort(9001);
        return info;
    }
}

TenantInfo.java

package com.example.battery.model;
public class TenantInfo {
    private String tenantId;
    private String host;
    private Integer port;
    // Getters and Setters
    public String getTenantId() {
        return tenantId;
    }
    public void setTenantId(String tenantId) {
        this.tenantId = tenantId;
    }
    public String getHost() {
        return host;
    }
    public void setHost(String host) {
        this.host = host;
    }
    public Integer getPort() {
        return port;
    }
    public void setPort(Integer port) {
        this.port = port;
    }
}

3.3 租户客户端连接及数据回传

实现与租户服务端的TCP连接,并在收到租户返回的数据后,将下行数据回传给设备。

3.3.1 TenantClientManager.java

动态建立与租户服务端的TCP连接,并缓存连接,以便数据转发。

package com.example.battery.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class TenantClientManager {
    private static final Map<String, Channel> tenantChannels = new ConcurrentHashMap<>();
    public static Channel getOrCreateChannel(String tenantId, String host, int port) {
        return tenantChannels.computeIfAbsent(tenantId, key -> {
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(new NioEventLoopGroup())
                         .channel(NioSocketChannel.class)
                         .option(ChannelOption.SO_KEEPALIVE, true)
                         .handler(new TenantClientInitializer());
                ChannelFuture future = bootstrap.connect(host, port).sync();
                if (future.isSuccess()) {
                    System.out.println("Connected to tenant server: " + tenantId);
                    return future.channel();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        });
    }
}

3.3.2 TenantClientInitializer.java

设置租户客户端Channel的Pipeline。

package com.example.battery.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class TenantClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
        ch.pipeline().addLast(new LengthFieldPrepender(4));
        ch.pipeline().addLast(new TenantClientHandler());
    }
}

3.3.3 TenantClientHandler.java

接收租户服务端下行数据,将数据回传给对应设备。

package com.example.battery.client;
import com.example.battery.session.BatterySessionManager;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TenantClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String devId = DeviceUtils.extractDevIdFromResponse(msg);
        Channel deviceChannel = BatterySessionManager.getChannel(devId);
        if (deviceChannel != null && deviceChannel.isActive()) {
            deviceChannel.writeAndFlush(msg);
            System.out.println("Forwarded tenant response to device: " + devId);
        } else {
            System.err.println("Device channel not found for devId: " + devId);
        }
    }
}

3.3.4 DeviceUtils.java

提供工具方法解析上报数据及下行响应中的设备ID。

package com.example.battery.netty;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
public class DeviceUtils {
    public static String extractDevId(String msg) {
        try {
            JSONObject json = JSON.parseObject(msg);
            return json.getString("devId");
        } catch(Exception e) {
            return "unknown";
        }
    }

    public static String extractDevIdFromResponse(String msg) {
        try {
            JSONObject json = JSON.parseObject(msg);
            return json.getString("devId");
        } catch(Exception e) {
            return "unknown";
        }
    }
}

  1. 总结

在本篇文章中,我们全面解析了如何利用 Netty 构建一个高性能的 TCP 数据转发平台,支撑 5 万设备在线连接。文章从设备数据接入、设备连接管理(BatterySessionManager)、上行数据转发以及租户客户端动态连接创建等关键模块入手,详细讲解了各层模块的设计思想和完整实现代码,形成了数据双向传输的闭环。通过IdleStateHandler 保活、心跳检测和异常捕获机制,平台既能够实时响应设备上传,也能在出现故障时快速上报异常(为后续告警体系奠定基础)。

总结要点

  • 高并发接入:利用 Netty 高效处理 TCP 长连接,实现大规模设备在线;
  • 数据闭环转发:各模块调用形成完整闭环,确保数据上行转发和下行响应无遗漏;
  • 模块化设计:业务逻辑、连接管理和异常处理各自独立,便于后续维护与扩展。

本方案为实际物联网平台建设提供了一整套成熟的解决方案,是对大规模数据传输架构的一次深入实战,希望对你在高并发网络架构设计中有所启发。

如果你觉得本文对你有帮助,欢迎点赞、留言讨论,分享你的优化思路。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表