网站首页 > 编程文章 正文
《Netty 架构实战系列(一):如何构建支持 5 万设备在线的高性能 TCP 数据转发平台》
目标读者:后端开发工程师、中高级架构师 关键词:Netty、TCP转发、双向通信、高并发、设备连接管理、动态连接、Redis、Kafka
- 引言
在物联网与大规模设备管理系统中,设备数据实时上报与双向通信是整个系统的命脉。以电池设备为例,这类设备定时通过 TCP 协议上传数据到平台,而平台需要将数据分发到各个租户服务端,同时也要将租户服务端的下行指令返回给设备。这就形成了一个闭环的数据流转链路。
当设备数达到 5 万甚至更高时,如何保证数据传输的及时性、系统的高并发处理能力以及动态连接管理的高效性,就成为架构师必须解决的核心问题。本文将以实际案例出发,详细介绍如何利用 Netty 实现一个高性能、支持海量设备连接的 TCP 数据转发平台。整个方案既包含了上行数据转发,也涵盖了下行数据响应的完整闭环,且代码实现完整、功能覆盖,便于大家直接在项目中参考应用。
- 系统整体架构概览
整个系统主要分为以下几个关键模块:
- 设备层:电池设备通过 TCP 协议上传数据,建立长连接进行数据传输。
- 协议平台服务:基于 Netty 构建 TCP Server,负责接入设备数据,解析设备 ID(devId),并管理设备连接。业务层负责将设备数据转发给租户客户端,再通过租户客户端将数据发送至租户服务端;同时,租户服务端下行返回数据时,经过租户客户端再回传给设备。
- 租户客户端:动态建立与租户服务端的 TCP 连接,保证数据在上行和下行中得到传递。
- 异常与告警:在设备连接或数据转发过程中出现异常时,平台会立即触发告警(详细方案将在下一篇文章讨论)。
- 后台系统:(后续文章中详细讨论)主要负责从 Kafka 中消费告警消息,并将其写入 TDengine,同时触发多渠道通知。
下图展示了整体数据流转架构(图中包括上行、下行闭环流程):(支持 Mermaid 渲染):
https://mermaid-live.nodejs.cn/
graph TD
subgraph 设备层
A[电池设备上传数据]
end
subgraph 协议平台服务
A --> B[Netty TCP Server]
B --> C[业务处理 & 数据转发]
C --> D[Tenant客户端连接器]
D --> E[租户服务端]
E --> D
D --> C
C --> A
B -- 异常/告警 --> F[Kafka告警消息]
end
各模块职责说明如下:
- Netty TCP Server:建立设备连接、进行心跳保活、解析设备上传数据。
- 业务处理层(BatteryService):解析消息、注册设备、调用租户客户端进行数据转发。
- 设备连接管理(BatterySessionManager):将设备的 Channel 存入内存,以便后续上行与下行数据匹配。
- 租户客户端(TenantClientManager/Handler):根据设备对应租户信息动态创建连接,并通过该连接实现数据转发和下行回传。
- 关键模块详细代码与调用闭环
下面介绍每个模块的完整代码。
3.1 Netty TCP 服务与设备数据接入
3.1.1 BatteryNettyServer.java
负责启动 TCP 服务并初始化 Pipeline。
package com.example.battery.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class BatteryNettyServer {
public void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 加入空闲检测,180秒内无数据则关闭连接
pipeline.addLast(new IdleStateHandler(180, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
// 处理设备上报数据
pipeline.addLast(new BatteryDataHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("Battery Netty Server started on port: " + port);
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
3.1.2 HeartbeatHandler.java
用于检测空闲连接,以防长时间无数据占用资源。
package com.example.battery.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
System.out.println("Channel idle too long, closing: " + ctx.channel().remoteAddress());
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
3.1.3 BatteryDataHandler.java
实现设备数据接收、设备注册与数据转发的入口;在这里调用了业务层逻辑进行数据处理。
package com.example.battery.netty;
import com.example.battery.session.BatterySessionManager;
import com.example.battery.service.BatteryService;
import com.example.battery.alarm.util.AlarmUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class BatteryDataHandler extends SimpleChannelInboundHandler<String> {
@Autowired
private BatteryService batteryService;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Device connected: " + ctx.channel().remoteAddress());
super.channelActive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 假设上报数据为 JSON 格式,通过自定义工具解析出设备ID
String devId = DeviceUtils.extractDevId(msg);
ctx.channel().attr(AttributeKey.valueOf("devId")).set(devId);
BatterySessionManager.addChannel(devId, ctx.channel());
// 调用业务逻辑处理数据,形成上行闭环
batteryService.processBatteryData(devId, msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String devId = (String) ctx.channel().attr(AttributeKey.valueOf("devId")).get();
if (devId != null) {
System.out.println("Device disconnected: " + devId);
BatterySessionManager.removeChannel(devId);
}
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
String devId = (String) ctx.channel().attr(AttributeKey.valueOf("devId")).get();
if (devId == null) devId = "UNKNOWN";
// 调用告警工具直接上报网络层异常
AlarmUtils.sendNettyAlarm(devId, cause);
ctx.close();
}
}
3.1.4 BatterySessionManager.java
管理设备连接,保持一个全局的设备 ID → Channel 映射,确保数据转发闭环能正确匹配到设备。
package com.example.battery.session;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
public class BatterySessionManager {
private static final ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
public static void addChannel(String devId, Channel channel) {
channelMap.put(devId, channel);
}
public static Channel getChannel(String devId) {
Channel ch = channelMap.get(devId);
if (ch != null && ch.isActive()) {
return ch;
} else {
channelMap.remove(devId);
return null;
}
}
public static void removeChannel(String devId) {
channelMap.remove(devId);
}
}
3.2 业务层数据处理 —— BatteryService.java
通过业务层调用租户客户端进行数据转发,实现上行数据闭环。
此处模拟从设备ID获取租户信息,并调用租户客户端连接器获取连接,进而发送数据。
package com.example.battery.service;
import com.example.battery.client.TenantClientManager;
import com.example.battery.model.TenantInfo;
import org.springframework.stereotype.Service;
@Service
public class BatteryService {
public void processBatteryData(String devId, String data) {
// 模拟根据设备ID获取租户信息,此处应从Redis或数据库中查询实际配置
TenantInfo tenantInfo = getTenantInfoByDevId(devId);
if (tenantInfo == null) {
System.err.println("No tenant info for devId: " + devId);
return;
}
// 通过租户客户端管理器创建或获取到与租户服务端的连接
io.netty.channel.Channel tenantChannel = TenantClientManager.getOrCreateChannel(
tenantInfo.getTenantId(), tenantInfo.getHost(), tenantInfo.getPort()
);
if (tenantChannel != null && tenantChannel.isActive()) {
tenantChannel.writeAndFlush(data);
System.out.println("Data forwarded to tenant server for devId: " + devId);
} else {
System.err.println("Failed to forward data for devId: " + devId);
}
}
// 模拟根据设备ID查询租户信息
private TenantInfo getTenantInfoByDevId(String devId) {
// 实际情况下可从Redis或数据库中获取
TenantInfo info = new TenantInfo();
info.setTenantId("tenant1");
info.setHost("127.0.0.1");
info.setPort(9001);
return info;
}
}
TenantInfo.java
package com.example.battery.model;
public class TenantInfo {
private String tenantId;
private String host;
private Integer port;
// Getters and Setters
public String getTenantId() {
return tenantId;
}
public void setTenantId(String tenantId) {
this.tenantId = tenantId;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
}
3.3 租户客户端连接及数据回传
实现与租户服务端的TCP连接,并在收到租户返回的数据后,将下行数据回传给设备。
3.3.1 TenantClientManager.java
动态建立与租户服务端的TCP连接,并缓存连接,以便数据转发。
package com.example.battery.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class TenantClientManager {
private static final Map<String, Channel> tenantChannels = new ConcurrentHashMap<>();
public static Channel getOrCreateChannel(String tenantId, String host, int port) {
return tenantChannels.computeIfAbsent(tenantId, key -> {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new TenantClientInitializer());
ChannelFuture future = bootstrap.connect(host, port).sync();
if (future.isSuccess()) {
System.out.println("Connected to tenant server: " + tenantId);
return future.channel();
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
});
}
}
3.3.2 TenantClientInitializer.java
设置租户客户端Channel的Pipeline。
package com.example.battery.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class TenantClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast(new TenantClientHandler());
}
}
3.3.3 TenantClientHandler.java
接收租户服务端下行数据,将数据回传给对应设备。
package com.example.battery.client;
import com.example.battery.session.BatterySessionManager;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TenantClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
String devId = DeviceUtils.extractDevIdFromResponse(msg);
Channel deviceChannel = BatterySessionManager.getChannel(devId);
if (deviceChannel != null && deviceChannel.isActive()) {
deviceChannel.writeAndFlush(msg);
System.out.println("Forwarded tenant response to device: " + devId);
} else {
System.err.println("Device channel not found for devId: " + devId);
}
}
}
3.3.4 DeviceUtils.java
提供工具方法解析上报数据及下行响应中的设备ID。
package com.example.battery.netty;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
public class DeviceUtils {
public static String extractDevId(String msg) {
try {
JSONObject json = JSON.parseObject(msg);
return json.getString("devId");
} catch(Exception e) {
return "unknown";
}
}
public static String extractDevIdFromResponse(String msg) {
try {
JSONObject json = JSON.parseObject(msg);
return json.getString("devId");
} catch(Exception e) {
return "unknown";
}
}
}
- 总结
在本篇文章中,我们全面解析了如何利用 Netty 构建一个高性能的 TCP 数据转发平台,支撑 5 万设备在线连接。文章从设备数据接入、设备连接管理(BatterySessionManager)、上行数据转发以及租户客户端动态连接创建等关键模块入手,详细讲解了各层模块的设计思想和完整实现代码,形成了数据双向传输的闭环。通过IdleStateHandler 保活、心跳检测和异常捕获机制,平台既能够实时响应设备上传,也能在出现故障时快速上报异常(为后续告警体系奠定基础)。
总结要点:
- 高并发接入:利用 Netty 高效处理 TCP 长连接,实现大规模设备在线;
- 数据闭环转发:各模块调用形成完整闭环,确保数据上行转发和下行响应无遗漏;
- 模块化设计:业务逻辑、连接管理和异常处理各自独立,便于后续维护与扩展。
本方案为实际物联网平台建设提供了一整套成熟的解决方案,是对大规模数据传输架构的一次深入实战,希望对你在高并发网络架构设计中有所启发。
如果你觉得本文对你有帮助,欢迎点赞、留言讨论,分享你的优化思路。
猜你喜欢
- 2025-06-12 一张图带你看懂防火墙报文转发流程!
- 2025-06-12 搞了半天,终于弄懂了TCP Socket数据的接收和发送,太难
- 2025-06-12 终于把TCP/IP 协议讲的明明白白了,再也不怕被问三次握手了
- 2025-06-12 隧道 ssh -L 命令总结 和 windows端口转发配置
- 2024-08-13 「渗透测试」内网渗透中的端口转发
- 2024-08-13 SSH端口转发(ssh端口转发似乎无法在远程主机上)
- 2024-08-13 TCP/IP和Socket的关系(tcp/ip协议和socket的区别)
- 2024-08-13 Centos利用Rinetd实现端口转发-连接阿里云RDS
- 2024-08-13 「喵咪开源软件推荐(6)」TCP链路加速技术KcpTun
- 2024-08-13 SSH端口转发总结与实战(ssh端口号)
你 发表评论:
欢迎- 06-13边缘计算网关如何实现系统高效运维?
- 06-13虚拟专用网络VPN连接配置
- 06-13WIN10/11下配置VPN(解决L2TP/IPsec无法连接的问题)
- 06-13公共网络上的私有安全通道——VPN
- 06-13远程办公如何访问公司内网办公系统和内部资源?
- 06-13VBRAS场景测试方法—如何高效验证网络设备的性能与稳定性
- 06-13Win10系统如何使用VPN远程办公
- 06-13iPhone轻松实现远程访问公司局域网电脑上的共享文件
- 最近发表
- 标签列表
-
- spire.doc (70)
- instanceclient (62)
- system.data.oracleclient (61)
- 按键小精灵源码提取 (66)
- pyqt5designer教程 (65)
- 联想刷bios工具 (66)
- c#源码 (64)
- graphics.h头文件 (62)
- mysqldump下载 (66)
- sqljdbc4.jar下载 (56)
- libmp3lame (60)
- maven3.3.9 (63)
- 二调符号库 (57)
- git.exe下载 (68)
- diskgenius_winpe (72)
- pythoncrc16 (57)
- solidworks宏文件下载 (59)
- qt帮助文档中文版 (73)
- satacontroller (66)
- hgcad (64)
- bootimg.exe (69)
- android-gif-drawable (62)
- axure9元件库免费下载 (57)
- libmysqlclient.so.18 (58)
- springbootdemo (64)
本文暂时没有评论,来添加一个吧(●'◡'●)