using Edge.Core.Processor; using Edge.Core.IndustryStandardInterface.Pump; using Edge.Core.UniversalApi; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using MQTTnet.Extensions.ManagedClient; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; using System.Threading.Tasks; using Microsoft.CodeAnalysis.CSharp.Syntax; using Edge.Core.Configuration; using System.Net.NetworkInformation; using System.Threading; using MQTTnet.Client; namespace Edge.Core.UniversalApi { public class RemoteMqttCommunicatorProvider : LocalMqttCommunicatorProvider { private const string topic_prefix = "edge/"; private string PurposeTopic_online_status; private bool isEnabled = false; protected IServiceProvider services; private string mqttClientId; internal RemoteMqttCommunicatorProvider(IServiceProvider services) : base(services) { this.services = services; } public override IEnumerable GetApiDocuments() { return base.GetApiDocuments(); } protected override string TopicMaker(string topicFormatStr, IProcessor processor, string universalApiServiceOrPropertyOrEventName) { if (!string.IsNullOrEmpty(this.mqttClientId)) return topic_prefix + this.mqttClientId + base.TopicMaker(topicFormatStr, processor, universalApiServiceOrPropertyOrEventName); throw new ArgumentException("RemoteMqttCommunicatorProvider requires 'mqttClientId' defined in configuration file meta section: 'remoteMqttServerConnection'"); } protected override ManagedMqttClientOptions GetManagedMqttClientOptions() { var remoteMqttServerConnectionStr = this.services.GetRequiredService().MetaConfiguration .Parameter?.FirstOrDefault(p => p.Name.Equals("remoteMqttServerConnection", StringComparison.OrdinalIgnoreCase))?.Value; if (string.IsNullOrEmpty(remoteMqttServerConnectionStr)) return null; try { var parameters = remoteMqttServerConnectionStr.Replace(" ", "").Split(':'); var ip = parameters[0]; var port = parameters[1]; var clientId = parameters[2]; var userName = parameters[3]; var password = parameters[4]; this.mqttClientId = clientId; this.PurposeTopic_online_status = topic_prefix + this.mqttClientId + "/purpose/online_status"; return new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) .WithClientOptions( new MqttClientOptionsBuilder() .WithClientId(clientId) .WithCredentials(userName, password) .WithTcpServer(ip, int.Parse(port))//.WithTls() .WithWillTopic(PurposeTopic_online_status) .WithWillRetain(true) .WithWillPayload( Encoding.UTF8.GetBytes($"" + $"{{ " + $"\"status\":\"offline\"," + $"\"description\":\"{this.mqttClientId} was disconnected from mqtt broker, the connection was early established at: {DateTime.Now}\" " + $"}}")) .Build()) .Build(); } catch { return null; } } public override async Task SetupAsync(IEnumerable processors) { var remoteMqttServerConnectionStr = this.services.GetRequiredService().MetaConfiguration .Parameter?.FirstOrDefault(p => p.Name.Equals("remoteMqttServerConnection", StringComparison.OrdinalIgnoreCase))?.Value; if (string.IsNullOrEmpty(remoteMqttServerConnectionStr)) { this.isEnabled = false; return; } this.isEnabled = true; await base.SetupAsync(processors); base.mqttClient.ConnectedAsync += async (arg) => { this.logger.LogInformation($"{this.GetType().Name}, Connected to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}"); await base.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage() { ApplicationMessage = new MQTTnet.MqttApplicationMessage() { Topic = PurposeTopic_online_status, Payload = Encoding.UTF8.GetBytes($"" + $"{{ " + $"\"status\":\"online\"," + $"\"description\":\"{this.mqttClientId} is connected to mqtt broker at time: {DateTime.Now}\" " + $"}}"), Retain = true, } }); base.logger.LogInformation($"{this.GetType().Name}, Published status online message to topic: {PurposeTopic_online_status}"); }; return; } public override void Dispose() { if (this.isEnabled) { var task = base.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage() { ApplicationMessage = new MQTTnet.MqttApplicationMessage() { Topic = PurposeTopic_online_status, Payload = Encoding.UTF8.GetBytes($"" + $"{{ " + $"\"status\":\"offline\"," + $"\"description\":\"{this.mqttClientId} was disconnected(gracefully) from mqtt broker, the connection was early established at: {DateTime.Now}\" " + $"}}"), Retain = true, } }); Thread.Sleep(1000); } base.Dispose(); } public override Task RouteEventAsync(IProcessor eventSource, EventDescriptor eventDescriptor) { if (!this.isEnabled) return Task.FromResult(false); return base.RouteEventAsync(eventSource, eventDescriptor); } private string GetNetworkInterfacePhysicalAdress() { IPGlobalProperties computerProperties = IPGlobalProperties.GetIPGlobalProperties(); NetworkInterface[] nics = NetworkInterface.GetAllNetworkInterfaces(); if (nics == null || nics.Length < 1) return ""; foreach (NetworkInterface adapter in nics) { IPInterfaceProperties properties = adapter.GetIPProperties(); // .GetIPInterfaceProperties(); PhysicalAddress address = adapter.GetPhysicalAddress(); byte[] bytes = address.GetAddressBytes(); return bytes.Select(b => b.ToString("X2")).Aggregate((acc, n) => acc + n); } return ""; } } }