MqttClientService.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. using Edge.Core.Configuration;
  2. using Edge.Core.Core.database;
  3. using Microsoft.Extensions.Configuration;
  4. using MQTTnet;
  5. using MQTTnet.Client;
  6. using Newtonsoft.Json;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Text;
  11. using System.Threading.Tasks;
  12. namespace Edge.Core.MqttClient
  13. {
  14. public class MqttClientService: IMqttClientService
  15. {
  16. private static NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();
  17. private string[] topics;//需要订阅的主题
  18. private string? buildId;
  19. private IMqttClient _mqttClient; //mqtt 客户端
  20. private IConfiguration _configuration;
  21. public MqttClientService(IConfiguration configuration)
  22. {
  23. _configuration = configuration;
  24. }
  25. public event EventHandler<MqttClientConnectedEventArgs> OnConnect;
  26. public event EventHandler<MqttClientDisconnectedEventArgs> OnDisconnect;
  27. public event EventHandler<MqttApplicationMessageReceivedEventArgs> OnApplicationMessageReceived;
  28. public void Public(string topic, string paramsJson)
  29. {
  30. }
  31. public void Start()
  32. {
  33. MqttConfiguration? mqttConfiguration = _configuration.GetSection("Mqtt").Get<MqttConfiguration>();
  34. using (var dbContext = new MysqlDbContext())
  35. {
  36. Domain.FccStationInfo.FccStationInfo? fccStationInfo = dbContext.FccStationInfos.FirstOrDefault();
  37. string? mqttService = fccStationInfo?.MqttService;
  38. buildId = fccStationInfo?.BuildId;
  39. if (mqttService == null || buildId == null)
  40. {
  41. Logger.Info($"can not get mqttService:{mqttService} and buildId:{buildId}");
  42. return;
  43. }
  44. string[] hostAndPort = mqttService.Split(":");
  45. MqttClientOptions mqttClientOptions = new MqttClientOptionsBuilder()
  46. .WithTcpServer(hostAndPort[0], hostAndPort[1].ToString().ToInt())
  47. .WithCredentials(mqttConfiguration.user, mqttConfiguration.password)
  48. .WithClientId(buildId)
  49. .WithCleanSession()
  50. .WithTlsOptions(new MqttClientTlsOptions()
  51. {
  52. UseTls = false
  53. })
  54. .Build();
  55. _mqttClient = new MqttFactory().CreateMqttClient();
  56. this.topics = mqttConfiguration.subTopic;
  57. _mqttClient.ConnectedAsync += mqttConnected;
  58. _mqttClient.DisconnectedAsync += mqttDisConnected;
  59. _mqttClient.ApplicationMessageReceivedAsync += mqttOnReceive;
  60. _mqttClient.ConnectAsync(mqttClientOptions);
  61. }
  62. }
  63. /// <summary>
  64. /// MQTT 已连接事件
  65. /// </summary>
  66. /// <param name="args"></param>
  67. /// <returns></returns>
  68. private Task mqttConnected(MqttClientConnectedEventArgs args)
  69. {
  70. Logger.Info($"mqtt connected");
  71. topics.ForEach(topic =>
  72. {
  73. _mqttClient.SubscribeAsync($"{topic}/{buildId}", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
  74. });
  75. OnConnect?.Invoke(this, args);
  76. return Task.CompletedTask;
  77. }
  78. /// <summary>
  79. /// MQTT 断开连接
  80. /// </summary>
  81. /// <param name="args"></param>
  82. /// <returns></returns>
  83. private Task mqttDisConnected(MqttClientDisconnectedEventArgs args)
  84. {
  85. Logger.Info($"mqtt disconnect",JsonConvert.SerializeObject(args));
  86. Thread.Sleep(3000);
  87. Start();
  88. OnDisconnect?.Invoke(this, args);
  89. return Task.CompletedTask;
  90. }
  91. /// <summary>
  92. /// MQTT 获取到数据
  93. /// </summary>
  94. /// <param name="args"></param>
  95. /// <returns></returns>
  96. private Task mqttOnReceive(MqttApplicationMessageReceivedEventArgs args)
  97. {
  98. Logger.Info($"mqtt receive message");
  99. OnApplicationMessageReceived?.Invoke(this, args);
  100. return Task.CompletedTask;
  101. }
  102. }
  103. }