Files
DMS/DMS.Infrastructure/Services/Mqtt/MqttBackgroundService.cs
David P.G b85ffdc21b 1 feat: 重构数据加载完成事件的处理机制
2
    3 - 从IDataLoaderService接口中移除OnLoadDataCompleted事件
    4 - 在IEventService接口中新增OnLoadDataCompleted事件和RaiseLoadDataCompleted方法
    5 - 在EventService实现类中实现数据加载完成事件的触发功能
    6 - 修改DataLoaderService不再直接触发事件,而是通过IEventService来触发
    7 - 更新MQTT、OPC UA和S7后台服务以订阅事件服务中的数据加载完成事件
    8 - 修改数据加载完成事件的监听方式,统一使用事件服务进行管理
    9
   10 此重构改进了事件处理的架构设计,使事件管理更加集中和一致。
2025-10-05 11:21:05 +08:00

264 lines
10 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Collections.Concurrent;
using DMS.Application.Events;
using DMS.Application.Interfaces;
using DMS.Core.Interfaces.Services;
using DMS.Core.Models;
using DMS.Infrastructure.Interfaces.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace DMS.Infrastructure.Services.Mqtt
{
/// <summary>
/// Hosted background service that keeps the set of configured MQTT servers
/// loaded and connected. A reload is requested by releasing
/// <c>_reloadSemaphore</c>; the loop in <see cref="ExecuteAsync"/> consumes
/// those requests one batch at a time.
/// </summary>
public class MqttBackgroundService : BackgroundService, IMqttBackgroundService
{
    private readonly ILogger<MqttBackgroundService> _logger;
    private readonly IMqttServiceManager _mqttServiceManager;
    private readonly IEventService _eventService;
    private readonly IAppDataStorageService _appDataStorageService;
    private readonly IAppDataCenterService _appDataCenterService;

    // Local cache of server definitions, keyed by server id.
    private readonly ConcurrentDictionary<int, MqttServer> _mqttServers;

    // Starts at zero so ExecuteAsync idles until the first reload request arrives.
    private readonly SemaphoreSlim _reloadSemaphore = new(0);

    /// <summary>
    /// Creates the service and subscribes to the data-load-completed event.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
    public MqttBackgroundService(
        ILogger<MqttBackgroundService> logger,
        IMqttServiceManager mqttServiceManager,
        IEventService eventService,
        IAppDataStorageService appDataStorageService,
        IAppDataCenterService appDataCenterService)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _mqttServiceManager = mqttServiceManager ?? throw new ArgumentNullException(nameof(mqttServiceManager));
        // Validate before subscribing below; otherwise a null argument surfaces
        // as a NullReferenceException with no parameter name.
        _eventService = eventService ?? throw new ArgumentNullException(nameof(eventService));
        _appDataStorageService = appDataStorageService ?? throw new ArgumentNullException(nameof(appDataStorageService));
        _appDataCenterService = appDataCenterService ?? throw new ArgumentNullException(nameof(appDataCenterService));
        _mqttServers = new ConcurrentDictionary<int, MqttServer>();

        _eventService.OnLoadDataCompleted += OnLoadDataCompleted;
    }

    /// <summary>
    /// Event handler: requests a (re)load once application data has been
    /// loaded successfully. Unsuccessful loads are ignored.
    /// </summary>
    private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
    {
        if (e.IsSuccess)
        {
            Start();
        }
    }

    /// <summary>
    /// Signals the background loop to load the MQTT configuration and connect.
    /// </summary>
    private void Start()
    {
        _reloadSemaphore.Release();
        _logger.LogInformation("MQTT后台服务启动请求已发送");
    }

    /// <summary>
    /// Stops the background loop. Overrides the base implementation (rather
    /// than hiding it) so the stop request actually cancels
    /// <see cref="ExecuteAsync"/> regardless of which reference type the
    /// caller holds.
    /// </summary>
    /// <param name="cancellationToken">Limits how long the shutdown may take.</param>
    public override async Task StopAsync(CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("MQTT后台服务停止请求已发送");
        await base.StopAsync(cancellationToken);
    }

    /// <summary>
    /// Adds (or replaces) a server in the local cache, registers it with the
    /// MQTT service manager and requests a reload.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is <c>null</c>.</exception>
    public void AddMqttServer(MqttServer mqttServer)
    {
        if (mqttServer == null)
        {
            throw new ArgumentNullException(nameof(mqttServer));
        }

        _mqttServers.AddOrUpdate(mqttServer.Id, mqttServer, (_, _) => mqttServer);
        _mqttServiceManager.AddMqttServer(mqttServer);
        _reloadSemaphore.Release();
        _logger.LogInformation("已添加MQTT服务器 {ServerName} 到监控列表", mqttServer.ServerName);
    }

    /// <summary>
    /// Removes a server from the local cache and, if it was present, from the
    /// MQTT service manager. Unknown ids are ignored.
    /// </summary>
    public async Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
    {
        if (!_mqttServers.TryRemove(mqttServerId, out var removed))
        {
            return;
        }

        await _mqttServiceManager.RemoveMqttServerAsync(mqttServerId, cancellationToken);
        _logger.LogInformation("已移除MQTT服务器 {ServerName} 的监控", removed?.ServerName ?? mqttServerId.ToString());
    }

    /// <summary>
    /// Replaces a server in the local cache and requests a reload.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is <c>null</c>.</exception>
    public void UpdateMqttServer(MqttServer mqttServer)
    {
        if (mqttServer == null)
        {
            throw new ArgumentNullException(nameof(mqttServer));
        }

        _mqttServers.AddOrUpdate(mqttServer.Id, mqttServer, (_, _) => mqttServer);
        _reloadSemaphore.Release();
        _logger.LogInformation("已更新MQTT服务器 {ServerName} 的配置", mqttServer.ServerName);
    }

    /// <summary>
    /// Returns a snapshot of the servers currently held in the local cache.
    /// </summary>
    public IEnumerable<MqttServer> GetAllMqttServers()
    {
        return _mqttServers.Values.ToList();
    }

    /// <summary>
    /// Publishes a single variable's data by delegating to the MQTT service manager.
    /// </summary>
    public async Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default)
    {
        await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken);
    }

    /// <summary>
    /// Publishes a batch of variable data by delegating to the MQTT service manager.
    /// </summary>
    public async Task PublishVariablesDataAsync(List<VariableMqtt> variableMqtts, CancellationToken cancellationToken = default)
    {
        await _mqttServiceManager.PublishVariablesDataAsync(variableMqtts, cancellationToken);
    }

    /// <summary>
    /// Main loop: waits for a reload request, loads the configuration,
    /// connects the servers, then waits for the next request.
    /// </summary>
    /// <param name="stoppingToken">Cancelled when the host shuts the service down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("MQTT后台服务正在启动");
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Waiting on the semaphore directly keeps the service idle
                // between reloads; no polling of CurrentCount is needed.
                await _reloadSemaphore.WaitAsync(stoppingToken);

                // Collapse requests that queued up meanwhile: one reload reads
                // the full current state, so extra permits would only repeat it.
                while (_reloadSemaphore.Wait(0))
                {
                }

                if (stoppingToken.IsCancellationRequested)
                {
                    break;
                }

                // Load the MQTT configuration.
                if (!LoadMqttConfigurations())
                {
                    _logger.LogInformation("加载MQTT配置过程中发生了错误停止后面的操作");
                    continue;
                }

                // Connect the MQTT servers. A connection failure must not end
                // the service for good; log it and wait for the next request.
                try
                {
                    await ConnectMqttServersAsync(stoppingToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    _logger.LogError(ex, "连接MQTT服务器时发生错误: {Message}", ex.Message);
                    continue;
                }

                _logger.LogInformation("MQTT后台服务已启动");
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("MQTT后台服务正在停止");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {Message}", ex.Message);
        }
        finally
        {
            _logger.LogInformation("MQTT后台服务已停止");
        }
    }

    /// <summary>
    /// Rebuilds the local cache from the active servers in the application
    /// data storage and registers each one with the MQTT service manager.
    /// </summary>
    /// <returns><c>true</c> on success; <c>false</c> if an exception occurred (it is logged).</returns>
    private bool LoadMqttConfigurations()
    {
        try
        {
            _logger.LogInformation("开始加载MQTT配置...");

            // Take the snapshot before clearing, so a failure while reading
            // storage does not leave the cache empty.
            var activeServers = _appDataStorageService.MqttServers.Values
                .Where(m => m.IsActive)
                .ToList();

            _mqttServers.Clear();

            foreach (var source in activeServers)
            {
                // Copy the stored entry into a fresh MqttServer instance.
                var mqttServer = new MqttServer
                {
                    Id = source.Id,
                    ServerName = source.ServerName,
                    ServerUrl = source.ServerUrl,
                    Port = source.Port,
                    Username = source.Username,
                    Password = source.Password,
                    IsActive = source.IsActive,
                    SubscribeTopic = source.SubscribeTopic,
                    PublishTopic = source.PublishTopic,
                    ClientId = source.ClientId,
                    CreatedAt = source.CreatedAt,
                    ConnectedAt = source.ConnectedAt,
                    ConnectionDuration = source.ConnectionDuration,
                    MessageFormat = source.MessageFormat
                };

                _mqttServers.TryAdd(mqttServer.Id, mqttServer);
                _mqttServiceManager.AddMqttServer(mqttServer);
            }

            _logger.LogInformation("成功加载 {Count} 个MQTT配置", activeServers.Count);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "加载MQTT配置时发生错误: {Message}", ex.Message);
            return false;
        }
    }

    /// <summary>
    /// Connects every active server in the local cache concurrently and
    /// completes when all connection attempts have finished.
    /// </summary>
    private async Task ConnectMqttServersAsync(CancellationToken stoppingToken)
    {
        var connectTasks = _mqttServers.Values
            .Where(m => m.IsActive)
            .Select(mqtt => _mqttServiceManager.ConnectMqttServerAsync(mqtt.Id, stoppingToken));

        await Task.WhenAll(connectTasks);
    }

    /// <summary>
    /// Requests a reload after the MQTT server list changed. The list argument
    /// itself is not used. Nothing in this class calls this method.
    /// </summary>
    private void HandleMqttListChanged(List<MqttServer> mqtts)
    {
        _logger.LogInformation("MQTT列表发生了变化正在重新加载数据...");
        _reloadSemaphore.Release();
    }

    /// <summary>
    /// Unsubscribes from the event service and releases owned resources.
    /// </summary>
    public override void Dispose()
    {
        _logger.LogInformation("正在释放MQTT后台服务资源...");
        _eventService.OnLoadDataCompleted -= OnLoadDataCompleted;

        // Let the base class cancel the stopping token first, so the loop is
        // told to exit before the semaphore it waits on is torn down.
        base.Dispose();
        _reloadSemaphore.Dispose();

        _logger.LogInformation("MQTT后台服务资源已释放");
    }
}
}