123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- using MQTTnet;
- using MQTTnet.Client;
- using Microsoft.Extensions.Options;
- using Microsoft.Extensions.Logging;
- using System.Text;
- using MQTTnet.Server;
- namespace Fuel.Application.MqttService
- {
- public class MqttClientService : IMqttClientService, IDisposable
- {
- private readonly IMqttClient _mqttClient;
- private readonly MqttOptions _options;
- private readonly ILogger<MqttClientService> _logger;
- public MqttClientService(
- IOptions<MqttOptions> options,
- ILogger<MqttClientService> logger)
- {
- _options = options.Value;
- _logger = logger;
- var factory = new MqttFactory();
- _mqttClient = factory.CreateMqttClient();
- // 配置事件处理
- _mqttClient.ConnectedAsync += HandleConnectedAsync;
- _mqttClient.DisconnectedAsync += HandleDisconnectedAsync;
- _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceivedAsync;
- }
- private async Task HandleConnectedAsync(MqttClientConnectedEventArgs e)
- {
- _logger.LogInformation("MQTT connected");
- await Task.CompletedTask;
- }
- private async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs e)
- {
- _logger.LogWarning("MQTT disconnected. Attempting to reconnect...");
- await Task.Delay(TimeSpan.FromSeconds(5));
- await ConnectAsync(); // 自动重连
- }
- private async Task HandleMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
- {
- var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
- _logger.LogInformation($"Received message: {payload} [Topic: {e.ApplicationMessage.Topic}]");
- await Task.CompletedTask;
- }
- public async Task ConnectAsync()
- {
- var options = new MqttClientOptionsBuilder()
- .WithTcpServer(_options.Server, _options.Port)
- .WithClientId(_options.ClientId)
- .WithCredentials(_options.Username, _options.Password)
- .WithCleanSession()
- .Build();
- await _mqttClient.ConnectAsync(options);
- }
- public async Task PublishAsync(string topic, string payload)
- {
- if (!_mqttClient.IsConnected)
- {
- await ConnectAsync();
- }
- var message = new MqttApplicationMessageBuilder()
- .WithTopic(topic)
- .WithPayload(payload)
- .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
- .Build();
- await _mqttClient.PublishAsync(message);
- }
- public async Task SubscribeAsync(string topic)
- {
- if (!_mqttClient.IsConnected)
- {
- await ConnectAsync();
- }
- var topicFilter = new MqttTopicFilterBuilder()
- .WithTopic(topic)
- .Build();
- await _mqttClient.SubscribeAsync(topicFilter);
- }
- public async Task DisconnectAsync()
- {
- await _mqttClient.DisconnectAsync();
- }
- public void Dispose()
- {
- _mqttClient?.Dispose();
- }
- }
- }
|