using MQTTnet; using MQTTnet.Client; using Microsoft.Extensions.Options; using Microsoft.Extensions.Logging; using System.Text; using MQTTnet.Server; namespace Fuel.Application.MqttService { public class MqttClientService : IMqttClientService, IDisposable { private IMqttClient _mqttClient; private MqttOptions _options; private ILogger _logger; private bool _disposed = false; public MqttClientService( IOptions options, ILogger logger) { _options = options.Value; _logger = logger; var factory = new MqttFactory(); _mqttClient = factory.CreateMqttClient(); // 配置事件处理 _mqttClient.ConnectedAsync += HandleConnectedAsync; _mqttClient.DisconnectedAsync += HandleDisconnectedAsync; _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceivedAsync; } private async Task HandleConnectedAsync(MqttClientConnectedEventArgs e) { _logger.LogInformation("MQTT connected"); await Task.CompletedTask; } private async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs e) { _logger.LogWarning("MQTT disconnected. Attempting to reconnect..."); await Task.Delay(TimeSpan.FromSeconds(5)); if (!_disposed) { await ConnectAsync(); // 自动重连 } } private async Task HandleMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e) { var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); _logger.LogInformation($"Received message: {payload} [Topic: {e.ApplicationMessage.Topic}]"); await Task.CompletedTask; } public async Task ConnectAsync() { if (_mqttClient.IsConnected) { return; } var options = new MqttClientOptionsBuilder() .WithTcpServer(_options.Server, _options.Port) .WithClientId(_options.ClientId) .WithCredentials("HSClient", "HS202503") .WithCleanSession() .Build(); try { await _mqttClient.ConnectAsync(options); } catch (ObjectDisposedException ex) { _logger.LogError($"Error connecting to MQTT broker: {ex.Message}"); // 处理异常或重新创建客户端实例 } } public async Task PublishAsync(string topic, string payload) { if (!_mqttClient.IsConnected) { await ConnectAsync(); } var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce) .Build(); try { await _mqttClient.PublishAsync(message); } catch (ObjectDisposedException ex) { _logger.LogError($"Error publishing message: {ex.Message}"); // 处理异常或重新创建客户端实例 } } public async Task SubscribeAsync(string topic) { if (!_mqttClient.IsConnected) { await ConnectAsync(); } var topicFilter = new MqttTopicFilterBuilder() .WithTopic(topic) .Build(); try { await _mqttClient.SubscribeAsync(topicFilter); } catch (Exception ex) { _logger.LogError($"Error subscribing to topic {topic}: {ex.Message}"); // 处理异常 } } public async Task DisconnectAsync() { if (_mqttClient.IsConnected) { await _mqttClient.DisconnectAsync(); } } protected virtual void Dispose(bool disposing) { if (!_disposed) { if (disposing) { _mqttClient?.Dispose(); } _disposed = true; } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } } }