using Edge.Core.Processor;
using Edge.Core.IndustryStandardInterface.Pump;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MQTTnet;
using MQTTnet.Client;
using SimpleScanner.MessageEntity;
using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SimpleScanner
{
    /// <summary>
    /// Device handler that bridges a scanner device and an MQTT broker:
    /// every non-empty message read from the device is published to
    /// <c>topic_ScannerRequest</c>, and every message received on
    /// <c>topic_ScannerRequestReply</c> is written back to the device.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>IDeviceHandler</c> and <c>IContext</c> are used without type
    /// arguments exactly as they appear in the reviewed source; confirm against the
    /// Edge.Core declarations whether generic arguments are expected here.
    /// </remarks>
    public class Handler : IDeviceHandler
    {
        /// <summary>Delay, in milliseconds, between two iterations of the (re)connect loop.</summary>
        private const int ReconnectPollIntervalMs = 30000;

        private IMqttClient mqttClient;
        private readonly string mqttServerUrl;
        private readonly int mqttServerPort;
        private readonly string mqttClientId;
        private readonly string mqttClientUsername;
        private readonly string mqttClientPassword;
        private readonly string topic_ScannerRequest;
        private readonly string topic_ScannerRequestReply;
        private ILogger logger = NullLogger.Instance;

        // Written by the MQTT disconnected callback and read by the background connect loop,
        // hence volatile so the loop always observes the latest value.
        private volatile bool shouldReconnect = true;

        /// <summary>
        /// Stores the broker connection settings and topics, and resolves the "Application" logger.
        /// </summary>
        /// <param name="mqttServerUrl">Host name or address of the MQTT broker.</param>
        /// <param name="mqttServerPort">TCP port of the MQTT broker.</param>
        /// <param name="mqttClientId">Client id presented to the broker.</param>
        /// <param name="mqttClientUsername">User name used to authenticate against the broker.</param>
        /// <param name="mqttClientPassword">Password used to authenticate against the broker.</param>
        /// <param name="topic_ScannerRequest">Topic device messages are published to; publishing is skipped when null or empty.</param>
        /// <param name="topic_ScannerRequestReply">Topic subscribed for data to send to the device; no subscription is made when null or empty.</param>
        /// <param name="services">Service provider that must be able to supply an <see cref="ILoggerFactory"/>.</param>
        public Handler(string mqttServerUrl, int mqttServerPort,
            string mqttClientId, string mqttClientUsername, string mqttClientPassword,
            string topic_ScannerRequest, string topic_ScannerRequestReply,
            IServiceProvider services)
        {
            var loggerFactory = services.GetRequiredService<ILoggerFactory>();
            this.logger = loggerFactory.CreateLogger("Application");

            this.mqttServerUrl = mqttServerUrl;
            this.mqttServerPort = mqttServerPort;
            this.mqttClientId = mqttClientId;
            this.mqttClientUsername = mqttClientUsername;
            this.mqttClientPassword = mqttClientPassword;
            this.topic_ScannerRequest = topic_ScannerRequest;
            this.topic_ScannerRequestReply = topic_ScannerRequestReply;
        }

        /// <summary>
        /// Creates the MQTT client, wires its receive/disconnect callbacks and starts the
        /// background loop that connects (and re-connects) to the broker.
        /// The method returns immediately; connecting happens on a background task.
        /// </summary>
        /// <param name="context">Processing context; its <c>Outgoing</c> side is used to write broker replies to the device.</param>
        public void Init(IContext context)
        {
            //MqttNetGlobalLogger.LogMessagePublished += (s, e) =>
            //{
            //    //var trace = $">> [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
            //    if (this.logger.IsEnabled(LogLevel.Trace))
            //    {
            //        var trace = $"MQTTnet - {e.TraceMessage.Message}";
            //        if (e.TraceMessage.Exception != null)
            //            trace += Environment.NewLine + e.TraceMessage.Exception.ToString();
            //        this.logger.LogTrace(trace);
            //    }
            //};

            // Create a new MQTT client.
            var factory = new MqttFactory();
            this.mqttClient = factory.CreateMqttClient();

            this.mqttClient.ApplicationMessageReceivedAsync += e =>
            {
                var payload = e.ApplicationMessage.Payload;
                if (this.logger.IsEnabled(LogLevel.Trace))
                {
                    this.logger.LogTrace("### Received Messages from Topic ###");
                    this.logger.LogTrace($" Topic = {e.ApplicationMessage.Topic}");
                    // Null-safe decode: a message without payload must not throw out of this callback.
                    this.logger.LogTrace($" Payload = {DecodePayload(payload)}");
                    this.logger.LogTrace($" QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                    this.logger.LogTrace($" Retain = {e.ApplicationMessage.Retain}");
                }

                if (e.ApplicationMessage.Topic == this.topic_ScannerRequestReply)
                {
                    this.ForwardReplyToDevice(context, payload);
                }

                return Task.CompletedTask;
            };

            this.mqttClient.DisconnectedAsync += arg =>
            {
                // Signal the background loop to establish the connection again.
                this.shouldReconnect = true;
                this.logger.LogInformation(" Mqtt connection disconnected, "
                    + "AuthenticateResult: " + (arg.ReasonString ?? "")
                    + ", ClientWasConnected: " + arg.ClientWasConnected
                    + ", exception: " + arg.Exception);
                return Task.CompletedTask;
            };

            // Create TCP based options using the builder.
            var options = new MqttClientOptionsBuilder()
                .WithClientId(this.mqttClientId)
                .WithTcpServer(this.mqttServerUrl, this.mqttServerPort)
                .WithCredentials(this.mqttClientUsername, this.mqttClientPassword)
                .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
                .WithCleanSession(true)
                .Build();

            // One connect attempt; returns true when the client is connected and, if a reply
            // topic is configured, subscribed to it.
            async Task<bool> TryConnectAndSubscribeAsync()
            {
                try
                {
                    // always try to close previous conn.
                    await this.mqttClient.DisconnectAsync();
                }
                catch
                {
                    // Deliberate best-effort: failing to close a connection that is already gone is expected.
                }

                var result = await this.mqttClient.ConnectAsync(options);
                if (result.ResultCode != MqttClientConnectResultCode.Success)
                {
                    this.logger.LogInformation(" Failed connecting to mqtt server with returned ResultCode: "
                        + result.ResultCode + ", ReasonString:" + (result.ReasonString ?? ""));
                    return false;
                }

                this.logger.LogInformation(" Successfully connected to mqtt server.");
                if (string.IsNullOrEmpty(this.topic_ScannerRequestReply))
                {
                    // Nothing to subscribe to; being connected is enough.
                    return true;
                }

                var subResult = await this.mqttClient.SubscribeAsync(
                    this.topic_ScannerRequestReply,
                    MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce);
                if (!subResult.Items.Any())
                {
                    this.logger.LogInformation(" Failed on subscribe on topic by generic error.");
                    return false;
                }

                var subResultCode = subResult.Items.First().ResultCode;
                // Codes 0..2 are treated as success (presumably the granted QoS levels 0, 1 and 2 — confirm against MQTTnet).
                if ((int)subResultCode > 2)
                {
                    this.logger.LogInformation(" Failed on subscribe on topic with Sub resultCode: " + subResultCode);
                    return false;
                }

                this.logger.LogInformation(" Successfully subscribed on topic.");
                return true;
            }

            // Endless supervision loop. It must never exit: the disconnected callback relies on it
            // to pick up shouldReconnect, including when no reply topic is configured.
            async Task ConnectLoopAsync()
            {
                while (true)
                {
                    if (this.shouldReconnect)
                    {
                        this.logger.LogDebug("Connecting to mqtt server...");
                        try
                        {
                            if (await TryConnectAndSubscribeAsync())
                            {
                                this.shouldReconnect = false;
                            }
                        }
                        catch (Exception exx)
                        {
                            this.logger.LogError(" Connecting to mqtt server exceptioned: " + exx);
                        }
                    }

                    // keep retry...
                    await Task.Delay(ReconnectPollIntervalMs);
                }
            }

            // Fire-and-forget on purpose: the loop handles its own failures and runs for the handler's lifetime.
            _ = Task.Run(() => ConnectLoopAsync());
        }

        /// <summary>
        /// Publishes the raw content of the incoming device message to <c>topic_ScannerRequest</c>.
        /// Null or empty messages are ignored, and nothing is published when the topic is not configured.
        /// Publish failures are logged and not propagated to the caller.
        /// </summary>
        /// <param name="context">Processing context carrying the incoming device message.</param>
        /// <returns>A task that completes once the publish attempt has finished.</returns>
        public async Task Process(IContext context)
        {
            var raw = context.Incoming.Message.RawContent;
            if (raw == null || raw.Length == 0)
            {
                logger.LogDebug("Incoming an null or empty Scanner Msg.");
                return;
            }

            // Build the hex dump only when it will actually be written.
            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug("Incoming Scanner msg: "
                    + string.Join(" ", raw.Select(b => b.ToString("X2")))
                    + " (" + Encoding.UTF8.GetString(raw) + ")");
            }

            if (string.IsNullOrEmpty(this.topic_ScannerRequest))
            {
                return;
            }

            var client = this.mqttClient;
            if (client == null)
            {
                logger.LogInformation("Mqtt client is not initialized yet, Scanner msg is dropped.");
                return;
            }

            try
            {
                await client.PublishAsync(new MqttApplicationMessage()
                {
                    Topic = this.topic_ScannerRequest,
                    Payload = raw
                });
            }
            catch (Exception ex)
            {
                // Previously the publish task was never observed, so failures vanished silently.
                logger.LogError("Publishing Scanner msg to topic: " + this.topic_ScannerRequest
                    + " exceptioned: " + ex);
            }
        }

        /// <summary>
        /// Writes a payload received on the reply topic to the device; any failure is logged and swallowed
        /// so that it cannot escape the MQTT receive callback.
        /// </summary>
        /// <param name="context">Processing context whose <c>Outgoing</c> side is written to.</param>
        /// <param name="payload">Raw MQTT payload; when null nothing is written to the device.</param>
        private void ForwardReplyToDevice(IContext context, byte[] payload)
        {
            try
            {
                if (payload == null)
                {
                    logger.LogInformation("mqtt server sent a msg without payload on topic: "
                        + (this.topic_ScannerRequestReply ?? "") + ", nothing is sent to the device.");
                    return;
                }

                // the device could not be capable to receive any data
                logger.LogInformation("how mqtt server send data to this device "
                    + "which is not capable for receive any command, send it anyway: "
                    + DecodePayload(payload));
                context.Outgoing.Write(new ScannerMessage()
                {
                    RawContent = this.ParseFormatDataToDeviceData(payload)
                });
            }
            catch (Exception eeee)
            {
                logger.LogInformation("handle mqtt server incoming msg received from topic: "
                    + (this.topic_ScannerRequestReply ?? "") + ", exceptioned: " + eeee);
            }
        }

        /// <summary>
        /// Decodes an MQTT payload as UTF-8 text for logging purposes.
        /// </summary>
        /// <param name="payload">Raw payload bytes, may be null.</param>
        /// <returns>The decoded text, or an empty string when <paramref name="payload"/> is null.</returns>
        private static string DecodePayload(byte[] payload)
        {
            return payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
        }

        /// <summary>
        /// Converts data received from the MQTT broker into the byte format sent to the device.
        /// Currently a pass-through: the input is returned unchanged.
        /// </summary>
        /// <param name="rawContent">Payload bytes received from the broker.</param>
        /// <returns>bytes that the device can read and operate</returns>
        private byte[] ParseFormatDataToDeviceData(byte[] rawContent)
        {
            return rawContent;
        }
    }
}