using Edge.Core.Core.database;
using MQTTnet;
using MQTTnet.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Edge.Core.MqttClient
{
    /// <summary>
    /// MQTT client service. Reads the broker address and build id from the first
    /// station record in the database, connects to the broker, subscribes to the
    /// per-build topic and forwards connection/message notifications through events.
    /// </summary>
    public class MqttClientService : IMqttClientService
    {
        /// <summary>Port used when the configured broker address carries no ":port" part.</summary>
        private const int DefaultMqttPort = 1883;

        /// <summary>Pause before a reconnect attempt after the connection is lost.</summary>
        private const int ReconnectDelayMilliseconds = 3000;

        private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();

        // Topics to subscribe to once the client is connected.
        private string[] topics = Array.Empty<string>();

        // The current MQTT client; replaced on every call to Start().
        private IMqttClient? _mqttClient;

        public MqttClientService()
        {
        }

        /// <summary>Raised after the client connected and the subscriptions were requested.</summary>
        public event EventHandler OnConnect;

        /// <summary>Raised after the client disconnected and a reconnect attempt was started.</summary>
        public event EventHandler OnDisconnect;

        /// <summary>Raised for every application message received from the broker.</summary>
        public event EventHandler OnApplicationMessageReceived;

        /// <summary>
        /// Currently a no-op: the body is empty, so nothing is sent to the broker.
        /// </summary>
        /// <param name="topic">Unused.</param>
        /// <param name="paramsJson">Unused.</param>
        public void Public(string topic, string paramsJson)
        {
        }

        /// <summary>
        /// Loads the broker address ("host" or "host:port") and the build id from the
        /// database, creates a fresh MQTT client and starts connecting in the background.
        /// Returns without connecting when the configuration is missing or the port is invalid.
        /// </summary>
        public void Start()
        {
            string? mqttService;
            string? buildId;

            // Keep the database context open only for the lookup itself.
            using (var dbContext = new MysqlDbContext())
            {
                Domain.FccStationInfo.FccStationInfo? fccStationInfo = dbContext.FccStationInfos.FirstOrDefault();
                mqttService = fccStationInfo?.MqttService;
                buildId = fccStationInfo?.BuildId;
            }

            if (mqttService == null || buildId == null)
            {
                Logger.Info($"can not get mqttService:{mqttService} and buildId:{buildId}");
                return;
            }

            string[] hostAndPort = mqttService.Split(':');
            string host = hostAndPort[0];
            int port = DefaultMqttPort;
            if (hostAndPort.Length > 1
                && !int.TryParse(hostAndPort[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out port))
            {
                Logger.Error("invalid port in mqttService: {0}", mqttService);
                return;
            }

            MqttClientOptions mqttClientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer(host, port)
                .WithClientId(buildId)
                .WithCleanSession()
                .WithTlsOptions(new MqttClientTlsOptions() { UseTls = false })
                .Build();

            // Start() also runs on every reconnect; drop the previous client first so
            // its handlers do not stay attached and the instance is not leaked.
            ReleasePreviousClient();

            IMqttClient client = new MqttFactory().CreateMqttClient();
            this.topics = new string[] { $"fromClound/{buildId}" };
            client.ConnectedAsync += mqttConnected;
            client.DisconnectedAsync += mqttDisConnected;
            client.ApplicationMessageReceivedAsync += mqttOnReceive;
            _mqttClient = client;

            // Start() is synchronous, so the connect runs in the background;
            // the helper observes and logs a failure instead of leaving it unobserved.
            _ = ConnectAndLogAsync(client, mqttClientOptions);
        }

        /// <summary>
        /// Detaches this service's handlers from the current client (if any) and disposes it.
        /// </summary>
        private void ReleasePreviousClient()
        {
            IMqttClient? previous = _mqttClient;
            if (previous == null)
            {
                return;
            }

            previous.ConnectedAsync -= mqttConnected;
            previous.DisconnectedAsync -= mqttDisConnected;
            previous.ApplicationMessageReceivedAsync -= mqttOnReceive;
            previous.Dispose();
            _mqttClient = null;
        }

        /// <summary>
        /// Awaits the connect call and logs any exception it throws.
        /// </summary>
        /// <remarks>
        /// NOTE(review): retrying after a failed connect relies on the client raising
        /// DisconnectedAsync for failed attempts — confirm for the MQTTnet version in use.
        /// </remarks>
        private static async Task ConnectAndLogAsync(IMqttClient client, MqttClientOptions options)
        {
            try
            {
                await client.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                Logger.Error(ex, "mqtt connect failed");
            }
        }

        /// <summary>
        /// MQTT connected handler: subscribes to every configured topic, then raises <see cref="OnConnect"/>.
        /// </summary>
        /// <param name="args">Connection event data from the client.</param>
        private async Task mqttConnected(MqttClientConnectedEventArgs args)
        {
            Logger.Info("mqtt connected");

            IMqttClient? client = _mqttClient;
            if (client != null)
            {
                foreach (string topic in topics)
                {
                    try
                    {
                        await client.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
                    }
                    catch (Exception ex)
                    {
                        // Log and continue so one failed subscription does not hide the others.
                        Logger.Error(ex, "mqtt subscribe failed, topic: {0}", topic);
                    }
                }
            }

            OnConnect?.Invoke(this, args);
        }

        /// <summary>
        /// MQTT disconnected handler: logs the event, waits briefly, restarts the
        /// client and then raises <see cref="OnDisconnect"/>.
        /// </summary>
        /// <param name="args">Disconnect event data from the client.</param>
        private async Task mqttDisConnected(MqttClientDisconnectedEventArgs args)
        {
            // The placeholder is required; without it the serialized details are dropped.
            Logger.Info("mqtt disconnect: {0}", JsonConvert.SerializeObject(args));

            // Asynchronous wait so no thread is blocked during the back-off.
            await Task.Delay(ReconnectDelayMilliseconds);
            Start();

            OnDisconnect?.Invoke(this, args);
        }

        /// <summary>
        /// MQTT message handler: raises <see cref="OnApplicationMessageReceived"/> with the received message.
        /// </summary>
        /// <param name="args">Received-message event data from the client.</param>
        private Task mqttOnReceive(MqttApplicationMessageReceivedEventArgs args)
        {
            Logger.Info("mqtt receive message");
            OnApplicationMessageReceived?.Invoke(this, args);
            return Task.CompletedTask;
        }
    }
}