Files
DMS/DMS.Infrastructure/Services/MqttBackgroundService.cs

266 lines
10 KiB
C#
Raw Normal View History

2025-09-06 15:19:04 +08:00
using DMS.Infrastructure.Interfaces.Services;
2025-07-26 10:05:43 +08:00
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
2025-09-06 15:19:04 +08:00
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DMS.Application.DTOs.Events;
using DMS.Core.Models;
2025-07-26 10:05:43 +08:00
using DMS.Application.Interfaces;
using DMS.Core.Interfaces.Services;
2025-07-26 10:05:43 +08:00
2025-09-06 15:19:04 +08:00
namespace DMS.Infrastructure.Services
2025-07-26 10:05:43 +08:00
{
/// <summary>
/// MQTT background service responsible for managing MQTT connections and data transmission.
/// </summary>
2025-09-06 15:19:04 +08:00
public class MqttBackgroundService : BackgroundService, IMqttBackgroundService
2025-07-26 10:05:43 +08:00
{
2025-09-06 15:19:04 +08:00
private readonly ILogger<MqttBackgroundService> _logger;
private readonly IMqttServiceManager _mqttServiceManager;
private readonly IAppDataStorageService _appDataStorageService;
private readonly IAppDataCenterService _appDataCenterService;

// MQTT server configurations tracked by this service, keyed by MqttServer.Id.
private readonly ConcurrentDictionary<int, MqttServer> _mqttServers;

// Released to ask ExecuteAsync to (re)load the configuration; starts at 0 so the
// loop stays idle until the first request arrives.
private readonly SemaphoreSlim _reloadSemaphore = new(0);

/// <summary>
/// Creates the service and subscribes to the data loader's load-completed event.
/// </summary>
/// <param name="logger">Logger used for service messages.</param>
/// <param name="mqttServiceManager">Manager that the MQTT server operations are delegated to.</param>
/// <param name="appDataStorageService">Storage service the MQTT server configurations are read from.</param>
/// <param name="appDataCenterService">Data center service whose data loader raises the load-completed event.</param>
/// <exception cref="ArgumentNullException">Thrown when any dependency is <c>null</c>.</exception>
public MqttBackgroundService(
    ILogger<MqttBackgroundService> logger,
    IMqttServiceManager mqttServiceManager,
    IAppDataStorageService appDataStorageService,
    IAppDataCenterService appDataCenterService)
{
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _mqttServiceManager = mqttServiceManager ?? throw new ArgumentNullException(nameof(mqttServiceManager));
    // Guarded like the other dependencies: LoadMqttConfigurations dereferences this service.
    _appDataStorageService = appDataStorageService ?? throw new ArgumentNullException(nameof(appDataStorageService));
    _appDataCenterService = appDataCenterService ?? throw new ArgumentNullException(nameof(appDataCenterService));

    _mqttServers = new ConcurrentDictionary<int, MqttServer>();

    _appDataCenterService.DataLoaderService.OnLoadDataCompleted += OnLoadDataCompleted;
}
2025-07-26 10:05:43 +08:00
2025-09-06 15:19:04 +08:00
/// <summary>
/// Handles the data loader's load-completed event; a successful load requests a service start.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Event data carrying the success flag.</param>
private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
{
    // Nothing to do when the load failed.
    if (!e.IsSuccess)
    {
        return;
    }

    Start();
}
2025-07-26 10:05:43 +08:00
2025-09-06 15:19:04 +08:00
/// <summary>
/// Requests a start of the MQTT background service by releasing the reload semaphore
/// that the loop in <see cref="ExecuteAsync"/> waits on.
/// </summary>
/// <param name="cancellationToken">Not used by this method.</param>
private void Start(CancellationToken cancellationToken = default)
{
    // Signal first, then record that the request was sent.
    _reloadSemaphore.Release();

    _logger.LogInformation("MQTT后台服务启动请求已发送");
}
2025-07-26 10:05:43 +08:00
2025-09-06 15:19:04 +08:00
/// <summary>
/// Stops the MQTT background service.
/// </summary>
/// <remarks>
/// Declared as an override so that it no longer hides the base class's
/// <c>StopAsync</c>; the base implementation is what actually signals the
/// stopping token and waits for <see cref="ExecuteAsync"/> to finish.
/// </remarks>
/// <param name="cancellationToken">Token that limits how long the stop may take.</param>
/// <returns>A task that completes when the base stop logic has finished.</returns>
public override async Task StopAsync(CancellationToken cancellationToken = default)
{
    _logger.LogInformation("MQTT后台服务停止请求已发送");

    await base.StopAsync(cancellationToken);
}
2025-07-26 10:05:43 +08:00
2025-09-06 15:19:04 +08:00
/// <summary>
/// Adds an MQTT server configuration: stores it locally by id, registers it with the
/// service manager, and signals the background loop to reload.
/// </summary>
/// <param name="mqttServer">The server configuration to add.</param>
/// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is <c>null</c>.</exception>
public void AddMqttServer(MqttServer mqttServer)
{
    if (mqttServer is null)
    {
        throw new ArgumentNullException(nameof(mqttServer));
    }

    // Insert the entry, or replace an existing one with the same id.
    _mqttServers[mqttServer.Id] = mqttServer;
    _mqttServiceManager.AddMqttServer(mqttServer);

    _reloadSemaphore.Release();
    _logger.LogInformation("已添加MQTT服务器 {ServerName} 到监控列表", mqttServer.ServerName);
}
2025-07-26 10:05:43 +08:00
2025-09-06 15:19:04 +08:00
/// <summary>
/// Removes an MQTT server configuration. When the id is tracked locally, the removal
/// is also forwarded to the service manager; otherwise nothing happens.
/// </summary>
/// <param name="mqttServerId">Id of the server to remove.</param>
/// <param name="cancellationToken">Token passed on to the service manager.</param>
public async Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
{
    var wasTracked = _mqttServers.TryRemove(mqttServerId, out var removedServer);
    if (!wasTracked)
    {
        return;
    }

    await _mqttServiceManager.RemoveMqttServerAsync(mqttServerId, cancellationToken);

    // Fall back to the numeric id when no server name is available.
    var displayName = removedServer?.ServerName ?? mqttServerId.ToString();
    _logger.LogInformation("已移除MQTT服务器 {ServerName} 的监控", displayName);
}
2025-09-06 15:19:04 +08:00
/// <summary>
/// Updates an MQTT server configuration in the local dictionary and signals the
/// background loop to reload.
/// </summary>
/// <param name="mqttServer">The new configuration; stored under its id.</param>
/// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is <c>null</c>.</exception>
public void UpdateMqttServer(MqttServer mqttServer)
{
    if (mqttServer is null)
    {
        throw new ArgumentNullException(nameof(mqttServer));
    }

    // Insert the entry, or replace an existing one with the same id.
    _mqttServers[mqttServer.Id] = mqttServer;

    _reloadSemaphore.Release();
    _logger.LogInformation("已更新MQTT服务器 {ServerName} 的配置", mqttServer.ServerName);
}
2025-09-06 15:19:04 +08:00
/// <summary>
/// Returns all MQTT server configurations currently held by this service.
/// </summary>
/// <returns>A new list copied from the dictionary's values.</returns>
public IEnumerable<MqttServer> GetAllMqttServers()
    => new List<MqttServer>(_mqttServers.Values);
2025-09-06 15:19:04 +08:00
/// <summary>
/// Publishes a single variable's data by delegating to the MQTT service manager.
/// </summary>
/// <param name="variableMqtt">The variable/MQTT alias entry to publish.</param>
/// <param name="cancellationToken">Token passed on to the service manager.</param>
public async Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default)
    => await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken);
2025-09-06 15:19:04 +08:00
/// <summary>
/// Publishes a batch of variable data by delegating to the MQTT service manager.
/// </summary>
/// <param name="variableMqtts">The entries to publish.</param>
/// <param name="cancellationToken">Token passed on to the service manager.</param>
public async Task PublishVariablesDataAsync(List<VariableMqtt> variableMqtts, CancellationToken cancellationToken = default)
    => await _mqttServiceManager.PublishVariablesDataAsync(variableMqtts, cancellationToken);
2025-07-26 10:05:43 +08:00
2025-09-06 15:19:04 +08:00
/// <summary>
/// Core loop of the background service: waits for a reload request, loads the MQTT
/// configurations, and connects the servers, until the host requests shutdown.
/// </summary>
/// <param name="stoppingToken">Signalled when the service should stop.</param>
/// <returns>A task that completes when the loop has exited.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("MQTT后台服务正在启动");

    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Sleep until a (re)load is requested. Waiting on the semaphore directly
            // replaces the former 1-second polling loop and reacts immediately.
            await _reloadSemaphore.WaitAsync(stoppingToken);
            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            // Collapse a burst of pending requests into one reload; each reload
            // rebuilds the whole list, so running it once per request is redundant.
            while (_reloadSemaphore.Wait(0))
            {
            }

            try
            {
                // Load the MQTT configurations.
                if (!LoadMqttConfigurations())
                {
                    _logger.LogInformation("加载MQTT配置过程中发生了错误停止后面的操作");
                    continue;
                }

                // Connect to the MQTT servers.
                await ConnectMqttServersAsync(stoppingToken);
                _logger.LogInformation("MQTT后台服务已启动");
            }
            catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
            {
                // A failed load or connect must not end the service for good: log it
                // and keep waiting so the next reload request can retry. Cancellation
                // caused by shutdown is excluded and handled by the outer catch.
                _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {Message}", ex.Message);
            }
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation("MQTT后台服务正在停止");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {Message}", ex.Message);
    }
    finally
    {
        _logger.LogInformation("MQTT后台服务已停止");
    }
}
2025-07-26 10:05:43 +08:00
2025-09-06 15:19:04 +08:00
/// <summary>
/// Rebuilds the local MQTT server list from the storage service and registers every
/// active server with the service manager.
/// </summary>
/// <returns>
/// <c>true</c> when loading completed; <c>false</c> when an exception occurred
/// (the exception is logged, not rethrown).
/// </returns>
private bool LoadMqttConfigurations()
{
    try
    {
        _logger.LogInformation("开始加载MQTT配置...");
        _mqttServers.Clear();

        // Fetch all active MQTT servers from the storage service.
        var mqttServerDtos = _appDataStorageService.MqttServers.Values
            .Where(m => m.IsActive)
            .ToList();

        foreach (var mqttServerDto in mqttServerDtos)
        {
            // Copy the stored entry into a new MqttServer instance.
            var mqttServer = new MqttServer
            {
                Id = mqttServerDto.Id,
                ServerName = mqttServerDto.ServerName,
                ServerUrl = mqttServerDto.ServerUrl,
                Port = mqttServerDto.Port,
                Username = mqttServerDto.Username,
                Password = mqttServerDto.Password,
                IsActive = mqttServerDto.IsActive,
                SubscribeTopic = mqttServerDto.SubscribeTopic,
                PublishTopic = mqttServerDto.PublishTopic,
                ClientId = mqttServerDto.ClientId,
                CreatedAt = mqttServerDto.CreatedAt,
                ConnectedAt = mqttServerDto.ConnectedAt,
                ConnectionDuration = mqttServerDto.ConnectionDuration,
                MessageFormat = mqttServerDto.MessageFormat
            };

            _mqttServers.TryAdd(mqttServer.Id, mqttServer);
            _mqttServiceManager.AddMqttServer(mqttServer);
        }

        // Message templates instead of interpolated strings: the rendered text is
        // unchanged, but the values are also captured as structured log properties.
        _logger.LogInformation("成功加载 {Count} 个MQTT配置", mqttServerDtos.Count);
        return true;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "加载MQTT配置时发生错误: {Message}", ex.Message);
        return false;
    }
}
2025-09-06 15:19:04 +08:00
/// <summary>
/// Starts a connection attempt for every active server in the local list and waits
/// for all of them to finish.
/// </summary>
/// <param name="stoppingToken">Token passed to each connection attempt.</param>
private async Task ConnectMqttServersAsync(CancellationToken stoppingToken)
{
    await Task.WhenAll(
        _mqttServers.Values
            .Where(server => server.IsActive)
            .Select(server => _mqttServiceManager.ConnectMqttServerAsync(server.Id, stoppingToken)));
}
2025-09-06 15:19:04 +08:00
/// <summary>
/// Reacts to a change of the MQTT list by logging it and signalling the background
/// loop to reload.
/// </summary>
/// <param name="mqtts">The changed list (not read by this method).</param>
private void HandleMqttListChanged(List<MqttServer> mqtts)
{
    _logger.LogInformation("MQTT列表发生了变化正在重新加载数据...");

    // Wake the loop in ExecuteAsync so it reloads the configuration.
    _reloadSemaphore.Release();
}
/// <summary>
/// Releases the resources held by the service: unsubscribes from the data loader
/// event, disposes the base service, and disposes the reload semaphore.
/// </summary>
public override void Dispose()
{
    _logger.LogInformation("正在释放MQTT后台服务资源...");

    // Unsubscribe first so a late load-completed event cannot release a disposed semaphore.
    _appDataCenterService.DataLoaderService.OnLoadDataCompleted -= OnLoadDataCompleted;

    // Dispose the base before the semaphore: the base cancels the stopping token, so
    // the loop in ExecuteAsync is asked to stop before the primitive it awaits is torn down.
    base.Dispose();
    _reloadSemaphore.Dispose();

    _logger.LogInformation("MQTT后台服务资源已释放");
}
2025-07-26 10:05:43 +08:00
}
}