Files
DMS/DMS.Infrastructure/Services/MqttBackgroundService.cs
David P.G 1f9c0a1111 实现 MQTT 变量数据发布功能

- 迁移 IMqttServiceManager 接口到 DMS.Core
- 在 DataCenterService 中添加 MQTT 服务器和变量别名的加载逻辑
- 实现 MqttPublishProcessor 的核心处理逻辑
- 为 DTO 和 ViewModel 的 MqttAliases 属性提供默认空列表初始化
- 更新 AutoMapper 映射配置以支持 VariableMqttAliasDto
2025-09-07 08:51:18 +08:00

263 lines
9.7 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using DMS.Infrastructure.Interfaces.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DMS.Application.DTOs.Events;
using DMS.Core.Models;
using DMS.Application.Interfaces;
using DMS.Core.Interfaces.Services;
namespace DMS.Infrastructure.Services
{
    /// <summary>
    /// Hosted background service that keeps a local registry of MQTT server
    /// configurations, registers them with the <see cref="IMqttServiceManager"/>,
    /// and (re)connects them whenever a reload is requested.
    /// </summary>
    /// <remarks>
    /// A reload is requested by releasing <c>_reloadSemaphore</c>; the loop in
    /// <see cref="ExecuteAsync"/> waits on that semaphore, reloads the configuration
    /// from the data center service and connects the active servers.
    /// </remarks>
    public class MqttBackgroundService : BackgroundService, IMqttBackgroundService
    {
        private readonly ILogger<MqttBackgroundService> _logger;
        private readonly IMqttServiceManager _mqttServiceManager;
        private readonly IDataCenterService _dataCenterService;

        // MQTT server configurations known to this service, keyed by server id.
        private readonly ConcurrentDictionary<int, MqttServer> _mqttServers;

        // Every Release() is one pending "reload configuration" request. The initial
        // count is 0, so ExecuteAsync idles until the first request arrives.
        private readonly SemaphoreSlim _reloadSemaphore = new(0);

        /// <summary>
        /// Creates the service and subscribes to the data center's load-completed event.
        /// </summary>
        /// <param name="logger">Logger used for diagnostics.</param>
        /// <param name="mqttServiceManager">Manager that owns the MQTT connections.</param>
        /// <param name="dataCenterService">Source of the MQTT server configuration.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public MqttBackgroundService(
            ILogger<MqttBackgroundService> logger,
            IMqttServiceManager mqttServiceManager,
            IDataCenterService dataCenterService)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _mqttServiceManager = mqttServiceManager ?? throw new ArgumentNullException(nameof(mqttServiceManager));
            _dataCenterService = dataCenterService ?? throw new ArgumentNullException(nameof(dataCenterService));
            _mqttServers = new ConcurrentDictionary<int, MqttServer>();

            // Unsubscribed again in Dispose().
            _dataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
        }

        /// <summary>
        /// Requests a configuration load once the data center reports a successful data load.
        /// </summary>
        private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
        {
            if (e.IsSuccess)
            {
                Start();
            }
        }

        /// <summary>
        /// Signals the execute loop to load the MQTT configuration and connect the servers.
        /// </summary>
        private void Start()
        {
            _reloadSemaphore.Release();
            _logger.LogInformation("MQTT后台服务启动请求已发送");
        }

        /// <summary>
        /// Stops the MQTT background service.
        /// </summary>
        /// <remarks>
        /// Overrides <see cref="BackgroundService.StopAsync"/> (rather than hiding it) and
        /// delegates to the base implementation so that the stop request really reaches
        /// the execute loop.
        /// </remarks>
        /// <param name="cancellationToken">Token passed through to the base implementation.</param>
        public override async Task StopAsync(CancellationToken cancellationToken = default)
        {
            _logger.LogInformation("MQTT后台服务停止请求已发送");
            await base.StopAsync(cancellationToken);
        }

        /// <summary>
        /// Adds (or replaces) an MQTT server configuration, registers it with the
        /// service manager and requests a reload.
        /// </summary>
        /// <param name="mqttServer">The server configuration to add.</param>
        /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is <c>null</c>.</exception>
        public void AddMqttServer(MqttServer mqttServer)
        {
            if (mqttServer == null)
            {
                throw new ArgumentNullException(nameof(mqttServer));
            }

            _mqttServers[mqttServer.Id] = mqttServer;
            _mqttServiceManager.AddMqttServer(mqttServer);
            _reloadSemaphore.Release();
            _logger.LogInformation("已添加MQTT服务器 {ServerName} 到监控列表", mqttServer.ServerName);
        }

        /// <summary>
        /// Removes an MQTT server configuration. When the id is known locally, the
        /// server is also removed from the service manager.
        /// </summary>
        /// <param name="mqttServerId">Id of the server to remove.</param>
        /// <param name="cancellationToken">Token forwarded to the service manager.</param>
        public async Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
        {
            if (!_mqttServers.TryRemove(mqttServerId, out var mqttServer))
            {
                return;
            }

            await _mqttServiceManager.RemoveMqttServerAsync(mqttServerId, cancellationToken);
            _logger.LogInformation("已移除MQTT服务器 {ServerName} 的监控", mqttServer?.ServerName ?? mqttServerId.ToString());
        }

        /// <summary>
        /// Replaces the stored configuration of an MQTT server and requests a reload.
        /// </summary>
        /// <param name="mqttServer">The updated server configuration.</param>
        /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is <c>null</c>.</exception>
        public void UpdateMqttServer(MqttServer mqttServer)
        {
            if (mqttServer == null)
            {
                throw new ArgumentNullException(nameof(mqttServer));
            }

            _mqttServers[mqttServer.Id] = mqttServer;
            _reloadSemaphore.Release();
            _logger.LogInformation("已更新MQTT服务器 {ServerName} 的配置", mqttServer.ServerName);
        }

        /// <summary>
        /// Returns a snapshot of all MQTT server configurations currently held by this service.
        /// </summary>
        public IEnumerable<MqttServer> GetAllMqttServers()
        {
            return _mqttServers.Values.ToList();
        }

        /// <summary>
        /// Publishes a single variable value through the service manager.
        /// </summary>
        /// <param name="variableMqtt">The variable/MQTT association to publish.</param>
        /// <param name="cancellationToken">Token forwarded to the service manager.</param>
        public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default)
        {
            await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken);
        }

        /// <summary>
        /// Publishes a batch of variable values through the service manager.
        /// </summary>
        /// <param name="variableMqtts">The variable/MQTT associations to publish.</param>
        /// <param name="cancellationToken">Token forwarded to the service manager.</param>
        public async Task PublishVariablesDataAsync(List<VariableMqtt> variableMqtts, CancellationToken cancellationToken = default)
        {
            await _mqttServiceManager.PublishVariablesDataAsync(variableMqtts, cancellationToken);
        }

        /// <summary>
        /// Core loop of the background service: waits for a reload request, loads the
        /// MQTT configuration and connects the active servers, then waits again.
        /// </summary>
        /// <param name="stoppingToken">Signalled when the service is asked to stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("MQTT后台服务正在启动");
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Blocks until someone requests a (re)load; no polling required.
                    await _reloadSemaphore.WaitAsync(stoppingToken);
                    if (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }

                    // Coalesce a burst of requests: one reload picks up every change made
                    // so far, so the permits that are already queued are consumed here.
                    // Wait(0) never blocks.
                    while (_reloadSemaphore.Wait(0))
                    {
                    }

                    // Load the MQTT configuration.
                    if (!LoadMqttConfigurations())
                    {
                        _logger.LogInformation("加载MQTT配置过程中发生了错误停止后面的操作");
                        continue;
                    }

                    try
                    {
                        // Connect the MQTT servers.
                        await ConnectMqttServersAsync(stoppingToken);
                        _logger.LogInformation("MQTT后台服务已启动");
                    }
                    catch (Exception ex) when (!(ex is OperationCanceledException && stoppingToken.IsCancellationRequested))
                    {
                        // A failed connection attempt must not terminate the service:
                        // log it and keep waiting for the next reload request.
                        // Cancellation caused by shutdown is left to the outer handler.
                        _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {Message}", ex.Message);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("MQTT后台服务正在停止");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {Message}", ex.Message);
            }
            finally
            {
                _logger.LogInformation("MQTT后台服务已停止");
            }
        }

        /// <summary>
        /// Rebuilds the local registry from the active MQTT servers exposed by the data
        /// center service and registers each of them with the service manager.
        /// </summary>
        /// <returns><c>true</c> on success; <c>false</c> if an exception occurred (it is logged).</returns>
        private bool LoadMqttConfigurations()
        {
            try
            {
                _logger.LogInformation("开始加载MQTT配置...");
                _mqttServers.Clear();

                // Fetch every active MQTT server from the data center service.
                var mqttServerDtos = _dataCenterService.MqttServers.Values
                    .Where(m => m.IsActive)
                    .ToList();

                foreach (var mqttServerDto in mqttServerDtos)
                {
                    // Convert the data center's server entry into a MqttServer model.
                    var mqttServer = new MqttServer
                    {
                        Id = mqttServerDto.Id,
                        ServerName = mqttServerDto.ServerName,
                        ServerUrl = mqttServerDto.ServerUrl,
                        Port = mqttServerDto.Port,
                        Username = mqttServerDto.Username,
                        Password = mqttServerDto.Password,
                        IsActive = mqttServerDto.IsActive,
                        SubscribeTopic = mqttServerDto.SubscribeTopic,
                        PublishTopic = mqttServerDto.PublishTopic,
                        ClientId = mqttServerDto.ClientId,
                        CreatedAt = mqttServerDto.CreatedAt,
                        ConnectedAt = mqttServerDto.ConnectedAt,
                        ConnectionDuration = mqttServerDto.ConnectionDuration,
                        MessageFormat = mqttServerDto.MessageFormat
                    };

                    _mqttServers.TryAdd(mqttServer.Id, mqttServer);
                    _mqttServiceManager.AddMqttServer(mqttServer);
                }

                _logger.LogInformation("成功加载 {Count} 个MQTT配置", mqttServerDtos.Count);
                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "加载MQTT配置时发生错误: {Message}", ex.Message);
                return false;
            }
        }

        /// <summary>
        /// Starts a connection attempt for every active server in the local registry
        /// and waits for all attempts to finish.
        /// </summary>
        /// <param name="stoppingToken">Token forwarded to each connection attempt.</param>
        private async Task ConnectMqttServersAsync(CancellationToken stoppingToken)
        {
            var connectTasks = _mqttServers.Values
                .Where(m => m.IsActive)
                .Select(mqtt => _mqttServiceManager.ConnectMqttServerAsync(mqtt.Id, stoppingToken));

            await Task.WhenAll(connectTasks);
        }

        /// <summary>
        /// Handles a change of the MQTT server list by requesting a reload.
        /// </summary>
        /// <param name="mqtts">The changed list; not inspected, the reload re-reads the data center.</param>
        private void HandleMqttListChanged(List<MqttServer> mqtts)
        {
            _logger.LogInformation("MQTT列表发生了变化正在重新加载数据...");
            _reloadSemaphore.Release();
        }

        /// <summary>
        /// Releases the resources held by the service.
        /// </summary>
        public override void Dispose()
        {
            _logger.LogInformation("正在释放MQTT后台服务资源...");
            _dataCenterService.OnLoadDataCompleted -= OnLoadDataCompleted;

            // Let the base class cancel the execute loop before the semaphore that
            // loop waits on is disposed.
            base.Dispose();
            _reloadSemaphore.Dispose();

            _logger.LogInformation("MQTT后台服务资源已释放");
        }
    }
}