RemoteMqttCommunicatorProvider.cs 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. using Edge.Core.Processor;
  2. using Edge.Core.IndustryStandardInterface.Pump;
  3. using Edge.Core.UniversalApi;
  4. using Microsoft.Extensions.DependencyInjection;
  5. using Microsoft.Extensions.Logging;
  6. using Microsoft.Extensions.Logging.Abstractions;
  7. using MQTTnet.Extensions.ManagedClient;
  8. using System;
  9. using System.Collections.Generic;
  10. using System.IO;
  11. using System.Linq;
  12. using System.Reflection;
  13. using System.Text;
  14. using System.Text.Json;
  15. using System.Text.Json.Serialization;
  16. using System.Threading.Tasks;
  17. using Microsoft.CodeAnalysis.CSharp.Syntax;
  18. using Edge.Core.Configuration;
  19. using System.Net.NetworkInformation;
  20. using System.Threading;
  21. using MQTTnet.Client;
  22. namespace Edge.Core.UniversalApi
  23. {
  24. public class RemoteMqttCommunicatorProvider : LocalMqttCommunicatorProvider
  25. {
  26. private const string topic_prefix = "edge/";
  27. private string PurposeTopic_online_status;
  28. private bool isEnabled = false;
  29. protected IServiceProvider services;
  30. private string mqttClientId;
  31. internal RemoteMqttCommunicatorProvider(IServiceProvider services) : base(services)
  32. {
  33. this.services = services;
  34. }
  35. public override IEnumerable<UniversalApiInfoDoc> GetApiDocuments()
  36. {
  37. return base.GetApiDocuments();
  38. }
  39. protected override string TopicMaker(string topicFormatStr,
  40. IProcessor processor, string universalApiServiceOrPropertyOrEventName)
  41. {
  42. if (!string.IsNullOrEmpty(this.mqttClientId))
  43. return topic_prefix + this.mqttClientId + base.TopicMaker(topicFormatStr, processor, universalApiServiceOrPropertyOrEventName);
  44. throw new ArgumentException("RemoteMqttCommunicatorProvider requires 'mqttClientId' defined in configuration file meta section: 'remoteMqttServerConnection'");
  45. }
  46. protected override ManagedMqttClientOptions GetManagedMqttClientOptions()
  47. {
  48. var remoteMqttServerConnectionStr =
  49. this.services.GetRequiredService<Configurator>().MetaConfiguration
  50. .Parameter?.FirstOrDefault(p => p.Name.Equals("remoteMqttServerConnection", StringComparison.OrdinalIgnoreCase))?.Value;
  51. if (string.IsNullOrEmpty(remoteMqttServerConnectionStr))
  52. return null;
  53. try
  54. {
  55. var parameters = remoteMqttServerConnectionStr.Replace(" ", "").Split(':');
  56. var ip = parameters[0];
  57. var port = parameters[1];
  58. var clientId = parameters[2];
  59. var userName = parameters[3];
  60. var password = parameters[4];
  61. this.mqttClientId = clientId;
  62. this.PurposeTopic_online_status = topic_prefix + this.mqttClientId + "/purpose/online_status";
  63. return new ManagedMqttClientOptionsBuilder()
  64. .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
  65. .WithClientOptions(
  66. new MqttClientOptionsBuilder()
  67. .WithClientId(clientId)
  68. .WithCredentials(userName, password)
  69. .WithTcpServer(ip, int.Parse(port))//.WithTls()
  70. .WithWillTopic(PurposeTopic_online_status)
  71. .WithWillRetain(true)
  72. .WithWillPayload(
  73. Encoding.UTF8.GetBytes($"" +
  74. $"{{ " +
  75. $"\"status\":\"offline\"," +
  76. $"\"description\":\"{this.mqttClientId} was disconnected from mqtt broker, the connection was early established at: {DateTime.Now}\" " +
  77. $"}}"))
  78. .Build())
  79. .Build();
  80. }
  81. catch
  82. {
  83. return null;
  84. }
  85. }
  86. public override async Task SetupAsync(IEnumerable<IProcessor> processors)
  87. {
  88. var remoteMqttServerConnectionStr =
  89. this.services.GetRequiredService<Configurator>().MetaConfiguration
  90. .Parameter?.FirstOrDefault(p => p.Name.Equals("remoteMqttServerConnection", StringComparison.OrdinalIgnoreCase))?.Value;
  91. if (string.IsNullOrEmpty(remoteMqttServerConnectionStr))
  92. {
  93. this.isEnabled = false;
  94. return;
  95. }
  96. this.isEnabled = true;
  97. await base.SetupAsync(processors);
  98. base.mqttClient.ConnectedAsync += async (arg) =>
  99. {
  100. this.logger.LogInformation($"{this.GetType().Name}, Connected to mqtt server, ResultCode: {arg.ConnectResult?.ResultCode}");
  101. await base.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage()
  102. {
  103. ApplicationMessage = new MQTTnet.MqttApplicationMessage()
  104. {
  105. Topic = PurposeTopic_online_status,
  106. Payload = Encoding.UTF8.GetBytes($"" +
  107. $"{{ " +
  108. $"\"status\":\"online\"," +
  109. $"\"description\":\"{this.mqttClientId} is connected to mqtt broker at time: {DateTime.Now}\" " +
  110. $"}}"),
  111. Retain = true,
  112. }
  113. });
  114. base.logger.LogInformation($"{this.GetType().Name}, Published status online message to topic: {PurposeTopic_online_status}");
  115. };
  116. return;
  117. }
  118. public override void Dispose()
  119. {
  120. if (this.isEnabled)
  121. {
  122. var task = base.mqttClient.EnqueueAsync(new ManagedMqttApplicationMessage()
  123. {
  124. ApplicationMessage = new MQTTnet.MqttApplicationMessage()
  125. {
  126. Topic = PurposeTopic_online_status,
  127. Payload = Encoding.UTF8.GetBytes($"" +
  128. $"{{ " +
  129. $"\"status\":\"offline\"," +
  130. $"\"description\":\"{this.mqttClientId} was disconnected(gracefully) from mqtt broker, the connection was early established at: {DateTime.Now}\" " +
  131. $"}}"),
  132. Retain = true,
  133. }
  134. });
  135. Thread.Sleep(1000);
  136. }
  137. base.Dispose();
  138. }
  139. public override Task<bool> RouteEventAsync(IProcessor eventSource, EventDescriptor eventDescriptor)
  140. {
  141. if (!this.isEnabled)
  142. return Task.FromResult(false);
  143. return base.RouteEventAsync(eventSource, eventDescriptor);
  144. }
  145. private string GetNetworkInterfacePhysicalAdress()
  146. {
  147. IPGlobalProperties computerProperties = IPGlobalProperties.GetIPGlobalProperties();
  148. NetworkInterface[] nics = NetworkInterface.GetAllNetworkInterfaces();
  149. if (nics == null || nics.Length < 1)
  150. return "";
  151. foreach (NetworkInterface adapter in nics)
  152. {
  153. IPInterfaceProperties properties = adapter.GetIPProperties(); // .GetIPInterfaceProperties();
  154. PhysicalAddress address = adapter.GetPhysicalAddress();
  155. byte[] bytes = address.GetAddressBytes();
  156. return bytes.Select(b => b.ToString("X2")).Aggregate((acc, n) => acc + n);
  157. }
  158. return "";
  159. }
  160. }
  161. }