环境:windows server 2008 r2+vs2017+.net4.6.1/4.7
组件:MQTTnet.dll (开源)
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
MQTTnet 最新版本 2.8.2.0
private IMqttClient _mqttClient;
private Action<string> _updateListBoxAction;
private List<IManagedMqttClient> managedMqttClients = new List<IManagedMqttClient>();
MqttNetGlobalLogger.LogMessagePublished += (o, args) =>
{
var s = new StringBuilder();
s.Append(#34;{args.TraceMessage.Timestamp} ");
s.Append(#34;{args.TraceMessage.Level} ");
s.Append(#34;{args.TraceMessage.Source} ");
s.Append(#34;{args.TraceMessage.ThreadId} ");
s.Append(#34;{args.TraceMessage.Message} ");
s.Append(#34;{args.TraceMessage.Exception}");
s.Append(#34;{args.TraceMessage.LogId} ");
};
var ips = Dns.GetHostAddressesAsync(Dns.GetHostName());
TxbServer.Text = ips.Result[1].ToString();
foreach (var ip in ips.Result)
{
switch (ip.AddressFamily)
{
case AddressFamily.InterNetwork:
TxbServer.Text = ip.ToString();//本地IP地址
break;
case AddressFamily.InterNetworkV6:
break;
}
}
//订阅质量参数:0,1,2
foreach (var value in Enum.GetValues(typeof(MqttQualityOfServiceLevel)))
{
CmbPubMqttQuality.Items.Add((int) value);
CmbSubMqttQuality.Items.Add((int) value);
}
CmbPubMqttQuality.SelectedItem = 0;
CmbSubMqttQuality.SelectedIndex = 0;
//消息列表自动清首条 (满100条后,先进先出)
_updateListBoxAction = new Action<string>((s) =>
{
listBox1.Items.Add(s);
if (listBox1.Items.Count > 100)
{
listBox1.Items.RemoveAt(0);
}
});
//断开连接方法
if (null != _mqttClient && _mqttClient.IsConnected)
{
await _mqttClient.DisconnectAsync();
_mqttClient.Dispose();
_mqttClient = null;
}
//订阅
try
{
Task.Factory.StartNew(async () =>
{
await _mqttClient.SubscribeAsync(
new List<TopicFilter>
{
new TopicFilter(
txbSubscribe.Text,
(MqttQualityOfServiceLevel)
Enum.Parse(typeof (MqttQualityOfServiceLevel), CmbSubMqttQuality.Text))
});
});
}
catch (Exception)
{
throw;
}
//发布主题
try
{
Task.Factory.StartNew(async () =>
{
var msg = new MqttApplicationMessage()
{//主题,内容,质量,不保留
Topic = TxbTopic.Text,
Payload = Encoding.UTF8.GetBytes(TxbPayload.Text),
QualityOfServiceLevel =
(MqttQualityOfServiceLevel)
Enum.Parse(typeof (MqttQualityOfServiceLevel), CmbPubMqttQuality.Text),
Retain = false
};
if (null != _mqttClient)
{
await _mqttClient.PublishAsync(msg);
}
});
}
catch (Exception)
{
throw;
}
private async void MqttClient()
{//连接函数
try
{
var options = new MqttClientOptions() {ClientId = Guid.NewGuid().ToString("D")};
options.ChannelOptions = new MqttClientTcpOptions()
{//服务器地址和端口默认1883
Server = TxbServer.Text,
Port = Convert.ToInt32(TxbPort.Text)
};
options.Credentials = new MqttClientCredentials()
{//账号密码(修改为您的账号密码)
Username = "admin",
Password = "public"
};
//3个选项(清除对话,激活间隔,激活频率)
options.CleanSession = true;
options.KeepAlivePeriod = TimeSpan.FromSeconds(100.5);
options.KeepAliveSendInterval = TimeSpan.FromSeconds(20000);
if (null != _mqttClient)
{//断开
await _mqttClient.DisconnectAsync();
_mqttClient = null;
}
_mqttClient = new MqttFactory().CreateMqttClient();
//接收到发布
_mqttClient.ApplicationMessageReceived += (sender, args) =>
{//异步操作
listBox1.BeginInvoke(
_updateListBoxAction,
#34;ClientID:{args.ClientId} | TOPIC:{args.ApplicationMessage.Topic} | Payload:{Encoding.UTF8.GetString(args.ApplicationMessage.Payload)} | QoS:{args.ApplicationMessage.QualityOfServiceLevel} | Retain:{args.ApplicationMessage.Retain}"
);
};
//连接成功
_mqttClient.Connected += (sender, args) =>
{
listBox1.BeginInvoke(_updateListBoxAction,
#34;Client is Connected: IsSessionPresent:{args.IsSessionPresent}");
};
//断开连接
_mqttClient.Disconnected += (sender, args) =>
{
listBox1.BeginInvoke(_updateListBoxAction,
#34;Client is DisConnected ClientWasConnected:{args.ClientWasConnected}");
};
await _mqttClient.ConnectAsync(options);
}
catch (Exception)
{
throw;
}
}
实例定制(基于#MQTT#的定制),MQTT服务端实例见下一篇文章
支持多线程客户端连接代码 关注私信获取。
mqtt+websocket实例应用(关注v,体验远程控制)
该实例对应的控制模块(基于ESP8266-12F开发)固件自行烧制
本文暂时没有评论,来添加一个吧(●'◡'●)