123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- using Edge.Core.IndustryStandardInterface.NetworkController;
- using Microsoft.Extensions.Logging;
- using MQTTnet;
- using MQTTnet.Client;
- using MQTTnet.Diagnostics;
- using MQTTnet.Protocol;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace DeviceInfoToAliIotHubViaGateway
- {
/// <summary>
/// <see cref="IMqttClientNetworkController"/> implementation that forwards every
/// operation to a single MQTTnet <see cref="IMqttClient"/> instance.
/// </summary>
/// <remarks>
/// Typical call order, as implied by the members below: <see cref="OpenAsync"/> stores
/// the broker endpoint, <see cref="ConnectAsync"/> opens the TLS session, then
/// publish/subscribe calls are issued. Incoming messages are surfaced through
/// <see cref="OnMessageReceived"/>.
/// </remarks>
class FastMqttClient : IMqttClientNetworkController
{
    private readonly IMqttClient mqttClient;
    private readonly ILogger logger;

    // Broker endpoint captured by OpenAsync and consumed by ConnectAsync.
    private string mqttServerUrl;
    private int mqttServerPort;

    /// <summary>Full type name of this controller.</summary>
    public string Name => this.GetType().FullName;

    /// <summary>Raised for every application message delivered by the underlying client.</summary>
    public event EventHandler<OnMqttMessageReceivedEventArg> OnMessageReceived;

    /// <summary>Declared to satisfy the controller interface.</summary>
    // NOTE(review): nothing in this class raises this event; subscribers will never be called.
    public event EventHandler<NetworkControllerOnStateChangeEventArg> OnNetworkControllerStateChange;

    /// <summary>
    /// Creates the underlying MQTT client and wires its receive callback to
    /// <see cref="OnMessageReceived"/>.
    /// </summary>
    /// <param name="logger">Logger used for all diagnostics of this instance.</param>
    public FastMqttClient(ILogger logger)
    {
        this.logger = logger;
        this.mqttClient = new MqttFactory().CreateMqttClient();
        this.mqttClient.ApplicationMessageReceivedAsync += e =>
        {
            var received = e.ApplicationMessage;

            // Only decode the payload when the text will actually be written.
            if (this.logger.IsEnabled(LogLevel.Debug))
            {
                // A message may carry no payload; Encoding.GetString(null) would throw
                // and abort the handler before subscribers are notified.
                var payloadText = received.Payload == null
                    ? string.Empty
                    : Encoding.UTF8.GetString(received.Payload);
                this.logger.LogDebug($"======>>>FastMqttClient received at topic: {received.Topic} with payload: {payloadText}");
            }

            this.OnMessageReceived?.Invoke(this, new OnMqttMessageReceivedEventArg(new MqttMessage()
            {
                Message = received.Payload,
                Topic = received.Topic,
            }));
            return Task.CompletedTask;
        };
    }

    /// <summary>Logs the close request. No connection or resource is released here.</summary>
    /// <returns>Always <c>true</c>.</returns>
    // NOTE(review): the underlying client is neither disconnected nor disposed — confirm this is intended.
    public Task<bool> CloseAsync()
    {
        this.logger.LogInformation($"Closing FastMqttClient");
        return Task.FromResult(true);
    }

    /// <summary>Not supported by this controller.</summary>
    /// <param name="parameters">Ignored.</param>
    /// <returns>A task faulted with <see cref="NotImplementedException"/>.</returns>
    public Task<bool> ConfigNetworkAsync(params string[] parameters)
    {
        // Return a faulted task (rather than throwing synchronously) so awaiting callers
        // observe the exception exactly as they did with the former async method.
        return Task.FromException<bool>(new NotImplementedException());
    }

    /// <summary>
    /// Connects to the endpoint stored by <see cref="OpenAsync"/> over TLS, using a clean
    /// session and a two-minute keep-alive.
    /// </summary>
    /// <param name="clientId">MQTT client identifier.</param>
    /// <param name="userName">Broker user name.</param>
    /// <param name="password">Broker password. Never written to the log.</param>
    /// <returns><c>true</c> when the broker answers with a success result code.</returns>
    // NOTE(review): exceptions from the underlying ConnectAsync are not caught here and
    // propagate to the caller instead of becoming a false return value.
    public async Task<bool> ConnectAsync(string clientId, string userName, string password)
    {
        var options = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(this.mqttServerUrl, this.mqttServerPort)
            .WithCredentials(userName, password)
            .WithTls()
            .WithKeepAlivePeriod(new TimeSpan(0, 2, 0))
            .WithCleanSession(true)
            .Build();

        // The credential is masked on purpose: secrets must not end up in log files.
        this.logger.LogInformation($"Connecting to mqtt server with url: {this.mqttServerUrl}-{this.mqttServerPort}, clientId: {clientId}, userName: {userName}, pwd: ***");

        var result = await this.mqttClient.ConnectAsync(options);
        this.logger.LogInformation($" Connect result is: {result.ResultCode}");
        return result.ResultCode == MqttClientConnectResultCode.Success;
    }

    /// <summary>Disconnects the underlying MQTT client.</summary>
    /// <returns><c>true</c> once the disconnect call has completed.</returns>
    public async Task<bool> DisconnectAsync()
    {
        this.logger.LogInformation($"Disconnecting FastMqttClient");
        await this.mqttClient.DisconnectAsync();
        return true;
    }

    /// <summary>
    /// Remembers the broker endpoint for a later <see cref="ConnectAsync"/>; no network
    /// activity happens here.
    /// </summary>
    /// <param name="mqttServerUrl">Broker host passed to the TCP server option.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <returns>Always <c>true</c>.</returns>
    public Task<bool> OpenAsync(string mqttServerUrl, int port)
    {
        this.mqttServerUrl = mqttServerUrl;
        this.mqttServerPort = port;
        return Task.FromResult(true);
    }

    /// <summary>Publishes a message through the underlying client.</summary>
    /// <param name="msgId">Not used by this implementation.</param>
    /// <param name="topic">Destination topic.</param>
    /// <param name="byteMsg">Raw payload bytes.</param>
    /// <param name="qosLevel">Requested QoS; an unrecognised value leaves the message at its default QoS.</param>
    /// <param name="retain">Retain flag copied onto the message.</param>
    /// <returns><c>true</c> when the publish result reports success.</returns>
    public async Task<bool> PublishAsync(int msgId, string topic, byte[] byteMsg, Mqtt_QosLevel qosLevel, bool retain)
    {
        var mqttMsg = new MqttApplicationMessage
        {
            Payload = byteMsg,
            Retain = retain,
            Topic = topic,
        };

        if (TryMapQos(qosLevel, out var mappedQos))
        {
            mqttMsg.QualityOfServiceLevel = mappedQos;
        }

        var result = await this.mqttClient.PublishAsync(mqttMsg);
        return result.ReasonCode == MqttClientPublishReasonCode.Success;
    }

    /// <summary>Reports the controller state.</summary>
    /// <returns>Always <see cref="NetworkState.NetworkConnected"/>.</returns>
    // NOTE(review): the real connection state of the client is not consulted — confirm this is intended.
    public Task<NetworkState> QueryStatusAsync()
    {
        return Task.FromResult(NetworkState.NetworkConnected);
    }

    /// <summary>Resets the controller by disconnecting the underlying client.</summary>
    /// <returns><c>true</c> once the disconnect call has completed.</returns>
    public async Task<bool> ResetAsync()
    {
        await this.mqttClient.DisconnectAsync();
        return true;
    }

    /// <summary>Subscribes to a topic with the requested QoS.</summary>
    /// <param name="msgId">Not used by this implementation.</param>
    /// <param name="topic">Topic filter to subscribe to.</param>
    /// <param name="qosLevel">Requested QoS level.</param>
    /// <returns><c>true</c> when the first result item grants QoS 0, 1 or 2.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="qosLevel"/> is not one of the three known levels.
    /// </exception>
    public async Task<bool> SubscribeAsync(int msgId, string topic, Mqtt_QosLevel qosLevel)
    {
        // Previously an unknown level skipped every branch and crashed with a
        // NullReferenceException on the missing result; fail with a descriptive error instead.
        if (!TryMapQos(qosLevel, out var mappedQos))
        {
            throw new ArgumentOutOfRangeException(nameof(qosLevel), qosLevel, "Unsupported MQTT QoS level.");
        }

        var results = await this.mqttClient.SubscribeAsync(topic, mappedQos);
        var granted = results.Items.First().ResultCode;
        return granted == MqttClientSubscribeResultCode.GrantedQoS0
            || granted == MqttClientSubscribeResultCode.GrantedQoS1
            || granted == MqttClientSubscribeResultCode.GrantedQoS2;
    }

    /// <summary>Unsubscribes from a topic.</summary>
    /// <param name="msgId">Not used by this implementation.</param>
    /// <param name="topic">Topic filter to unsubscribe from.</param>
    /// <returns><c>true</c> when the first result item reports success.</returns>
    public async Task<bool> UnsubscribeAsync(int msgId, string topic)
    {
        var result = await this.mqttClient.UnsubscribeAsync(topic);
        return result.Items.First().ResultCode == MqttClientUnsubscribeResultCode.Success;
    }

    /// <summary>Translates the controller QoS level into the MQTTnet equivalent.</summary>
    /// <param name="qosLevel">Level to translate.</param>
    /// <param name="mapped">The MQTTnet level when the method returns <c>true</c>.</param>
    /// <returns><c>false</c> when <paramref name="qosLevel"/> is not a known level.</returns>
    private static bool TryMapQos(Mqtt_QosLevel qosLevel, out MqttQualityOfServiceLevel mapped)
    {
        if (qosLevel == Mqtt_QosLevel.AtMostOnce)
        {
            mapped = MqttQualityOfServiceLevel.AtMostOnce;
            return true;
        }

        if (qosLevel == Mqtt_QosLevel.AtLeastOnce)
        {
            mapped = MqttQualityOfServiceLevel.AtLeastOnce;
            return true;
        }

        if (qosLevel == Mqtt_QosLevel.ExactlyOnce)
        {
            mapped = MqttQualityOfServiceLevel.ExactlyOnce;
            return true;
        }

        mapped = default(MqttQualityOfServiceLevel);
        return false;
    }
}
- }
|