using Edge.Core.IndustryStandardInterface.NetworkController; using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Client; using MQTTnet.Diagnostics; using MQTTnet.Protocol; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace DeviceInfoToAliIotHubViaGateway { class FastMqttClient : IMqttClientNetworkController { private IMqttClient mqttClient; public string Name => this.GetType().FullName; public event EventHandler OnMessageReceived; public event EventHandler OnNetworkControllerStateChange; private ILogger logger = null; public FastMqttClient(ILogger logger) { this.logger = logger; // Write all trace messages to the console window. //MqttNetGlobalLogger.LogMessagePublished += (s, e) => //{ // //var trace = $">> [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}"; // var trace = $"{e.TraceMessage.Message}"; // if (e.TraceMessage.Exception != null) // trace += Environment.NewLine + e.TraceMessage.Exception.ToString(); // this.logger.LogTrace(trace); //}; // Create a new MQTT client. var factory = new MqttFactory(); this.mqttClient = factory.CreateMqttClient(); this.mqttClient.ApplicationMessageReceivedAsync += e => { //Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); //Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); //Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); //Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); //Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); //Console.WriteLine(); this.logger.LogDebug($"======>>>FastMqttClient received at topic: {e.ApplicationMessage.Topic} with payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); var safe = this.OnMessageReceived; safe?.Invoke(this, new OnMqttMessageReceivedEventArg(new MqttMessage() { Message = e.ApplicationMessage.Payload, Topic = e.ApplicationMessage.Topic, })); return Task.CompletedTask; }; } public async Task CloseAsync() { this.logger.LogInformation($"Closing FastMqttClient"); return true; } public async Task ConfigNetworkAsync(params string[] parameters) { throw new NotImplementedException(); } public async Task ConnectAsync(string clientId, string userName, string password) { // Create TCP based options using the builder. var options = new MqttClientOptionsBuilder() .WithClientId(clientId) .WithTcpServer(this.mqttServerUrl, this.mqttServerPort) .WithCredentials(userName, password).WithTls() // aliyun require a high value on this period, like 2m, otherwise, rejected .WithKeepAlivePeriod(new TimeSpan(0, 2, 0)) .WithCleanSession(true) .Build(); this.logger.LogInformation($"Connecting to mqtt server with url: {this.mqttServerUrl + "-" + this.mqttServerPort}, clientId: {clientId}, userName: {userName}, pwd: {password}"); var result = await this.mqttClient.ConnectAsync(options); this.logger.LogInformation($" Connect result is: {result.ResultCode}"); if (result.ResultCode == MqttClientConnectResultCode.Success) return true; else return false; } public async Task DisconnectAsync() { this.logger.LogInformation($"Disconnecting FastMqttClient"); await this.mqttClient.DisconnectAsync(); return true; } private string mqttServerUrl; private int mqttServerPort; public async Task OpenAsync(string mqttServerUrl, int port) { this.mqttServerUrl = mqttServerUrl; this.mqttServerPort = port; return true; } public async Task PublishAsync(int msgId, string topic, byte[] byteMsg, Mqtt_QosLevel qosLevel, bool retain) { MqttClientPublishResult result = null; MqttApplicationMessage mqttMsg = new MqttApplicationMessage(); mqttMsg.Payload = byteMsg; mqttMsg.Retain = retain; mqttMsg.Topic = topic; if (qosLevel == Mqtt_QosLevel.AtMostOnce) mqttMsg.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce; else if (qosLevel == Mqtt_QosLevel.AtLeastOnce) mqttMsg.QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce; else if (qosLevel == Mqtt_QosLevel.ExactlyOnce) mqttMsg.QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce; result = await this.mqttClient.PublishAsync(mqttMsg); if (result.ReasonCode == MqttClientPublishReasonCode.Success) return true; return false; } public async Task QueryStatusAsync() { return NetworkState.NetworkConnected; } public async Task ResetAsync() { await this.mqttClient.DisconnectAsync(); return true; } public async Task SubscribeAsync(int msgId, string topic, Mqtt_QosLevel qosLevel) { MqttClientSubscribeResult results = null; if (qosLevel == Mqtt_QosLevel.AtMostOnce) results = await this.mqttClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtMostOnce); else if (qosLevel == Mqtt_QosLevel.AtLeastOnce) results = await this.mqttClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.AtLeastOnce); else if (qosLevel == Mqtt_QosLevel.ExactlyOnce) results = await this.mqttClient.SubscribeAsync(topic, MqttQualityOfServiceLevel.ExactlyOnce); if (results.Items.First().ResultCode == MqttClientSubscribeResultCode.GrantedQoS0 || results.Items.First().ResultCode == MqttClientSubscribeResultCode.GrantedQoS1 || results.Items.First().ResultCode == MqttClientSubscribeResultCode.GrantedQoS2) return true; else return false; } public async Task UnsubscribeAsync(int msgId, string topic) { var result = await this.mqttClient.UnsubscribeAsync(topic); if (result.Items.First().ResultCode == MqttClientUnsubscribeResultCode.Success) return true; else return false; } } }