123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- using Edge.Core.Core.database;
- using MQTTnet;
- using MQTTnet.Client;
- using Newtonsoft.Json;
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
namespace Edge.Core.MqttClient
{
    /// <summary>
    /// Wraps a single MQTTnet client: reads the broker endpoint and client id from the
    /// station record in the database, connects, subscribes to the station topic and
    /// re-raises the client's connect / disconnect / message callbacks as .NET events.
    /// After every disconnect a new connection attempt is started following a short delay.
    /// </summary>
    public class MqttClientService : IMqttClientService
    {
        /// <summary>Pause between a disconnect notification and the next connection attempt.</summary>
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(3);

        private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();

        /// <summary>Topics that are subscribed to after each successful connect.</summary>
        private string[] topics = Array.Empty<string>();

        /// <summary>The active MQTT client; <c>null</c> until <see cref="Start"/> has found a usable configuration.</summary>
        private IMqttClient? _mqttClient;

        public MqttClientService() { }

        /// <summary>Raised after the client has connected and the topic subscriptions were attempted.</summary>
        public event EventHandler<MqttClientConnectedEventArgs> OnConnect;

        /// <summary>Raised after a disconnect, once the next connection attempt has been started.</summary>
        public event EventHandler<MqttClientDisconnectedEventArgs> OnDisconnect;

        /// <summary>Raised for every application message delivered by the client.</summary>
        public event EventHandler<MqttApplicationMessageReceivedEventArgs> OnApplicationMessageReceived;

        /// <summary>
        /// Currently a no-op: nothing is sent to the broker.
        /// NOTE(review): the name and parameters suggest this is meant to publish
        /// <paramref name="paramsJson"/> to <paramref name="topic"/> - confirm and implement.
        /// </summary>
        /// <param name="topic">Unused.</param>
        /// <param name="paramsJson">Unused.</param>
        public void Public(string topic, string paramsJson)
        {
        }

        /// <summary>
        /// Loads the broker address ("host:port") and build id from the first station record,
        /// replaces any previously created client with a fresh one and starts connecting in the
        /// background. Logs and returns without connecting when the configuration is missing
        /// or the address cannot be parsed.
        /// </summary>
        public void Start()
        {
            string? mqttService;
            string? buildId;

            // Keep the database context open only for the lookup itself.
            using (var dbContext = new MysqlDbContext())
            {
                Domain.FccStationInfo.FccStationInfo? fccStationInfo = dbContext.FccStationInfos.FirstOrDefault();
                mqttService = fccStationInfo?.MqttService;
                buildId = fccStationInfo?.BuildId;
            }

            if (mqttService == null || buildId == null)
            {
                Logger.Info($"can not get mqttService:{mqttService} and buildId:{buildId}");
                return;
            }

            if (!TryParseEndpoint(mqttService, out string host, out int port))
            {
                Logger.Error($"invalid mqttService:{mqttService}, expected host:port");
                return;
            }

            MqttClientOptions mqttClientOptions = new MqttClientOptionsBuilder()
                .WithTcpServer(host, port)
                .WithClientId(buildId)
                .WithCleanSession()
                .WithTlsOptions(new MqttClientTlsOptions()
                {
                    UseTls = false
                })
                .Build();

            // Start() also runs on every reconnect; without this the previous client
            // (and the handlers attached to it) would be leaked each time.
            ReleaseClient();

            IMqttClient client = new MqttFactory().CreateMqttClient();
            this.topics = new string[] { $"fromClound/{buildId}" };
            client.ConnectedAsync += mqttConnected;
            client.DisconnectedAsync += mqttDisConnected;
            client.ApplicationMessageReceivedAsync += mqttOnReceive;
            _mqttClient = client;

            // Start() is synchronous, so the connect runs in the background; the helper
            // observes the task so a failure is logged instead of going unobserved.
            _ = ConnectAndLogFailureAsync(client, mqttClientOptions);
        }

        /// <summary>
        /// Splits a "host:port" string. Parts after the second one are ignored.
        /// </summary>
        /// <param name="mqttService">Broker address in "host:port" form.</param>
        /// <param name="host">Receives the host part on success.</param>
        /// <param name="port">Receives the port (1-65535) on success.</param>
        /// <returns><c>true</c> when both a non-empty host and a valid port were found.</returns>
        private static bool TryParseEndpoint(string mqttService, out string host, out int port)
        {
            host = string.Empty;
            port = 0;

            string[] hostAndPort = mqttService.Split(':');
            if (hostAndPort.Length < 2 || string.IsNullOrWhiteSpace(hostAndPort[0]))
            {
                return false;
            }

            bool parsed = int.TryParse(
                hostAndPort[1],
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out port);
            if (!parsed || port < 1 || port > 65535)
            {
                port = 0;
                return false;
            }

            host = hostAndPort[0].Trim();
            return true;
        }

        /// <summary>
        /// Awaits the connect call and logs any exception it throws.
        /// </summary>
        private static async Task ConnectAndLogFailureAsync(IMqttClient client, MqttClientOptions options)
        {
            try
            {
                await client.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                Logger.Error(ex, "mqtt connect failed");
            }
        }

        /// <summary>
        /// Detaches this service's handlers from the current client, if any, and disposes it.
        /// </summary>
        private void ReleaseClient()
        {
            IMqttClient? previous = _mqttClient;
            if (previous == null)
            {
                return;
            }

            _mqttClient = null;
            previous.ConnectedAsync -= mqttConnected;
            previous.DisconnectedAsync -= mqttDisConnected;
            previous.ApplicationMessageReceivedAsync -= mqttOnReceive;

            try
            {
                previous.Dispose();
            }
            catch (Exception ex)
            {
                // A failing cleanup of the old client must not prevent the new connection.
                Logger.Warn(ex, "mqtt client dispose failed");
            }
        }

        /// <summary>
        /// MQTT connected handler: subscribes to every configured topic (QoS at-least-once),
        /// then raises <see cref="OnConnect"/>.
        /// </summary>
        /// <param name="args">Connect result supplied by the client.</param>
        /// <returns>A task that completes once the subscriptions were attempted and the event was raised.</returns>
        private async Task mqttConnected(MqttClientConnectedEventArgs args)
        {
            Logger.Info($"mqtt connected");

            IMqttClient? client = _mqttClient;
            if (client != null)
            {
                foreach (string topic in topics)
                {
                    try
                    {
                        await client.SubscribeAsync(topic, MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
                    }
                    catch (Exception ex)
                    {
                        // Log and continue so one failing topic does not skip the others.
                        Logger.Error(ex, $"mqtt subscribe failed, topic:{topic}");
                    }
                }
            }

            OnConnect?.Invoke(this, args);
        }

        /// <summary>
        /// MQTT disconnected handler: logs the reason, waits <see cref="ReconnectDelay"/>,
        /// starts a new connection attempt via <see cref="Start"/> and then raises
        /// <see cref="OnDisconnect"/>.
        /// </summary>
        /// <param name="args">Disconnect details supplied by the client.</param>
        /// <returns>A task that completes after the reconnect was started and the event was raised.</returns>
        private async Task mqttDisConnected(MqttClientDisconnectedEventArgs args)
        {
            Logger.Info($"mqtt disconnect, reason:{args.Reason}, error:{args.Exception?.Message}");

            // Asynchronous wait: does not block the thread running the handler.
            await Task.Delay(ReconnectDelay);

            try
            {
                Start();
            }
            catch (Exception ex)
            {
                Logger.Error(ex, "mqtt reconnect failed");
            }

            OnDisconnect?.Invoke(this, args);
        }

        /// <summary>
        /// MQTT message handler: forwards the received message to <see cref="OnApplicationMessageReceived"/>.
        /// </summary>
        /// <param name="args">The received application message.</param>
        /// <returns>An already completed task.</returns>
        private Task mqttOnReceive(MqttApplicationMessageReceivedEventArgs args)
        {
            Logger.Info($"mqtt receive message");
            OnApplicationMessageReceived?.Invoke(this, args);
            return Task.CompletedTask;
        }
    }
}
|