ASP.NET原创框架十五-接入MQTT自定义消息的使用说明
框架接入的MQTT服务为基于Go语言的开源MQTT服务HMQ,框架通过与HMQ的管理http接口交互实现对消息的订阅发送认证等
HMQ的基本配置HMQ.config
{
"workerNum": 4096,
"port": "1883",//服务端口
"host": "0.0.0.0",
"debug": true,
"httpPort": "8080",
"tlsPort": "8883",
"tlsHost": "0.0.0.0",
"wsPort": "8083", //ws的服务端口
"wsPath": "/mqtt",//ws的连接节点名
"wsTLS": false,
"tlsInfo": {
"verify": false,
"caFile": "ssl/ca/ca.pem",
"certFile": "ssl/server/cert.pem",
"keyFile": "ssl/server/key.pem"
},
"plugins": {
"auth": "authhttp" //认证方式 authhttp
}
}
需要修改HMQ的plugins\auth\authhttp\http.json文件内容
{
"auth": "http://xxx.com/CoreSYS.SYS/AuthMQTT_RawJson.ajax",
"acl": "http://xxx.com/CoreSYS.SYS/AuthALCMQTT_RawJson.ajax",
"super": "http://xxx.com/CoreSYS.SYS/AuthSuperMQTT_RawJson.ajax"
}
其中auth:登录认证地址,acl:订阅与发布认证地址,super:超级管理员的认证地址
HMQ的启动可以编写一个批处理文件HMQ.bat启动
HMQ,bat的内容为:
hmq -c hmq.config
框架使用和兴模块的AutonMQTT_RawJson,AuthALCMQTT_RawJson,AuthSuperMQTT_RawJson函数进行
//连接认证
static private ReturnJson AuthMQTT_RawJson(HttpContext ctx, Object m_Parame)
{
ReturnJson m_ReturnJson = new ReturnJson();
Hashtable m_HH = new Hashtable();
JsonHelper.OBJToHashTable(m_Parame, m_HH);
int StatusCode = 401;//返回状态
try
{
m_ReturnJson.bOK = true;
string username = "", clientid = "", password = "";
//获得参数
if (KeyIsOK(m_HH, "username"))
{
username = m_HH["username"].ToString();
}
if (KeyIsOK(m_HH, "clientid"))
{
clientid = m_HH["clientid"].ToString();
}
if (KeyIsOK(m_HH, "password"))
{
password = m_HH["password"].ToString();
}
//认证代码省略
.....
}
catch(Exception e)
{
StatusCode = 401;
}
ctx.Response.StatusCode = StatusCode;//验证通过调用
ctx.Response.End();
return m_ReturnJson;
}
当返回状态StatusCode为200时表示认证通过
关于框架模块添加自定义MQTT消息,在自定义模块绑定模块都准备好的onread消息
static public void SetJsonMode(string sPath, string sName)
{
...
BindP2PMessage("modeReady", onModeReady);
...
}
//初始化化自己的MQTT消息
static private void onModeReady(string sFromServer, string message, Object m_MsgData)
{
try
{
bool bHaveMQTT = false;
using (BaseEngineDB m_BaseDB = NodeDBEngine.GetBaseEngineDB(null))
{
string SQLString = "select * from sys_mqttservices where isuse=1 order by createtime";
bool bOK = false;
DataSet pSet = m_BaseDB.XNGetRecord(SQLString, out bOK);
if (bOK)
{
if(pSet.Tables[0].Rows.Count>0)
{
bHaveMQTT = true;
}
}
}
if (bHaveMQTT)
{
InitMQTTSend();
//添加自定义的MQTT消息
Hashtable m_inHH = new Hashtable();
m_inHH["topic"] = "/+/upAeaa"; //消息名称
m_inHH["modeClass"] = "Aeaa"; //消息处理模块
m_inHH["qos"] = "最少一次";
m_inHH["authFunc"] = "upAeaa_authFunc_Action";//登录认证函数
m_inHH["pubAuthFunc"] = "upAeaa_pubAuthFunc_Action";//发布消息认证函数
m_inHH["subAuthFunc"] = "upAeaa_subAuthFunc_Action";//订阅消息认证函数
m_inHH["actionFunc"] = "upAeaa_actionFunc_Action";//消息来后的处理函数
m_inHH["linkFunc"] = "upAeaa_linkFunc_Action";//连接状态改变通知函数
Object m_outOBJ;
RunShareFunction(null, "CoreSYS.SYS", "InitOneFreeMQTT_共享函数", out m_outOBJ, m_inHH); //调用核心模块函数添加自定义MQTT消息
m_inHH.Clear();
m_inHH["topic"] = "/+/downAeaa";//消息名称
m_inHH["modeClass"] = "Aeaa";//
m_inHH["qos"] = "最少一次";
RunShareFunction(null, "CoreSYS.SYS", "InitOneFreeMQTT_共享函数", out m_outOBJ, m_inHH);
WriteLog("Aeaa已调用自定义MQTT消息成功");
}
else
{
WriteLog("MQTT无可用节点");
}
}
catch(Exception e)
{
WriteLog("Aeaa调用自定义MQTT消息异常:"+e.ToString());
}
}
//发送自定义MQTT消息
Hashtable m_inHH = new Hashtable();
m_inHH["topic"] = m_oneSendMQTTMessage.topic;//消息名
m_inHH["text"] = m_oneSendMQTTMessage.msgstring;//消息体文本
if (RunShareFunction(null, "CoreSYS.SYS", "PostOneFreeMQTT_共享函数", out m_outOBJ, m_inHH))
{
...
}
//认证处理函数
static public ReturnJson upAeaa_authFunc_Action(HttpContext ctx, Object m_Parame)
{
ReturnJson m_ReturnJson = new ReturnJson();
m_ReturnJson.bOK = true;
try
{
Hashtable m_HH = new Hashtable();
JsonHelper.OBJToHashTable(m_Parame, m_HH);
WriteLog("upAeaa_authFunc_Action:" + JsonHelper.OBJToJsonStr(m_Parame));
//m_ReturnJson.bOK = false;
if (!KeyIsOK(m_HH, "clientid"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "upAeaa_authFunc_Action 缺少参数:clientid";
return m_ReturnJson;
}
if (!KeyIsOK(m_HH, "password"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "upAeaa_authFunc_Action 缺少参数:password";
return m_ReturnJson;
}
string clientid = m_HH["clientid"].ToString();
string password = m_HH["password"].ToString();
//验证代码省略
...
}
catch(Exeprion e)
{
}
m_ReturnJson.bOK = true;//true认证通过,false为验证失败
m_ReturnJson.sMsg = "验证通过";
}
//发布消息认证
static public ReturnJson upAeaa_pubAuthFunc_Action(HttpContext ctx, Object m_Parame)
{
ReturnJson m_ReturnJson = new ReturnJson();
m_ReturnJson.bOK = true;
try
{
WriteLog("upAeaa_pubAuthFunc_Action:" + JsonHelper.OBJToJsonStr(m_Parame));
}
catch
{
}
return m_ReturnJson;
}
//订阅消息认证
static public ReturnJson upAeaa_subAuthFunc_Action(HttpContext ctx, Object m_Parame)
{
ReturnJson m_ReturnJson = new ReturnJson();
m_ReturnJson.bOK = true;
try
{
WriteLog("upAeaa_subAuthFunc_Action:" + JsonHelper.OBJToJsonStr(m_Parame));
}
catch
{
}
return m_ReturnJson;
}
//消息来后处理
static public ReturnJson upAeaa_actionFunc_Action(HttpContext ctx, Object m_Parame)
{
ReturnJson m_ReturnJson = new ReturnJson();
m_ReturnJson.bOK = true;
try
{
WriteLog("upAeaa_actionFunc_Action:" + JsonHelper.OBJToJsonStr(m_Parame));
Hashtable m_HH = new Hashtable();
JsonHelper.OBJToHashTable(m_Parame, m_HH);
string topic, clientid, text;
if(!KeyIsOK(m_HH, "topic"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "缺少参数topic";
return m_ReturnJson;
}
/*
if (!KeyIsOK(m_HH, "username"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "缺少参数username";
return m_ReturnJson;
}
if (!KeyIsOK(m_HH, "clientid"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "缺少参数clientid";
return m_ReturnJson;
}
*/
if (!KeyIsOK(m_HH, "text"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "缺少参数text";
return m_ReturnJson;
}
topic = m_HH["topic"].ToString();
text = m_HH["text"].ToString();
clientid = topic.Substring(1);
int nIndex = clientid.IndexOf("/");
if(nIndex!=-1)
{
clientid = clientid.Substring(0, nIndex);
}
if(clientid==md5(BaseJsonMode.P2PServerName, 16))
{
//超级管理员
}
else
{
int nPos = clientid.IndexOf("_");
if(nPos==-1)
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "clientid有误";
return m_ReturnJson;
}
string nodeuuid = clientid.Substring(0, nPos);
nodeuuid = DecimaConvert.MinStr2DecimalString(nodeuuid);
Hashtable m_returnHH =JsonHelper.JsonStrToOBJ<Hashtable>(text);
if (m_returnHH != null)
{
string msgno;
Object recvOBJ = null;
if(!KeyIsOK(m_returnHH,"msgno"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "数据格式有误缺少msgno";
return m_ReturnJson;
}
msgno = m_returnHH["msgno"].ToString();
if (KeyIsOK(m_returnHH, "data"))
{
try
{
recvOBJ = JsonHelper.OBJToType<Hashtable>(m_returnHH["data"]);
if(recvOBJ==null)
{
recvOBJ = m_returnHH["data"];
}
}
catch
{
recvOBJ = m_returnHH["data"];
}
}
else
{
recvOBJ = new Hashtable();
}
。。。
}
catch
{
}
return m_ReturnJson;
}
//客户端连接状态处理函数
static public ReturnJson upAeaa_linkFunc_Action(HttpContext ctx, Object m_Parame)
{
ReturnJson m_ReturnJson = new ReturnJson();
m_ReturnJson.bOK = true;
try
{
WriteLog("upAeaa_linkFunc_Action:" + JsonHelper.OBJToJsonStr(m_Parame));
Hashtable m_HH = new Hashtable();
JsonHelper.OBJToHashTable(m_Parame, m_HH);
if (!KeyIsOK(m_HH, "link"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "缺少link";
return m_ReturnJson;
}
if (!KeyIsOK(m_HH, "clientid"))
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "缺少clientid";
return m_ReturnJson;
}
string link = m_HH["link"].ToString();
string clientid = m_HH["clientid"].ToString();
。。。
}
catch (Exception e)
{
m_ReturnJson.bOK = false;
m_ReturnJson.sMsg = "更改状态异常:" + e.ToString();
//WriteLog(m_ReturnJson.sMsg);
}
return m_ReturnJson;
}
框架页面端对MQTT消息的处理
框架处理MQTT消息的JS框架
/public/share/script/FrameMQTT.js
页面使用
$(function () {
try {
StartMQTT();//初始化MQTT
}
catch (r) {
}
});
function StartMQTT() {
<%
if (GetSessionLink() != null)
{
%>
var m_FrameMQTT = new FrameMQTT("<%=HeadUrl%>", function (message) {
//收到消息,处理消息,FrameMQTT自动获得可订阅的消息
var msgName = message.destinationName;
var msgArray = msgName.split("/");
var fromName = "", msgMqttType = "";
if (msgArray.length >= 5) {
msgName = "/" + msgArray[1] + "/" + msgArray[2] + "/" + msgArray[4];
fromName = msgArray[3];
msgMqttType = msgArray[1];
}
//消息分支处理
switch (msgName) {
case "/s2p/neworder/all":
{
var m_Data = JSON.parse(message.payloadString)
var top = "240px";// randomNum(4, 16) + "rem";
var left = ($('body').width() - 360) / 2 + "px"; //randomNum(0, 12) + "rem";
$("#m_PushTips").css({ "top": top, "left": left });
$('#msgPic')[0].src = m_Data.picurl;
$('#msgText').html(m_Data.memo);
$("#m_PushTips").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_PushTips").hide(100);
$('#msgPic')[0].src = "";
$('#msgText').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
break;
case "/s2p/notify/all":
{
var m_Data = JSON.parse(message.payloadString);
var top = "240px";// randomNum(4, 16) + "rem";
var left = ($('body').width() - 360) / 2 + "px"; //randomNum(0, 12) + "rem";
$("#m_notifyRong").css({ "top": top, "left": left });
if (m_Data.html != undefined && m_Data.html != "") {
$("#m_notifyRong").html(m_Data.html);
$("#m_notifyRong").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_notifyRong").hide(100);
$('#m_notifyRong').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
else {
var HTMLString = '<p style="font-size:14px;color:#000;"><img id="notifyPic" src="" style="width:2em;height:2em; float:left;border-radius:10px;margin-top:1px;"><span id="notifyText"></span></p>';
$("#m_notifyRong").html(HTMLString);
$('#notifyPic')[0].src = m_Data.picurl;
$('#notifyText').html(m_Data.memo);
$("#m_notifyRong").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_notifyRong").hide(100);
$('#notifyPic')[0].src = "";
$('#notifyText').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
}
break;
default:
{
if (msgName.indexOf("/s2p/neworder/site") != -1) {
var m_Data = JSON.parse(message.payloadString)
var top = "240px";// randomNum(4, 16) + "rem";
var left = ($('body').width() - 360) / 2 + "px"; //randomNum(0, 12) + "rem";
$("#m_PushTips").css({ "top": top, "left": left });
$('#msgPic')[0].src = m_Data.picurl;
$('#msgText').html(m_Data.memo);
$("#m_PushTips").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_PushTips").hide(100);
$('#msgPic')[0].src = "";
$('#msgText').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
var top = "240px";// randomNum(4, 16) + "rem";
var left = ($('body').width() - 360) / 2 + "px"; //randomNum(0, 12) + "rem";
$("#m_notifyRong").css({ "top": top, "left": left });
if (msgName.indexOf("/s2p/notify/site") != -1) {
var m_Data = JSON.parse(message.payloadString);
if (m_Data.html != undefined && m_Data.html != "") {
$("#m_notifyRong").html(m_Data.html);
$("#m_notifyRong").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_notifyRong").hide(100);
$('#m_notifyRong').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
else {
var HTMLString = '<p style="font-size:14px;color:#000;"><img id="notifyPic" src="" style="width:2em;height:2em; float:left;border-radius:10px;margin-top:1px;"><span id="notifyText"></span></p>';
$("#m_notifyRong").html(HTMLString);
$('#notifyPic')[0].src = m_Data.picurl;
$('#notifyText').html(m_Data.memo);
$("#m_notifyRong").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_notifyRong").hide(100);
$('#notifyPic')[0].src = "";
$('#notifyText').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
}
if (msgName.indexOf("/s2p/notify/person") != -1) {
var m_Data = JSON.parse(message.payloadString);
if (m_Data.html != undefined && m_Data.html != "") {
$("#m_notifyRong").html(m_Data.html);
$("#m_notifyRong").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_notifyRong").hide(100);
$('#m_notifyRong').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
else {
var HTMLString = '<p style="font-size:14px;color:#000;"><img id="notifyPic" src="" style="width:2em;height:2em; float:left;border-radius:10px;margin-top:1px;"><span id="notifyText"></span></p>';
$("#m_notifyRong").html(HTMLString);
$('#notifyPic')[0].src = m_Data.picurl;
$('#notifyText').html(m_Data.memo);
$("#m_notifyRong").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_notifyRong").hide(100);
$('#notifyPic')[0].src = "";
$('#notifyText').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
}
else {
switch (msgMqttType) {
case "s2p":
{
var m_Data = JSON.parse(message.payloadString);
var top = "240px";// randomNum(4, 16) + "rem";
var left = ($('body').width() - 360) / 2 + "px"; //randomNum(0, 12) + "rem";
$("#m_notifyRong").css({ "top": top, "left": left });
if (m_Data.html != undefined && m_Data.html != "") {
$("#m_notifyRong").html(m_Data.html);
$("#m_notifyRong").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_notifyRong").hide(100);
$('#m_notifyRong').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
else {
var HTMLString = '<p style="font-size:14px;color:#000;"><img id="notifyPic" src="" style="width:2em;height:2em; float:left;border-radius:10px;margin-top:1px;"><span id="notifyText"></span></p>';
$("#m_notifyRong").html(HTMLString);
$('#notifyPic')[0].src = m_Data.picurl;
$('#notifyText').html(m_Data.memo);
$("#m_notifyRong").show(200); //如果元素为隐藏,则将它显现
setTimeout(function () {
$("#m_notifyRong").hide(100);
$('#notifyPic')[0].src = "";
$('#notifyText').html("");
}, m_Data.delay * 1000);
ShowCnt();
}
}
break;
}
}
}
break;
}
console.log("onMessageArrived Data:" + message.payloadString);
console.log("onMessageArrived destinationName:" + message.destinationName);
});
//初始化MQTT
m_FrameMQTT.initMQTT(function (bok, sMsg) {
Console.log(sMsg);
});
<%
}
%>
}
本文暂时没有评论,来添加一个吧(●'◡'●)