123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- using MQTTnet;
- using MQTTnet.Client;
- using Microsoft.Extensions.Options;
- using Microsoft.Extensions.Logging;
- using System.Text;
- using MQTTnet.Server;
- namespace Fuel.Application.MqttService
- {
- public class MqttClientService : IMqttClientService, IDisposable
- {
- private IMqttClient _mqttClient;
- private MqttOptions _options;
- private ILogger<MqttClientService> _logger;
- private bool _disposed = false;
- public MqttClientService(
- IOptions<MqttOptions> options,
- ILogger<MqttClientService> logger)
- {
- _options = options.Value;
- _logger = logger;
- var factory = new MqttFactory();
- _mqttClient = factory.CreateMqttClient();
- // 配置事件处理
- _mqttClient.ConnectedAsync += HandleConnectedAsync;
- _mqttClient.DisconnectedAsync += HandleDisconnectedAsync;
- _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceivedAsync;
- }
- private async Task HandleConnectedAsync(MqttClientConnectedEventArgs e)
- {
- _logger.LogInformation("MQTT connected");
- await Task.CompletedTask;
- }
- private async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs e)
- {
- _logger.LogWarning("MQTT disconnected. Attempting to reconnect...");
- await Task.Delay(TimeSpan.FromSeconds(5));
- if (!_disposed)
- {
- await ConnectAsync(); // 自动重连
- }
- }
- private async Task HandleMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
- {
- var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
- _logger.LogInformation($"Received message: {payload} [Topic: {e.ApplicationMessage.Topic}]");
- await Task.CompletedTask;
- }
- public async Task ConnectAsync()
- {
- if (_mqttClient.IsConnected)
- {
- return;
- }
- var options = new MqttClientOptionsBuilder()
- .WithTcpServer(_options.Server, _options.Port)
- .WithClientId(_options.ClientId)
- .WithCredentials("HSClient", "HS202503")
- .WithCleanSession()
- .Build();
- try
- {
- await _mqttClient.ConnectAsync(options);
- }
- catch (ObjectDisposedException ex)
- {
- _logger.LogError($"Error connecting to MQTT broker: {ex.Message}");
- // 处理异常或重新创建客户端实例
- }
- }
- public async Task PublishAsync(string topic, string payload)
- {
- if (!_mqttClient.IsConnected)
- {
- await ConnectAsync();
- }
- var message = new MqttApplicationMessageBuilder()
- .WithTopic(topic)
- .WithPayload(payload)
- .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
- .Build();
- try
- {
- await _mqttClient.PublishAsync(message);
- _logger.LogError($"PublishAsync,topic:{topic},payload:{payload},推送成功");
- }
- catch (ObjectDisposedException ex)
- {
- _logger.LogError($"Error publishing message: {ex.Message}");
- // 处理异常或重新创建客户端实例
- }
- }
- public async Task SubscribeAsync(string topic)
- {
- if (!_mqttClient.IsConnected)
- {
- await ConnectAsync();
- }
- var topicFilter = new MqttTopicFilterBuilder()
- .WithTopic(topic)
- .Build();
- try
- {
- await _mqttClient.SubscribeAsync(topicFilter);
- }
- catch (Exception ex)
- {
- _logger.LogError($"Error subscribing to topic {topic}: {ex.Message}");
- // 处理异常
- }
- }
- public async Task DisconnectAsync()
- {
- if (_mqttClient.IsConnected)
- {
- await _mqttClient.DisconnectAsync();
- }
- }
- protected virtual void Dispose(bool disposing)
- {
- if (!_disposed)
- {
- if (disposing)
- {
- _mqttClient?.Dispose();
- }
- _disposed = true;
- }
- }
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
- }
- }
|