程序员开发实例大全宝库

网站首页 > 编程文章 正文

C# 之门课程系列-31(c#入门教程 视频)

zazugpt 2024-08-09 12:23:57 编程文章 117 ℃ 0 评论

KEPServerEX6 下的 IOT & RabbitMQ


常用 MQTT、REST Client 两种方式上传数据

asp.net Api 接口

/// <summary>
        /// Accepts a JSON payload posted to the <c>postdata</c> route, checks that it parses
        /// as a <see cref="kepPack"/>, and echoes the raw payload back to the caller.
        /// </summary>
        /// <param name="v">Raw request payload; its string form is parsed as JSON.</param>
        /// <returns>
        /// 200 OK carrying <paramref name="v"/> when the payload parses;
        /// 400 Bad Request when the payload is null or is not valid JSON for <see cref="kepPack"/>.
        /// </returns>
        [Route("postdata")]
        [HttpPost]
        public IActionResult PostData(object v)
        {
            // Guard: calling ToString() on a null payload would throw NullReferenceException.
            if (v == null)
            {
                return BadRequest("Request body is required.");
            }

            try
            {
                // The parsed result is not used further; deserializing only validates the payload.
                Newtonsoft.Json.JsonConvert.DeserializeObject<kepPack>(v.ToString());
            }
            catch (Newtonsoft.Json.JsonException)
            {
                // Malformed or mismatched JSON is a client error, not a server fault.
                return BadRequest("Request body is not valid kepPack JSON.");
            }

            return Ok(v);
        }
    /// <summary>
    /// One entry of the <c>values</c> list carried by a <see cref="kepPack"/>.
    /// NOTE(review): the short lower-case member names presumably mirror the JSON keys sent by
    /// KEPServerEX so the deserializer can bind them by name — confirm against the gateway's
    /// message format before renaming anything.
    /// </summary>
    public class kepItem
    {
        /// <summary>Item identifier (presumably the tag name — TODO confirm).</summary>
        public string id { get; set; }
        /// <summary>Item value; typed as <c>object</c>, so any JSON value is accepted.</summary>
        public object v { get; set; }
        /// <summary>Boolean flag (presumably the quality indicator of the reading — TODO confirm).</summary>
        public bool q { get; set; }
        /// <summary>Untyped field (presumably the per-item timestamp — TODO confirm format).</summary>
        public object t { get; set; }
    }

    /// <summary>
    /// Top-level payload that <c>PostData</c> deserializes: a timestamp plus a list of
    /// <see cref="kepItem"/> entries.
    /// </summary>
    public class kepPack
    {
        /// <summary>Timestamp of the pack, kept as a string (format not visible here — TODO confirm).</summary>
        public string timestamp { get; set; }
        /// <summary>The items contained in this pack.</summary>
        public List<kepItem> values { get; set; }
    }

RabbitMQ

安装Erlang https://www.erlang.org/downloads

安装rabbitmq http://www.rabbitmq.com/download.html

进入目录C:\Program Files\RabbitMQ Server\versionx\sbin

rabbitmq-plugins list

rabbitmq-plugins enable rabbitmq_management

rabbitmq-plugins enable rabbitmq_mqtt

http://localhost:15672/ 默认用户guest

set ERLANG_HOME=C:\Program Files\erl-24.1

RabbitMQ 生产

        /// <summary>
        /// Console producer: publishes every line read from standard input to the
        /// "First" queue through the default exchange of the broker on localhost.
        /// Returns when standard input reaches end of input.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "First", durable: false, exclusive: false, autoDelete: false, arguments: null);

                string message;
                // Console.ReadLine returns null at end of input (closed or redirected stdin);
                // stop there instead of passing null to Encoding.GetBytes, which would throw.
                while ((message = Console.ReadLine()) != null)
                {
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "", routingKey: "First", null, body);
                    Console.WriteLine("[x] send {0}", message);
                }
            }
        }

RabbitMQ 消费

        /// <summary>
        /// Console consumer: subscribes to the "First" queue on the broker at localhost and
        /// prints every received message body as UTF-8 text until a key is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var connectionFactory = new ConnectionFactory();
            connectionFactory.HostName = "localhost";

            using (var conn = connectionFactory.CreateConnection())
            {
                using (var model = conn.CreateModel())
                {
                    // Same declaration as before, passed positionally:
                    // queue, durable, exclusive, autoDelete, arguments.
                    model.QueueDeclare("First", false, false, false, null);

                    var listener = new EventingBasicConsumer(model);
                    listener.Received += (sender, delivery) =>
                    {
                        var text = Encoding.UTF8.GetString(delivery.Body.ToArray());
                        Console.WriteLine(" [x] Received {0}", text);
                    };

                    model.BasicConsume("First", true, listener);

                    // Keep the connection and channel open until the user presses a key.
                    Console.ReadKey();
                }
            }
        }

RabbitMQ 配置

Exchange: amq.topic





RabbitMQ Helper

 /// <summary>
 /// Small helper around the RabbitMQ client: <see cref="Producer"/> publishes text messages
 /// over a lazily opened, reused connection, and <see cref="Consumer"/> subscribes to the
 /// configured queue and forwards each message to a callback.
 /// Call <see cref="Dispose"/> to release the producer connection when finished.
 /// </summary>
 public class RqHelper : IDisposable
    {
        // Channel and connection reused by Producer between calls.
        private IModel channel;
        private IConnection connection;

        /// <summary>
        /// Broker host name.
        /// </summary>
        public string HostName { get; set; } = "";

        /// <summary>
        /// Exchange that <see cref="Producer"/> publishes to; empty by default.
        /// </summary>
        public string Exchange { get; set; } = "";

        /// <summary>
        /// Queue that is declared and consumed; also used as the routing key when publishing.
        /// </summary>
        public string Queue { get; set; } = "";

        /// <summary>
        /// Broker port; defaults to 5672.
        /// </summary>
        public int Port { get; set; } = 5672;

        /// <summary>
        /// User name for the broker. When left empty, the connection factory's own default
        /// credentials are kept.
        /// </summary>
        public string UserName { get; set; } = "";

        /// <summary>
        /// Password for the broker; only applied together with a non-empty <see cref="UserName"/>.
        /// </summary>
        public string Password { get; set; } = "";

        /// <summary>
        /// Publishes <paramref name="msg"/> as UTF-8 bytes to <see cref="Exchange"/> with
        /// <see cref="Queue"/> as the routing key, opening (or re-opening) the connection first
        /// when there is no open channel.
        /// </summary>
        /// <param name="msg">Message text to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
        public void Producer(string msg)
        {
            if (msg == null)
            {
                throw new ArgumentNullException(nameof(msg));
            }

            if (channel == null || !channel.IsOpen)
            {
                // Release any stale channel/connection before replacing them, so that a
                // reconnect does not leak the previous connection.
                CloseConnection();

                connection = CreateFactory().CreateConnection();
                channel = connection.CreateModel();
                // queue: name of the message queue.
                // durable: whether the queue is persisted; true stores it on disk so it survives a broker restart.
                // exclusive: true makes the queue visible only to the connection that first declared it,
                //            and it is deleted when that connection closes.
                // autoDelete: true deletes the queue once at least one consumer has connected and
                //             all of its consumers have since disconnected.
                // arguments: additional queue settings.
                channel.QueueDeclare(queue: this.Queue, false, false, false, null);
            }
            var body = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(exchange: this.Exchange, routingKey: this.Queue, null, body);
        }

        /// <summary>
        /// Subscribes to <see cref="Queue"/> and passes every received message, decoded as UTF-8,
        /// to <paramref name="callback"/>. This method blocks the calling thread indefinitely to
        /// keep the subscription alive; it does not return normally.
        /// </summary>
        /// <param name="callback">Invoked with the text of each received message.</param>
        /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
        public void Consumer(Action<string> callback)
        {
            if (callback == null)
            {
                throw new ArgumentNullException(nameof(callback));
            }

            // Use the same connection settings as Producer (host, port, credentials).
            using (var consumerConnection = CreateFactory().CreateConnection())
            using (var consumerChannel = consumerConnection.CreateModel())
            {
                consumerChannel.QueueDeclare(queue: this.Queue, durable: false, exclusive: false, autoDelete: false, arguments: null);
                var consumer = new EventingBasicConsumer(consumerChannel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    callback(message);
                };
                consumerChannel.BasicConsume(queue: this.Queue, true, consumer: consumer);

                // Park this thread without spinning; an empty while(true) loop would keep a
                // CPU core fully busy for the lifetime of the subscription.
                System.Threading.Thread.Sleep(System.Threading.Timeout.Infinite);
            }
        }

        /// <summary>
        /// Releases the channel and connection opened by <see cref="Producer"/>, if any.
        /// </summary>
        public void Dispose()
        {
            CloseConnection();
        }

        // Builds a connection factory from the configured host, port and (when given) credentials.
        private ConnectionFactory CreateFactory()
        {
            var factory = new ConnectionFactory()
            {
                HostName = this.HostName,
                Port = this.Port
            };

            // Only override the factory's default credentials when the caller supplied a user
            // name; this keeps Consumer working for callers that never set credentials.
            if (!string.IsNullOrEmpty(this.UserName))
            {
                factory.UserName = this.UserName;
                factory.Password = this.Password;
            }

            return factory;
        }

        // Disposes the producer's channel and connection and clears the fields.
        private void CloseConnection()
        {
            if (channel != null)
            {
                channel.Dispose();
                channel = null;
            }

            if (connection != null)
            {
                connection.Dispose();
                connection = null;
            }
        }

    }

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表