From cd4b249bf963fb8c9f5d2b21907b12cfdbcf0a3d Mon Sep 17 00:00:00 2001 From: "David P.G" Date: Tue, 15 Jul 2025 17:13:27 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86=E5=BC=80=E5=90=AFMq?= =?UTF-8?q?tt=E5=92=8C=E5=85=B3=E9=97=ADMqtt=E6=97=B6=E7=9A=84=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Models/Mqtt.cs | 13 ++ Services/MqttBackgroundService.cs | 260 +++++++++++++++++++++--------- ViewModels/MqttsViewModel.cs | 50 +++--- Views/MqttsView.xaml | 8 + 4 files changed, 228 insertions(+), 103 deletions(-) diff --git a/Models/Mqtt.cs b/Models/Mqtt.cs index 61ecc0e..fca9d27 100644 --- a/Models/Mqtt.cs +++ b/Models/Mqtt.cs @@ -8,6 +8,8 @@ namespace PMSWPF.Models; /// public partial class Mqtt : ObservableObject { + public event Action OnMqttIsActiveChanged; + /// /// MQTT客户端ID。 /// @@ -34,6 +36,17 @@ public partial class Mqtt : ObservableObject [ObservableProperty] private bool _isActive; + partial void OnIsActiveChanged(bool value) + { + OnMqttIsActiveChanged?.Invoke(this); + } + + /// + /// 显示连接的消息: + /// + [ObservableProperty] + private string connectMessage; + /// /// 是否设置为默认MQTT客户端。 /// diff --git a/Services/MqttBackgroundService.cs b/Services/MqttBackgroundService.cs index c0efaaa..330d1df 100644 --- a/Services/MqttBackgroundService.cs +++ b/Services/MqttBackgroundService.cs @@ -6,6 +6,8 @@ using System.Threading.Tasks; using Microsoft.Extensions.Hosting; using MQTTnet; using MQTTnet.Client; +using MQTTnet.Client.Connecting; +using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; using PMSWPF.Models; using PMSWPF.Services; @@ -28,6 +30,9 @@ namespace PMSWPF.Services // 存储MQTT配置的字典,键为MQTT配置ID,值为Mqtt模型对象。 private readonly Dictionary _mqttConfigDic; + // 存储每个客户端重连尝试次数的字典 + private readonly Dictionary _reconnectAttempts; + // 定时器,用于周期性地执行数据发布任务。 private Timer _timer; @@ -45,6 +50,7 @@ namespace PMSWPF.Services _dataServices = dataServices; _mqttClients = new Dictionary(); _mqttConfigDic = new Dictionary(); + _reconnectAttempts = new Dictionary(); } /// @@ -61,24 +67,24 @@ namespace PMSWPF.Services _serviceMainThread.IsBackground = true; _serviceMainThread.Name = "MqttServiceMainThread"; _serviceMainThread.Start(); - } private void Execute() { while (!_stopEvent.WaitOne(0)) { - if (_dataServices.Mqtts==null || _dataServices.Mqtts.Count==0) + if (_dataServices.Mqtts == null || _dataServices.Mqtts.Count == 0) { _reloadEvent.Reset(); continue; } - + _reloadEvent.WaitOne(); + // 初始加载MQTT配置和变量数据。 LoadMqttConfigurations(); - ConnectMqttClient(); - + ConnectMqttList(); + _reloadEvent.Reset(); } } @@ -87,7 +93,7 @@ namespace PMSWPF.Services /// /// 停止MQTT后台服务。 /// - public void StopService() + public void StopService() { NlogHelper.Info("Mqtt后台服务开始停止...."); // 记录服务停止信息 @@ -95,24 +101,32 @@ namespace PMSWPF.Services // 取消订阅事件。 _dataServices.OnMqttListChanged -= HandleMqttListChanged; - // 断开所有已连接的MQTT客户端。 - foreach (var mqttId in _mqttClients.Keys.ToList()) - { - var client=_mqttClients[mqttId]; - var mqtt=_mqttConfigDic[mqttId]; - mqtt.IsConnected = false; - if (client.IsConnected) - { - client.DisconnectAsync().GetAwaiter().GetResult(); - } - } + DisconnecetAll(); // 清空所有字典。 _mqttClients.Clear(); _mqttConfigDic.Clear(); + _reconnectAttempts.Clear(); NlogHelper.Info("Mqtt后台服务已停止。"); // 记录服务停止信息 } + private void DisconnecetAll() + { + // 断开所有已连接的MQTT客户端。 + foreach (var mqttId in _mqttClients.Keys.ToList()) + { + var client = _mqttClients[mqttId]; + var mqtt = _mqttConfigDic[mqttId]; + mqtt.IsConnected = false; + if (client.IsConnected) + { + client.DisconnectAsync() + .GetAwaiter() + .GetResult(); + } + } + } + /// /// 加载并连接MQTT配置。 @@ -121,90 +135,76 @@ namespace PMSWPF.Services private void LoadMqttConfigurations() { NlogHelper.Info("开始加载Mqtt配置文件..."); + _mqttConfigDic.Clear(); // 从数据服务获取所有MQTT配置。 var _mqttConfigList = _dataServices.Mqtts.Where(m => m.IsActive) .ToList(); foreach (var mqtt in _mqttConfigList) { + mqtt.OnMqttIsActiveChanged += OnMqttIsActiveChangedHandler; _mqttConfigDic[mqtt.Id] = mqtt; + mqtt.ConnectMessage = "配置加载成功."; } + NlogHelper.Info($"Mqtt配置文件加载成功,开启的Mqtt客户端:{_mqttConfigList.Count}个。"); } + private async void OnMqttIsActiveChangedHandler(Mqtt mqtt) + { + try + { + if (mqtt.IsActive) + { + await ConnectMqtt(mqtt); + } + else + { + if (!_mqttClients.TryGetValue(mqtt.Id,out var client)) + { + NlogHelper.Warn($"没有在Mqtt连接字典中找到名字为:{mqtt.Name}的连接。"); + return; + } + + if (client.IsConnected) + { + await client.DisconnectAsync(); + } + + mqtt.IsConnected = false; + mqtt.ConnectMessage = "断开连接."; + + _mqttClients.Remove(mqtt.Id); + NlogHelper.Info($"{mqtt.Name}的客户端,与服务器断开连接."); + + } + } + catch (Exception e) + { + NotificationHelper.ShowError($"{mqtt.Name}客户端,开启或关闭的过程中发生了错误:{e.Message}",e); + } + + } + /// /// 连接到指定的MQTT代理。 /// /// MQTT配置对象。 /// 表示异步操作的任务。 - private void ConnectMqttClient() + private async Task ConnectMqttList() { foreach (Mqtt mqtt in _mqttConfigDic.Values.ToList()) { try { - NlogHelper.Info($"开始连接:{mqtt.Name}的服务器..."); - // 创建MQTT客户端工厂和客户端实例。 - var factory = new MqttFactory(); - var client = factory.CreateMqttClient(); - // 构建MQTT客户端连接选项。 - var options = new MqttClientOptionsBuilder() - .WithClientId(mqtt.ClientID) - .WithTcpServer(mqtt.Host, mqtt.Port) - .WithCredentials(mqtt.UserName, mqtt.PassWord) - .WithCleanSession() // 清理会话,每次连接都是新会话 - .Build(); - - // 设置连接成功事件处理程序。 - client.UseConnectedHandler(async e => + if (_mqttClients.TryGetValue(mqtt.Id, out var mclient) && mclient.IsConnected) { - NotificationHelper.ShowSuccess($"已连接到MQTT服务器: {mqtt.Name}"); + NlogHelper.Info($"{mqtt.Name}的Mqtt服务器连接已存在。"); + mqtt.ConnectMessage = "连接成功."; mqtt.IsConnected = true; - - // 订阅主题 - await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(mqtt.SubTopic).Build()); - NlogHelper.Info($"MQTT客户端 {mqtt.Name} 已订阅主题: {mqtt.SubTopic}"); - }); + continue; + } - // 设置接收消息处理程序 - client.UseApplicationMessageReceivedHandler(e => - { - var topic = e.ApplicationMessage.Topic; - var payload = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload); - NlogHelper.Info($"MQTT客户端 {mqtt.Name} 收到消息: 主题={topic}, 消息={payload}"); - - // 在这里添加处理消息的逻辑 - }); - - // 设置断开连接事件处理程序。 - client.UseDisconnectedHandler(async e => - { - NotificationHelper.ShowWarn($"与MQTT服务器断开连接: {mqtt.Name}"); - mqtt.IsConnected = false; - // 服务停止 - if (_stopEvent.WaitOne(0)) - return; - if (!mqtt.IsActive) - return; - - NlogHelper.Info($"5秒后重新连接Mqtt服务器:{mqtt.Name}"); - // 尝试重新连接。 - await Task.Delay(TimeSpan.FromSeconds(5)); // 等待5秒后重连 - try - { - await client.ConnectAsync(options, CancellationToken.None); - } - catch (Exception ex) - { - NlogHelper.Error($"重新与Mqtt服务器连接失败: {mqtt.Name}", ex); - } - }); - - // 尝试连接到MQTT代理。 - client.ConnectAsync(options, CancellationToken.None) - .GetAwaiter() - .GetResult(); - // 将连接成功的客户端添加到字典。 - _mqttClients[mqtt.Id] = client; + await ConnectMqtt(mqtt); } catch (Exception ex) { @@ -213,6 +213,110 @@ namespace PMSWPF.Services } } + private async Task ConnectMqtt(Mqtt mqtt) + { + NlogHelper.Info($"开始连接:{mqtt.Name}的服务器..."); + mqtt.ConnectMessage = "开始连接服务器..."; + // 创建MQTT客户端工厂和客户端实例。 + var factory = new MqttFactory(); + var client = factory.CreateMqttClient(); + // 构建MQTT客户端连接选项。 + var options = new MqttClientOptionsBuilder() + .WithClientId(mqtt.ClientID) + .WithTcpServer(mqtt.Host, mqtt.Port) + .WithCredentials(mqtt.UserName, mqtt.PassWord) + .WithCleanSession() // 清理会话,每次连接都是新会话 + .Build(); + + // 设置连接成功事件处理程序。 + client.UseConnectedHandler(async (e) => { await HandleConnected(e, client, mqtt); }); + + // 设置接收消息处理程序 + client.UseApplicationMessageReceivedHandler(e => { HandleMessageReceived(e, mqtt); }); + + // 设置断开连接事件处理程序。 + client.UseDisconnectedHandler(async (e) => await HandleDisconnected(e,options, client, mqtt)); + + // 尝试连接到MQTT代理。 + await client.ConnectAsync(options, CancellationToken.None); + + // 将连接成功的客户端添加到字典。 + _mqttClients[mqtt.Id] = client; + } + + private static void HandleMessageReceived(MqttApplicationMessageReceivedEventArgs e, Mqtt mqtt) + { + var topic = e.ApplicationMessage.Topic; + var payload = System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload); + NlogHelper.Info($"MQTT客户端 {mqtt.Name} 收到消息: 主题={topic}, 消息={payload}"); + + // 在这里添加处理消息的逻辑 + } + + private async Task HandleDisconnected(MqttClientDisconnectedEventArgs args, IMqttClientOptions options, + IMqttClient client, + Mqtt mqtt) + { + NotificationHelper.ShowWarn($"与MQTT服务器断开连接: {mqtt.Name}"); + mqtt.ConnectMessage = "断开连接."; + mqtt.IsConnected = false; + // 服务停止 + if (_stopEvent.WaitOne(0) || !mqtt.IsActive) + return; + + // 增加重连尝试次数 + if (!_reconnectAttempts.ContainsKey(mqtt.Id)) + { + _reconnectAttempts[mqtt.Id] = 0; + } + + _reconnectAttempts[mqtt.Id]++; + + // 指数退避策略 + var maxDelay = TimeSpan.FromMinutes(5); // 最大延迟5分钟 + var baseDelay = TimeSpan.FromSeconds(5); // 基础延迟5秒 + var delay = TimeSpan.FromSeconds(baseDelay.TotalSeconds * + Math.Pow(2, _reconnectAttempts[mqtt.Id] - 1)); + if (delay > maxDelay) + { + delay = maxDelay; + } + + NlogHelper.Info( + $"与MQTT服务器:{mqtt.Name} 的连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接..."); + + mqtt.ConnectMessage = $"连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接..."; + await Task.Delay(delay); + try + { + mqtt.ConnectMessage = $"开始重新连接服务器..."; + await client.ConnectAsync(options, CancellationToken.None); + } + catch (Exception ex) + { + mqtt.ConnectMessage = $"重新与Mqtt服务器连接失败."; + NlogHelper.Error($"重新与Mqtt服务器连接失败: {mqtt.Name}", ex); + } + } + + private async Task HandleConnected(MqttClientConnectedEventArgs args, IMqttClient client, Mqtt mqtt) + { + // 重置重连尝试次数 + if (_reconnectAttempts.ContainsKey(mqtt.Id)) + { + _reconnectAttempts[mqtt.Id] = 0; + } + + NotificationHelper.ShowSuccess($"已连接到MQTT服务器: {mqtt.Name}"); + mqtt.IsConnected = true; + mqtt.ConnectMessage = "连接成功."; + + // 订阅主题 + await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(mqtt.SubTopic) + .Build()); + NlogHelper.Info($"MQTT客户端 {mqtt.Name} 已订阅主题: {mqtt.SubTopic}"); + } + /// /// 处理MQTT列表变化事件的回调方法。 diff --git a/ViewModels/MqttsViewModel.cs b/ViewModels/MqttsViewModel.cs index ac14924..137edb3 100644 --- a/ViewModels/MqttsViewModel.cs +++ b/ViewModels/MqttsViewModel.cs @@ -19,33 +19,33 @@ public partial class MqttsViewModel : ViewModelBase private readonly MqttRepository _mqttRepository; private readonly ILogger _logger; private readonly NavgatorServices _navgatorServices; - + [ObservableProperty] private ObservableCollection _mqtts; - public ObservableCollection Mqtts - { - get => _mqtts; - set - { - if (_mqtts != null) - { - foreach (var mqtt in _mqtts) - { - mqtt.PropertyChanged -= Mqtt_PropertyChanged; - } - } - - SetProperty(ref _mqtts, value); - - if (_mqtts != null) - { - foreach (var mqtt in _mqtts) - { - mqtt.PropertyChanged += Mqtt_PropertyChanged; - } - } - } - } + // public ObservableCollection Mqtts + // { + // get => _mqtts; + // set + // { + // if (_mqtts != null) + // { + // foreach (var mqtt in _mqtts) + // { + // mqtt.PropertyChanged -= Mqtt_PropertyChanged; + // } + // } + // + // SetProperty(ref _mqtts, value); + // + // if (_mqtts != null) + // { + // foreach (var mqtt in _mqtts) + // { + // mqtt.PropertyChanged += Mqtt_PropertyChanged; + // } + // } + // } + // } [ObservableProperty] private Mqtt _selectedMqtt; diff --git a/Views/MqttsView.xaml b/Views/MqttsView.xaml index 46b35fa..7dd606c 100644 --- a/Views/MqttsView.xaml +++ b/Views/MqttsView.xaml @@ -136,6 +136,14 @@ FontSize="14" /> + + + +