KEPServerEX6 下的 IOT & RabbitMQ
常用 MQTT、REST Client 两种方式上传数据
ASP.NET API 接口
/// <summary>
/// Accepts a payload posted to the "postdata" route and parses its string form
/// as a <c>kepPack</c> JSON document.
/// </summary>
/// <param name="v">The posted payload; its <c>ToString()</c> output is fed to the JSON parser.</param>
/// <returns>
/// 200 OK echoing <paramref name="v"/> when the payload parses;
/// 400 Bad Request when the payload is missing or is not valid JSON for <c>kepPack</c>.
/// </returns>
[Route("postdata")]
[HttpPost]
public IActionResult PostData(object v)
{
    // A missing body would otherwise fail with a NullReferenceException on v.ToString().
    if (v == null)
    {
        return BadRequest("Request body is empty.");
    }

    kepPack pack;
    try
    {
        pack = Newtonsoft.Json.JsonConvert.DeserializeObject<kepPack>(v.ToString());
    }
    catch (Newtonsoft.Json.JsonException)
    {
        // Malformed input is a client error, not a server fault.
        return BadRequest("Request body is not valid kepPack JSON.");
    }

    // NOTE(review): 'pack' is parsed but not consumed yet; this is where the
    // values would be forwarded or stored.
    return Ok(v);
}
/// <summary>
/// One element of the <c>values</c> list carried by a <c>kepPack</c>.
/// Property names are lower-case so they match the JSON keys verbatim.
/// </summary>
public class kepItem
{
    /// <summary>Identifier of the item (presumably the tag path; confirm against the sender).</summary>
    public string id { get; set; }

    /// <summary>Untyped payload of the item (presumably the tag value).</summary>
    public object v { get; set; }

    /// <summary>Boolean flag of the item (presumably the quality indicator).</summary>
    public bool q { get; set; }

    /// <summary>Untyped companion field (presumably the sample timestamp).</summary>
    public object t { get; set; }
}
/// <summary>
/// Top-level shape of the JSON document parsed by <c>PostData</c>:
/// a timestamp plus a list of <c>kepItem</c> entries.
/// Property names are lower-case so they match the JSON keys verbatim.
/// </summary>
public class kepPack
{
    /// <summary>Timestamp of the whole package, kept as text.</summary>
    public string timestamp { get; set; }

    /// <summary>Items contained in the package; null until assigned or deserialized.</summary>
    public List<kepItem> values { get; set; }
}
RabbitMQ
安装Erlang https://www.erlang.org/downloads
安装rabbitmq http://www.rabbitmq.com/download.html
进入目录C:\Program Files\RabbitMQ Server\versionx\sbin
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_mqtt
http://localhost:15672/ 默认用户guest
set ERLANG_HOME=C:\Program Files\erl-24.1
RabbitMQ 生产
/// <summary>
/// Console producer: declares the "First" queue on the local broker and publishes
/// every line read from standard input to it through the default exchange.
/// The loop ends when standard input is closed.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "First", durable: false, exclusive: false, autoDelete: false, arguments: null);

        string message;
        // Console.ReadLine returns null at end of input (Ctrl+Z / closed pipe);
        // stop there instead of passing null to Encoding.UTF8.GetBytes, which throws.
        while ((message = Console.ReadLine()) != null)
        {
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "", routingKey: "First", null, body);
            Console.WriteLine("[x] send {0}", message);
        }
    }
}
RabbitMQ 消费
/// <summary>
/// Console consumer: declares the "First" queue on the local broker, prints every
/// message delivered to it, and keeps running until a key is pressed.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    const string queueName = "First";

    var connectionFactory = new ConnectionFactory { HostName = "localhost" };
    using (var conn = connectionFactory.CreateConnection())
    using (var model = conn.CreateModel())
    {
        model.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        var listener = new EventingBasicConsumer(model);
        listener.Received += (sender, delivery) =>
        {
            // Decode the delivered bytes as UTF-8 text and echo them.
            var text = Encoding.UTF8.GetString(delivery.Body.ToArray());
            Console.WriteLine(" [x] Received {0}", text);
        };

        model.BasicConsume(queue: queueName, true, consumer: listener);

        // Deliveries arrive through the event handler; block here until a key press.
        Console.ReadKey();
    }
}
RabbitMQ 配置
Exchange: amq.topic
RabbitMQ Helper
/// <summary>
/// Thin helper around the RabbitMQ client: publishes UTF-8 text messages to a queue
/// and consumes them through a callback. Configure the public properties before
/// calling <see cref="Producer"/> or <see cref="Consumer"/>.
/// </summary>
/// <remarks>
/// The producer keeps one connection/channel open between calls; call
/// <see cref="Dispose"/> to release it. No synchronization is performed here.
/// </remarks>
public class RqHelper : IDisposable
{
    // Producer-side channel and connection, created lazily and reused across calls.
    private IModel channel;
    private IConnection connection;

    /// <summary>Broker host name.</summary>
    public string HostName { get; set; } = "";

    /// <summary>Exchange to publish to; the empty string selects the default exchange.</summary>
    public string Exchange { get; set; } = "";

    /// <summary>Queue name; also used as the routing key when publishing.</summary>
    public string Queue { get; set; } = "";

    /// <summary>Broker port.</summary>
    public int Port { get; set; } = 5672;

    /// <summary>User name; when left empty the connection factory's own default credentials are used.</summary>
    public string UserName { get; set; } = "";

    /// <summary>Password; only applied when <see cref="UserName"/> is set.</summary>
    public string Password { get; set; } = "";

    /// <summary>
    /// Publishes <paramref name="msg"/> as UTF-8 bytes to <see cref="Exchange"/> with
    /// <see cref="Queue"/> as the routing key, opening (or re-opening) the connection on demand.
    /// </summary>
    /// <param name="msg">Message text to send.</param>
    /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
    public void Producer(string msg)
    {
        if (msg == null)
        {
            throw new ArgumentNullException(nameof(msg));
        }

        if (channel == null || !channel.IsOpen)
        {
            // Drop any stale connection/channel first so a reconnect does not leak them.
            ReleaseConnection();

            connection = CreateFactory().CreateConnection();
            channel = connection.CreateModel();

            // queue:      name of the message queue.
            // durable:    true persists the queue to disk so it survives a broker restart.
            // exclusive:  true makes the queue visible only to the connection that first
            //             declared it and deletes it when that connection closes.
            // autoDelete: true deletes the queue once at least one consumer has attached
            //             and all consumers have since disconnected.
            // arguments:  additional queue settings.
            channel.QueueDeclare(queue: this.Queue, durable: false, exclusive: false, autoDelete: false, arguments: null);
        }

        var body = Encoding.UTF8.GetBytes(msg);
        channel.BasicPublish(exchange: this.Exchange, routingKey: this.Queue, null, body);
    }

    /// <summary>
    /// Consumes messages from <see cref="Queue"/> and passes each one, decoded as UTF-8 text,
    /// to <paramref name="callback"/>. This call blocks the calling thread indefinitely.
    /// </summary>
    /// <param name="callback">Invoked once per delivered message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public void Consumer(Action<string> callback)
    {
        if (callback == null)
        {
            throw new ArgumentNullException(nameof(callback));
        }

        // Use the same factory settings as the producer (host, port, credentials).
        using (var consumerConnection = CreateFactory().CreateConnection())
        using (var consumerChannel = consumerConnection.CreateModel())
        {
            consumerChannel.QueueDeclare(queue: this.Queue, durable: false, exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(consumerChannel);
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                callback(message);
            };
            consumerChannel.BasicConsume(queue: this.Queue, true, consumer: consumer);

            // Keep the connection alive without spinning a CPU core.
            System.Threading.Thread.Sleep(System.Threading.Timeout.Infinite);
        }
    }

    /// <summary>Closes the producer's channel and connection, if any were opened.</summary>
    public void Dispose()
    {
        ReleaseConnection();
    }

    /// <summary>Builds a connection factory from the current property values.</summary>
    private ConnectionFactory CreateFactory()
    {
        var factory = new ConnectionFactory()
        {
            HostName = this.HostName,
            Port = this.Port
        };

        // Only override credentials when a user name was supplied; overwriting them
        // with empty strings would replace the factory's built-in defaults.
        if (!string.IsNullOrEmpty(this.UserName))
        {
            factory.UserName = this.UserName;
            factory.Password = this.Password;
        }

        return factory;
    }

    /// <summary>Disposes and clears the producer's channel and connection.</summary>
    private void ReleaseConnection()
    {
        try
        {
            channel?.Dispose();
        }
        finally
        {
            channel = null;
            connection?.Dispose();
            connection = null;
        }
    }
}
本文暂时没有评论,来添加一个吧(●'◡'●)