123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- using Edge.Core.Processor;
- using Edge.Core.IndustryStandardInterface.Pump;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Logging.Abstractions;
- using MQTTnet;
- using MQTTnet.Client;
- using SimpleScanner.MessageEntity;
- using System;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace SimpleScanner
- {
/// <summary>
/// Device handler that publishes raw scanner bytes to an MQTT topic and writes
/// payloads received on an (optional) reply topic to the context's outgoing channel.
/// </summary>
public class Handler : IDeviceHandler<byte[], ScannerMessage>
{
    // Delay between connection attempts, and between idle checks while connected.
    private const int ReconnectPollDelayMilliseconds = 30000;

    private IMqttClient mqttClient;
    private string mqttServerUrl;
    private int mqttServerPort;
    private string mqttClientId;
    private string mqttClientUsername;
    private string mqttClientPassword;
    private string topic_ScannerRequest;
    private string topic_ScannerRequestReply;
    private ILogger logger = NullLogger.Instance;

    // Set by the MQTT disconnect callback and read by the background connect loop,
    // i.e. shared between threads; volatile keeps the writes visible to the loop.
    private volatile bool shouldReconnect = true;

    /// <summary>
    /// Stores the MQTT connection settings and resolves the logger.
    /// No network activity happens here; the connection is started by <see cref="Init"/>.
    /// </summary>
    /// <param name="mqttServerUrl">Host name or address of the MQTT server.</param>
    /// <param name="mqttServerPort">TCP port of the MQTT server.</param>
    /// <param name="mqttClientId">Client id presented to the MQTT server.</param>
    /// <param name="mqttClientUsername">User name used for the MQTT connection.</param>
    /// <param name="mqttClientPassword">Password used for the MQTT connection.</param>
    /// <param name="topic_ScannerRequest">Topic that incoming scanner data is published to; publishing is skipped when null or empty.</param>
    /// <param name="topic_ScannerRequestReply">Topic to subscribe to for data sent back to the device; no subscription is made when null or empty.</param>
    /// <param name="services">Service provider; must be able to resolve <see cref="ILoggerFactory"/>.</param>
    public Handler(string mqttServerUrl, int mqttServerPort, string mqttClientId,
        string mqttClientUsername, string mqttClientPassword,
        string topic_ScannerRequest, string topic_ScannerRequestReply,
        IServiceProvider services)
    {
        var loggerFactory = services.GetRequiredService<ILoggerFactory>();
        this.logger = loggerFactory.CreateLogger("Application");
        this.mqttServerUrl = mqttServerUrl;
        this.mqttServerPort = mqttServerPort;
        this.mqttClientId = mqttClientId;
        this.mqttClientUsername = mqttClientUsername;
        this.mqttClientPassword = mqttClientPassword;
        this.topic_ScannerRequest = topic_ScannerRequest;
        this.topic_ScannerRequestReply = topic_ScannerRequestReply;
    }

    /// <summary>
    /// Creates the MQTT client, wires its message/disconnect callbacks and starts a
    /// background loop that connects (and re-connects) to the MQTT server for the
    /// lifetime of the process.
    /// </summary>
    /// <param name="context">Context whose outgoing channel receives payloads arriving on the reply topic.</param>
    public async void Init(IContext<byte[], ScannerMessage> context)
    {
        // Create a new MQTT client.
        var factory = new MqttFactory();
        this.mqttClient = factory.CreateMqttClient();

        this.mqttClient.ApplicationMessageReceivedAsync += e =>
        {
            // The payload may be null for an empty message, so never decode it directly.
            var payload = e.ApplicationMessage.Payload;

            if (this.logger.IsEnabled(LogLevel.Trace))
            {
                this.logger.LogTrace("### Received Messages from Topic ###");
                this.logger.LogTrace($" Topic = {e.ApplicationMessage.Topic}");
                this.logger.LogTrace($" Payload = {DecodePayload(payload)}");
                this.logger.LogTrace($" QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                this.logger.LogTrace($" Retain = {e.ApplicationMessage.Retain}");
            }

            if (e.ApplicationMessage.Topic == this.topic_ScannerRequestReply)
            {
                try
                {
                    if (payload == null)
                    {
                        // Nothing to forward; previously this path failed with an exception.
                        logger.LogInformation("mqtt server sent a message without payload on topic: " +
                            (this.topic_ScannerRequestReply ?? "") + ", ignored.");
                    }
                    else
                    {
                        // the device could not be capable to receive any data
                        logger.LogInformation("how mqtt server send data to this device " +
                            "which is not capable for receive any command, send it anyway: " +
                            DecodePayload(payload));
                        context.Outgoing.Write(new ScannerMessage()
                        {
                            RawContent = this.ParseFormatDataToDeviceData(payload)
                        });
                    }
                }
                catch (Exception eeee)
                {
                    logger.LogInformation("handle mqtt server incoming msg received from topic: " +
                        (this.topic_ScannerRequestReply ?? "")
                        + ", exceptioned: " + eeee);
                }
            }

            return Task.CompletedTask;
        };

        this.mqttClient.DisconnectedAsync += arg =>
        {
            // Ask the background loop to establish a new connection.
            this.shouldReconnect = true;
            this.logger.LogInformation(" Mqtt connection disconnected, " +
                "AuthenticateResult: " + (arg.ReasonString ?? "") +
                ", ClientWasConnected: " + arg.ClientWasConnected +
                ", exception: " + arg.Exception);
            return Task.CompletedTask;
        };

        // Create TCP based options using the builder.
        var options = new MqttClientOptionsBuilder()
            .WithClientId(this.mqttClientId)
            .WithTcpServer(this.mqttServerUrl, this.mqttServerPort)
            .WithCredentials(this.mqttClientUsername, this.mqttClientPassword)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
            .WithCleanSession(true)
            .Build();

        await Task.Run(async () =>
        {
            // This loop intentionally never exits: it is the only place that
            // re-establishes the connection after a disconnect.
            while (true)
            {
                // Idle only while no reconnect was requested AND the client really is
                // connected; the second check covers a disconnect notification that
                // raced with the flag being cleared below.
                if (!this.shouldReconnect && this.mqttClient.IsConnected)
                {
                    await Task.Delay(ReconnectPollDelayMilliseconds);
                    continue;
                }

                this.logger.LogDebug("Connecting to mqtt server...");
                try
                {
                    try
                    {
                        // always try to close previous conn.
                        await mqttClient.DisconnectAsync();
                    }
                    catch (Exception disconnectEx)
                    {
                        // Best effort only (typically: there was no open connection).
                        this.logger.LogTrace("Closing previous mqtt connection failed (ignored): " + disconnectEx.Message);
                    }

                    var result = await mqttClient.ConnectAsync(options);
                    if (result.ResultCode == MqttClientConnectResultCode.Success)
                    {
                        this.logger.LogInformation(" Successfully connected to mqtt server.");
                        if (string.IsNullOrEmpty(this.topic_ScannerRequestReply))
                        {
                            // Publish-only setup: nothing to subscribe to. Keep looping
                            // (do NOT leave the loop) so a later disconnect is still
                            // followed by a reconnect.
                            this.shouldReconnect = false;
                        }
                        else
                        {
                            var subResult = await this.mqttClient.SubscribeAsync(this.topic_ScannerRequestReply,
                                MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce);
                            if (subResult.Items.Any())
                            {
                                var subscribeCode = subResult.Items.First().ResultCode;
                                // Codes 0..2 are treated as success (presumably the
                                // "granted QoS 0/1/2" codes; verify against the MQTTnet enum).
                                if ((int)subscribeCode <= 2)
                                {
                                    this.logger.LogInformation(" Successfully subscribed on topic.");
                                    this.shouldReconnect = false;
                                }
                                else
                                {
                                    this.logger.LogInformation(" Failed on subscribe on topic with Sub resultCode: "
                                        + subscribeCode);
                                }
                            }
                            else
                                this.logger.LogInformation(" Failed on subscribe on topic by generic error.");
                        }
                    }
                    else
                    {
                        this.logger.LogInformation(" Failed connecting to mqtt server with returned ResultCode: "
                            + result.ResultCode
                            + ", ReasonString:" + (result.ReasonString ?? ""));
                    }
                }
                catch (Exception exx)
                {
                    this.logger.LogError(" Connecting to mqtt server exceptioned: " + exx);
                }

                // keep retry...
                await Task.Delay(ReconnectPollDelayMilliseconds);
            }
        });
    }

    /// <summary>
    /// Publishes the raw content of the incoming scanner message to the request topic.
    /// Null/empty content is ignored, as is every message when no request topic is
    /// configured. Publish failures are logged and never thrown to the caller.
    /// </summary>
    /// <param name="context">Context carrying the incoming scanner message.</param>
    /// <returns>A task that completes once the publish attempt has finished.</returns>
    public async Task Process(IContext<byte[], ScannerMessage> context)
    {
        var raw = context.Incoming.Message.RawContent;
        if (raw == null || !raw.Any())
        {
            logger.LogDebug("Incoming an null or empty Scanner Msg.");
            return;
        }

        // Only build the hex dump when it will actually be written.
        if (logger.IsEnabled(LogLevel.Debug))
        {
            logger.LogDebug("Incoming Scanner msg: "
                + BitConverter.ToString(raw).Replace("-", " ")
                + " (" + Encoding.UTF8.GetString(raw) + ")");
        }

        if (string.IsNullOrEmpty(this.topic_ScannerRequest))
            return;

        // Init may not have run yet, or the connection may currently be down.
        var client = this.mqttClient;
        if (client == null || !client.IsConnected)
        {
            logger.LogWarning("Mqtt client is not connected, Scanner msg is dropped for topic: "
                + this.topic_ScannerRequest);
            return;
        }

        try
        {
            // Await the publish so failures are observed instead of silently lost.
            await client.PublishAsync(new MqttApplicationMessage()
            {
                Topic = this.topic_ScannerRequest,
                Payload = raw
            });
        }
        catch (Exception ex)
        {
            logger.LogError("Publishing Scanner msg to topic: " + this.topic_ScannerRequest
                + " exceptioned: " + ex);
        }
    }

    /// <summary>
    /// Converts a payload received from the MQTT server into the bytes written to the
    /// device. Currently a pass-through that returns the input unchanged.
    /// </summary>
    /// <param name="rawContent">Payload as received from the MQTT server.</param>
    /// <returns>bytes that the device can read and operate</returns>
    private byte[] ParseFormatDataToDeviceData(byte[] rawContent)
    {
        return rawContent;
    }

    /// <summary>
    /// Decodes a payload as UTF-8 text for logging; a null payload yields an empty string.
    /// </summary>
    /// <param name="payload">Payload bytes, may be null.</param>
    /// <returns>The decoded text, or an empty string when <paramref name="payload"/> is null.</returns>
    private static string DecodePayload(byte[] payload)
    {
        return payload == null ? string.Empty : Encoding.UTF8.GetString(payload);
    }
}
- }
|