将MQTT的后台服务改为继承BackgroundService

This commit is contained in:
2025-07-17 20:28:20 +08:00
parent 018fe7c9d0
commit 1c14fbd3d6
2 changed files with 78 additions and 95 deletions

View File

@@ -77,11 +77,11 @@ public partial class App : Application
MainWindow.Show(); MainWindow.Show();
// 根据配置启动服务 // 根据配置启动服务
var connectionSettings = PMSWPF.Config.ConnectionSettings.Load(); // var connectionSettings = PMSWPF.Config.ConnectionSettings.Load();
if (connectionSettings.EnableMqttService) // if (connectionSettings.EnableMqttService)
{ // {
Host.Services.GetRequiredService<MqttBackgroundService>().StartService(); // Host.Services.GetRequiredService<MqttBackgroundService>().StartService();
} // }
// if (connectionSettings.EnableOpcUaService) // if (connectionSettings.EnableOpcUaService)
// { // {
// Host.Services.GetRequiredService<OpcUaBackgroundService>().StartService(); // Host.Services.GetRequiredService<OpcUaBackgroundService>().StartService();
@@ -105,7 +105,7 @@ public partial class App : Application
services.AddSingleton<GrowlNotificationService>(); services.AddSingleton<GrowlNotificationService>();
services.AddHostedService<S7BackgroundService>(); services.AddHostedService<S7BackgroundService>();
services.AddHostedService<OpcUaBackgroundService>(); services.AddHostedService<OpcUaBackgroundService>();
services.AddSingleton<MqttBackgroundService>(); services.AddHostedService<MqttBackgroundService>();
services.AddSingleton<OpcUaBackgroundService>(); services.AddSingleton<OpcUaBackgroundService>();
// 注册 AutoMapper // 注册 AutoMapper

View File

@@ -1,4 +1,6 @@
using System.Collections.Concurrent;
using System.Text; using System.Text;
using Microsoft.Extensions.Hosting;
using MQTTnet; using MQTTnet;
using MQTTnet.Client; using MQTTnet.Client;
using MQTTnet.Client.Connecting; using MQTTnet.Client.Connecting;
@@ -13,28 +15,23 @@ namespace PMSWPF.Services
/// <summary> /// <summary>
/// MQTT后台服务继承自BackgroundService用于在后台管理MQTT连接和数据发布。 /// MQTT后台服务继承自BackgroundService用于在后台管理MQTT连接和数据发布。
/// </summary> /// </summary>
public class MqttBackgroundService public class MqttBackgroundService : BackgroundService
{ {
// 数据服务实例用于访问和操作应用程序数据如MQTT配置和变量数据。 // 数据服务实例用于访问和操作应用程序数据如MQTT配置和变量数据。
private readonly DataServices _dataServices; private readonly DataServices _dataServices;
private readonly MqttRepository _mqttRepository; private readonly MqttRepository _mqttRepository;
// 存储MQTT客户端实例的字典键为MQTT配置ID值为IMqttClient对象。 // 存储MQTT客户端实例的字典键为MQTT配置ID值为IMqttClient对象。
private readonly Dictionary<int, IMqttClient> _mqttClients; private readonly ConcurrentDictionary<int, IMqttClient> _mqttClients;
// 存储MQTT配置的字典键为MQTT配置ID值为Mqtt模型对象。 // 存储MQTT配置的字典键为MQTT配置ID值为Mqtt模型对象。
private readonly Dictionary<int, Mqtt> _mqttConfigDic; private readonly ConcurrentDictionary<int, Mqtt> _mqttConfigDic;
// 存储每个客户端重连尝试次数的字典 // 存储每个客户端重连尝试次数的字典
private readonly Dictionary<int, int> _reconnectAttempts; private readonly ConcurrentDictionary<int, int> _reconnectAttempts;
private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0);
// 定时器,用于周期性地执行数据发布任务。
private Timer _timer;
private Thread _serviceMainThread;
private ManualResetEvent _reloadEvent = new ManualResetEvent(false);
private ManualResetEvent _stopEvent = new ManualResetEvent(false);
/// <summary> /// <summary>
/// 构造函数注入DataServices。 /// 构造函数注入DataServices。
@@ -44,81 +41,67 @@ namespace PMSWPF.Services
{ {
_dataServices = dataServices; _dataServices = dataServices;
_mqttRepository = mqttRepository; _mqttRepository = mqttRepository;
_mqttClients = new Dictionary<int, IMqttClient>(); _mqttClients = new ConcurrentDictionary<int, IMqttClient>();
_mqttConfigDic = new Dictionary<int, Mqtt>(); _mqttConfigDic = new ConcurrentDictionary<int, Mqtt>();
_reconnectAttempts = new Dictionary<int, int>(); _reconnectAttempts = new ConcurrentDictionary<int, int>();
}
/// <summary>
/// 启动MQTT后台服务。
/// </summary>
public async void StartService()
{
// 订阅MQTT列表和变量数据变化的事件以便在数据更新时重新加载配置和数据。
_dataServices.OnMqttListChanged += HandleMqttListChanged; _dataServices.OnMqttListChanged += HandleMqttListChanged;
NlogHelper.Info("Mqtt后台服务启动"); // 记录服务启动信息
_reloadEvent.Set();
_stopEvent.Reset();
_serviceMainThread = new Thread(Execute);
_serviceMainThread.IsBackground = true;
_serviceMainThread.Name = "MqttServiceMainThread";
_serviceMainThread.Start();
} }
private async void Execute() protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
while (!_stopEvent.WaitOne(0)) NlogHelper.Info("Mqtt后台服务正在启动。");
{ _reloadSemaphore.Release(); // Initial trigger to load variables and connect
if (_dataServices.Mqtts == null || _dataServices.Mqtts.Count == 0)
{
_reloadEvent.Reset();
continue;
}
_reloadEvent.WaitOne();
try try
{ {
// 初始加载MQTT配置和变量数据。 while (!stoppingToken.IsCancellationRequested)
var isLoaded = LoadMqttConfigurations();
if (isLoaded)
{ {
await ConnectMqttList(); await _reloadSemaphore.WaitAsync(stoppingToken); // Wait for a reload signal
if (stoppingToken.IsCancellationRequested)
{
break;
} }
if (_dataServices.Mqtts == null || _dataServices.Mqtts.Count == 0)
{
NlogHelper.Info("没有可用的Mqtt配置等待Mqtt列表更新...");
continue;
}
var isLoaded = LoadMqttConfigurations();
if (!isLoaded)
{
NlogHelper.Info("加载Mqtt配置过程中发生了错误停止后面的操作。");
continue;
}
await ConnectMqttList(stoppingToken);
NlogHelper.Info("Mqtt后台服务已启动。");
// while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
// {
// await Task.Delay(1000, stoppingToken);
// }
}
}
catch (OperationCanceledException)
{
NlogHelper.Info("Mqtt后台服务已停止。");
} }
catch (Exception e) catch (Exception e)
{ {
NotificationHelper.ShowError($"Mqt后台服务主程序在加载变量和连接服务器过程中出现了错误{e.Message} ", e); NlogHelper.Error($"Mqtt后台服务运行中发生了错误:{e.Message}", e);
} }
finally finally
{ {
_reloadEvent.Reset(); await DisconnectAll(stoppingToken);
}
}
}
/// <summary>
/// 停止MQTT后台服务。
/// </summary>
public async Task StopService()
{
NlogHelper.Info("Mqtt后台服务开始停止...."); // 记录服务停止信息
_stopEvent.Set();
// 取消订阅事件。
_dataServices.OnMqttListChanged -= HandleMqttListChanged; _dataServices.OnMqttListChanged -= HandleMqttListChanged;
}
await DisconnecetAll();
// 清空所有字典。
_mqttClients.Clear();
_mqttConfigDic.Clear();
_reconnectAttempts.Clear();
NlogHelper.Info("Mqtt后台服务已停止。"); // 记录服务停止信息
} }
private async Task DisconnecetAll() private async Task DisconnectAll(CancellationToken stoppingToken)
{ {
// 断开所有已连接的MQTT客户端。 // 断开所有已连接的MQTT客户端。
foreach (var mqttId in _mqttClients.Keys.ToList()) foreach (var mqttId in _mqttClients.Keys.ToList())
@@ -130,7 +113,7 @@ namespace PMSWPF.Services
mqtt.IsConnected = false; mqtt.IsConnected = false;
if (client.IsConnected) if (client.IsConnected)
{ {
await client.DisconnectAsync(); await client.DisconnectAsync(new MqttClientDisconnectOptions(), stoppingToken);
} }
} }
catch (Exception e) catch (Exception e)
@@ -177,7 +160,7 @@ namespace PMSWPF.Services
{ {
if (mqtt.IsActive) if (mqtt.IsActive)
{ {
await ConnectMqtt(mqtt); _reloadSemaphore.Release();
} }
else else
{ {
@@ -195,7 +178,7 @@ namespace PMSWPF.Services
mqtt.IsConnected = false; mqtt.IsConnected = false;
mqtt.ConnectMessage = "断开连接."; mqtt.ConnectMessage = "断开连接.";
_mqttClients.Remove(mqtt.Id); _mqttClients.TryRemove(mqtt.Id, out _);
NlogHelper.Info($"{mqtt.Name}的客户端,与服务器断开连接."); NlogHelper.Info($"{mqtt.Name}的客户端,与服务器断开连接.");
} }
@@ -213,7 +196,7 @@ namespace PMSWPF.Services
/// </summary> /// </summary>
/// <param name="mqtt">MQTT配置对象。</param> /// <param name="mqtt">MQTT配置对象。</param>
/// <returns>表示异步操作的任务。</returns> /// <returns>表示异步操作的任务。</returns>
private async Task ConnectMqttList() private async Task ConnectMqttList(CancellationToken stoppingToken)
{ {
foreach (Mqtt mqtt in _mqttConfigDic.Values.ToList()) foreach (Mqtt mqtt in _mqttConfigDic.Values.ToList())
{ {
@@ -227,7 +210,7 @@ namespace PMSWPF.Services
continue; continue;
} }
await ConnectMqtt(mqtt); await ConnectMqtt(mqtt, stoppingToken);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -236,7 +219,7 @@ namespace PMSWPF.Services
} }
} }
private async Task ConnectMqtt(Mqtt mqtt) private async Task ConnectMqtt(Mqtt mqtt, CancellationToken stoppingToken)
{ {
NlogHelper.Info($"开始连接:{mqtt.Name}的服务器..."); NlogHelper.Info($"开始连接:{mqtt.Name}的服务器...");
mqtt.ConnectMessage = "开始连接服务器..."; mqtt.ConnectMessage = "开始连接服务器...";
@@ -258,10 +241,10 @@ namespace PMSWPF.Services
client.UseApplicationMessageReceivedHandler(e => { HandleMessageReceived(e, mqtt); }); client.UseApplicationMessageReceivedHandler(e => { HandleMessageReceived(e, mqtt); });
// 设置断开连接事件处理程序。 // 设置断开连接事件处理程序。
client.UseDisconnectedHandler(async (e) => await HandleDisconnected(e, options, client, mqtt)); client.UseDisconnectedHandler(async (e) => await HandleDisconnected(e, options, client, mqtt, stoppingToken));
// 尝试连接到MQTT代理。 // 尝试连接到MQTT代理。
await client.ConnectAsync(options, CancellationToken.None); await client.ConnectAsync(options, stoppingToken);
// 将连接成功的客户端添加到字典。 // 将连接成功的客户端添加到字典。
_mqttClients[mqtt.Id] = client; _mqttClients[mqtt.Id] = client;
@@ -278,13 +261,13 @@ namespace PMSWPF.Services
private async Task HandleDisconnected(MqttClientDisconnectedEventArgs args, IMqttClientOptions options, private async Task HandleDisconnected(MqttClientDisconnectedEventArgs args, IMqttClientOptions options,
IMqttClient client, IMqttClient client,
Mqtt mqtt) Mqtt mqtt, CancellationToken stoppingToken)
{ {
NotificationHelper.ShowWarn($"与MQTT服务器断开连接: {mqtt.Name}"); NotificationHelper.ShowWarn($"与MQTT服务器断开连接: {mqtt.Name}");
mqtt.ConnectMessage = "断开连接."; mqtt.ConnectMessage = "断开连接.";
mqtt.IsConnected = false; mqtt.IsConnected = false;
// 服务停止 // 服务停止
if (_stopEvent.WaitOne(0) || !mqtt.IsActive) if (stoppingToken.IsCancellationRequested || !mqtt.IsActive)
return; return;
// 增加重连尝试次数 // 增加重连尝试次数
@@ -309,11 +292,11 @@ namespace PMSWPF.Services
$"与MQTT服务器{mqtt.Name} 的连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接..."); $"与MQTT服务器{mqtt.Name} 的连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接...");
mqtt.ConnectMessage = $"连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接..."; mqtt.ConnectMessage = $"连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {_reconnectAttempts[mqtt.Id]} 次重新连接...";
await Task.Delay(delay); await Task.Delay(delay, stoppingToken);
try try
{ {
mqtt.ConnectMessage = $"开始重新连接服务器..."; mqtt.ConnectMessage = $"开始重新连接服务器...";
await client.ConnectAsync(options, CancellationToken.None); await client.ConnectAsync(options, stoppingToken);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -346,11 +329,11 @@ namespace PMSWPF.Services
/// </summary> /// </summary>
/// <param name="sender">事件发送者。</param> /// <param name="sender">事件发送者。</param>
/// <param name="mqtts">更新后的MQTT配置列表。</param> /// <param name="mqtts">更新后的MQTT配置列表。</param>
private async void HandleMqttListChanged(List<Mqtt> mqtts) private void HandleMqttListChanged(List<Mqtt> mqtts)
{ {
NlogHelper.Info("Mqtt列表发生了变化正在重新加载数据..."); // 记录MQTT列表变化信息 NlogHelper.Info("Mqtt列表发生了变化正在重新加载数据..."); // 记录MQTT列表变化信息
// 重新加载MQTT配置和变量数据。 // 重新加载MQTT配置和变量数据。
_reloadEvent.Set(); _reloadSemaphore.Release();
} }
} }
} }