diff --git a/App.xaml.cs b/App.xaml.cs index ecce532..fb73336 100644 --- a/App.xaml.cs +++ b/App.xaml.cs @@ -77,11 +77,11 @@ public partial class App : Application MainWindow.Show(); // 根据配置启动服务 - var connectionSettings = PMSWPF.Config.ConnectionSettings.Load(); - if (connectionSettings.EnableMqttService) - { - Host.Services.GetRequiredService().StartService(); - } + // var connectionSettings = PMSWPF.Config.ConnectionSettings.Load(); + // if (connectionSettings.EnableMqttService) + // { + // Host.Services.GetRequiredService().StartService(); + // } // if (connectionSettings.EnableOpcUaService) // { // Host.Services.GetRequiredService().StartService(); @@ -105,7 +105,7 @@ public partial class App : Application services.AddSingleton(); services.AddHostedService(); services.AddHostedService(); - services.AddSingleton(); + services.AddHostedService(); services.AddSingleton(); // 注册 AutoMapper diff --git a/Services/MqttBackgroundService.cs b/Services/MqttBackgroundService.cs index e919ec1..8103948 100644 --- a/Services/MqttBackgroundService.cs +++ b/Services/MqttBackgroundService.cs @@ -1,4 +1,6 @@ +using System.Collections.Concurrent; using System.Text; +using Microsoft.Extensions.Hosting; using MQTTnet; using MQTTnet.Client; using MQTTnet.Client.Connecting; @@ -13,29 +15,24 @@ namespace PMSWPF.Services /// /// MQTT后台服务,继承自BackgroundService,用于在后台管理MQTT连接和数据发布。 /// - public class MqttBackgroundService + public class MqttBackgroundService : BackgroundService { // 数据服务实例,用于访问和操作应用程序数据,如MQTT配置和变量数据。 private readonly DataServices _dataServices; private readonly MqttRepository _mqttRepository; // 存储MQTT客户端实例的字典,键为MQTT配置ID,值为IMqttClient对象。 - private readonly Dictionary _mqttClients; + private readonly ConcurrentDictionary _mqttClients; // 存储MQTT配置的字典,键为MQTT配置ID,值为Mqtt模型对象。 - private readonly Dictionary _mqttConfigDic; + private readonly ConcurrentDictionary _mqttConfigDic; // 存储每个客户端重连尝试次数的字典 - private readonly Dictionary _reconnectAttempts; + private readonly ConcurrentDictionary _reconnectAttempts; + + private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0); - // 定时器,用于周期性地执行数据发布任务。 - private Timer _timer; - private Thread _serviceMainThread; - - private ManualResetEvent _reloadEvent = new ManualResetEvent(false); - private ManualResetEvent _stopEvent = new ManualResetEvent(false); - /// /// 构造函数,注入DataServices。 /// @@ -44,81 +41,67 @@ namespace PMSWPF.Services { _dataServices = dataServices; _mqttRepository = mqttRepository; - _mqttClients = new Dictionary(); - _mqttConfigDic = new Dictionary(); - _reconnectAttempts = new Dictionary(); - } - - /// - /// 启动MQTT后台服务。 - /// - public async void StartService() - { - // 订阅MQTT列表和变量数据变化的事件,以便在数据更新时重新加载配置和数据。 + _mqttClients = new ConcurrentDictionary(); + _mqttConfigDic = new ConcurrentDictionary(); + _reconnectAttempts = new ConcurrentDictionary(); + _dataServices.OnMqttListChanged += HandleMqttListChanged; - NlogHelper.Info("Mqtt后台服务启动"); // 记录服务启动信息 - _reloadEvent.Set(); - _stopEvent.Reset(); - _serviceMainThread = new Thread(Execute); - _serviceMainThread.IsBackground = true; - _serviceMainThread.Name = "MqttServiceMainThread"; - _serviceMainThread.Start(); } - private async void Execute() + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - while (!_stopEvent.WaitOne(0)) + NlogHelper.Info("Mqtt后台服务正在启动。"); + _reloadSemaphore.Release(); // Initial trigger to load variables and connect + + try { - if (_dataServices.Mqtts == null || _dataServices.Mqtts.Count == 0) + while (!stoppingToken.IsCancellationRequested) { - _reloadEvent.Reset(); - continue; - } + await _reloadSemaphore.WaitAsync(stoppingToken); // Wait for a reload signal - _reloadEvent.WaitOne(); - - try - { - // 初始加载MQTT配置和变量数据。 - var isLoaded = LoadMqttConfigurations(); - if (isLoaded) + if (stoppingToken.IsCancellationRequested) { - await ConnectMqttList(); + break; } - } - catch (Exception e) - { - NotificationHelper.ShowError($"Mqt后台服务主程序在加载变量和连接服务器过程中出现了错误:{e.Message} ", e); - } - finally - { - _reloadEvent.Reset(); + + if (_dataServices.Mqtts == null || _dataServices.Mqtts.Count == 0) + { + NlogHelper.Info("没有可用的Mqtt配置,等待Mqtt列表更新..."); + continue; + } + + var isLoaded = LoadMqttConfigurations(); + if (!isLoaded) + { + NlogHelper.Info("加载Mqtt配置过程中发生了错误,停止后面的操作。"); + continue; + } + + await ConnectMqttList(stoppingToken); + NlogHelper.Info("Mqtt后台服务已启动。"); + + // while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0) + // { + // await Task.Delay(1000, stoppingToken); + // } } } + catch (OperationCanceledException) + { + NlogHelper.Info("Mqtt后台服务已停止。"); + } + catch (Exception e) + { + NlogHelper.Error($"Mqtt后台服务运行中发生了错误:{e.Message}", e); + } + finally + { + await DisconnectAll(stoppingToken); + _dataServices.OnMqttListChanged -= HandleMqttListChanged; + } } - - - /// - /// 停止MQTT后台服务。 - /// - public async Task StopService() - { - NlogHelper.Info("Mqtt后台服务开始停止...."); // 记录服务停止信息 - - _stopEvent.Set(); - // 取消订阅事件。 - _dataServices.OnMqttListChanged -= HandleMqttListChanged; - - await DisconnecetAll(); - - // 清空所有字典。 - _mqttClients.Clear(); - _mqttConfigDic.Clear(); - _reconnectAttempts.Clear(); - NlogHelper.Info("Mqtt后台服务已停止。"); // 记录服务停止信息 - } - - private async Task DisconnecetAll() + + private async Task DisconnectAll(CancellationToken stoppingToken) { // 断开所有已连接的MQTT客户端。 foreach (var mqttId in _mqttClients.Keys.ToList()) @@ -130,7 +113,7 @@ namespace PMSWPF.Services mqtt.IsConnected = false; if (client.IsConnected) { - await client.DisconnectAsync(); + await client.DisconnectAsync(new MqttClientDisconnectOptions(), stoppingToken); } } catch (Exception e) @@ -177,7 +160,7 @@ namespace PMSWPF.Services { if (mqtt.IsActive) { - await ConnectMqtt(mqtt); + _reloadSemaphore.Release(); } else { @@ -195,7 +178,7 @@ namespace PMSWPF.Services mqtt.IsConnected = false; mqtt.ConnectMessage = "断开连接."; - _mqttClients.Remove(mqtt.Id); + _mqttClients.TryRemove(mqtt.Id, out _); NlogHelper.Info($"{mqtt.Name}的客户端,与服务器断开连接."); } @@ -213,7 +196,7 @@ namespace PMSWPF.Services /// /// MQTT配置对象。 /// 表示异步操作的任务。 - private async Task ConnectMqttList() + private async Task ConnectMqttList(CancellationToken stoppingToken) { foreach (Mqtt mqtt in _mqttConfigDic.Values.ToList()) { @@ -227,7 +210,7 @@ namespace PMSWPF.Services continue; } - await ConnectMqtt(mqtt); + await ConnectMqtt(mqtt, stoppingToken); } catch (Exception ex) { @@ -236,7 +219,7 @@ namespace PMSWPF.Services } } - private async Task ConnectMqtt(Mqtt mqtt) + private async Task ConnectMqtt(Mqtt mqtt, CancellationToken stoppingToken) { NlogHelper.Info($"开始连接:{mqtt.Name}的服务器..."); mqtt.ConnectMessage = "开始连接服务器..."; @@ -258,10 +241,10 @@ namespace PMSWPF.Services client.UseApplicationMessageReceivedHandler(e => { HandleMessageReceived(e, mqtt); }); // 设置断开连接事件处理程序。 - client.UseDisconnectedHandler(async (e) => await HandleDisconnected(e, options, client, mqtt)); + client.UseDisconnectedHandler(async (e) => await HandleDisconnected(e, options, client, mqtt, stoppingToken)); // 尝试连接到MQTT代理。 - await client.ConnectAsync(options, CancellationToken.None); + await client.ConnectAsync(options, stoppingToken); // 将连接成功的客户端添加到字典。 _mqttClients[mqtt.Id] = client; @@ -278,13 +261,13 @@ namespace PMSWPF.Services private async Task HandleDisconnected(MqttClientDisconnectedEventArgs args, IMqttClientOptions options, IMqttClient client, - Mqtt mqtt) + Mqtt mqtt, CancellationToken stoppingToken) { NotificationHelper.ShowWarn($"与MQTT服务器断开连接: {mqtt.Name}"); mqtt.ConnectMessage = "断开连接."; mqtt.IsConnected = false; // 服务停止 - if (_stopEvent.WaitOne(0) || !mqtt.IsActive) + if (stoppingToken.IsCancellationRequested || !mqtt.IsActive) return; // 增加重连尝试次数 @@ -309,11 +292,11 @@ namespace PMSWPF.Services $"与MQTT服务器:{mqtt.Name} 的连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接..."); mqtt.ConnectMessage = $"连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接..."; - await Task.Delay(delay); + await Task.Delay(delay, stoppingToken); try { mqtt.ConnectMessage = $"开始重新连接服务器..."; - await client.ConnectAsync(options, CancellationToken.None); + await client.ConnectAsync(options, stoppingToken); } catch (Exception ex) { @@ -346,11 +329,11 @@ namespace PMSWPF.Services /// /// 事件发送者。 /// 更新后的MQTT配置列表。 - private async void HandleMqttListChanged(List mqtts) + private void HandleMqttListChanged(List mqtts) { NlogHelper.Info("Mqtt列表发生了变化,正在重新加载数据..."); // 记录MQTT列表变化信息 // 重新加载MQTT配置和变量数据。 - _reloadEvent.Set(); + _reloadSemaphore.Release(); } } -} \ No newline at end of file +}