2025-09-06 15:19:04 +08:00
|
|
|
|
using DMS.Infrastructure.Interfaces.Services;
|
2025-07-26 10:05:43 +08:00
|
|
|
|
using Microsoft.Extensions.Hosting;
|
|
|
|
|
|
using Microsoft.Extensions.Logging;
|
2025-09-06 15:19:04 +08:00
|
|
|
|
using System;
|
|
|
|
|
|
using System.Collections.Concurrent;
|
|
|
|
|
|
using System.Collections.Generic;
|
|
|
|
|
|
using System.Linq;
|
|
|
|
|
|
using System.Threading;
|
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
using DMS.Application.DTOs.Events;
|
|
|
|
|
|
using DMS.Core.Models;
|
2025-07-26 10:05:43 +08:00
|
|
|
|
using DMS.Application.Interfaces;
|
2025-09-07 08:51:18 +08:00
|
|
|
|
using DMS.Core.Interfaces.Services;
|
2025-07-26 10:05:43 +08:00
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
namespace DMS.Infrastructure.Services
|
2025-07-26 10:05:43 +08:00
|
|
|
|
{
|
|
|
|
|
|
/// <summary>
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// MQTT后台服务,负责管理MQTT连接和数据传输
|
2025-07-26 10:05:43 +08:00
|
|
|
|
/// </summary>
|
2025-09-06 15:19:04 +08:00
|
|
|
|
public class MqttBackgroundService : BackgroundService, IMqttBackgroundService
|
2025-07-26 10:05:43 +08:00
|
|
|
|
{
|
2025-09-06 15:19:04 +08:00
|
|
|
|
// Logger used for lifecycle and diagnostic messages of this service.
private readonly ILogger<MqttBackgroundService> _logger;

// Manager that receives server registrations/removals, connect requests and publishes.
private readonly IMqttServiceManager _mqttServiceManager;

// Storage whose MqttServers collection is read when configurations are (re)loaded.
private readonly IAppDataStorageService _appDataStorageService;

// Exposes DataLoaderService, whose OnLoadDataCompleted event this service subscribes to.
private readonly IAppDataCenterService _appDataCenterService;

// MQTT servers currently tracked by this service, keyed by server Id.
private readonly ConcurrentDictionary<int, MqttServer> _mqttServers;

// Starts with zero permits; every Release() lets ExecuteAsync run one (re)load pass.
private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0);
|
|
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Creates the MQTT background service and subscribes to the data loader's
/// load-completed event so that a successful load triggers the first start.
/// </summary>
/// <param name="logger">Logger for lifecycle and diagnostic messages.</param>
/// <param name="mqttServiceManager">Manager that owns the MQTT server registrations and connections.</param>
/// <param name="appDataStorageService">Storage whose MQTT server entries are read on every (re)load.</param>
/// <param name="appDataCenterService">Service exposing the data loader whose completion event is observed.</param>
/// <exception cref="ArgumentNullException">Thrown when any of the dependencies is <c>null</c>.</exception>
public MqttBackgroundService(
    ILogger<MqttBackgroundService> logger,
    IMqttServiceManager mqttServiceManager,
    IAppDataStorageService appDataStorageService,
    IAppDataCenterService appDataCenterService)
{
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _mqttServiceManager = mqttServiceManager ?? throw new ArgumentNullException(nameof(mqttServiceManager));

    // Guarded like every other dependency: a null here would otherwise only surface
    // later as a NullReferenceException while loading configurations.
    _appDataStorageService = appDataStorageService ?? throw new ArgumentNullException(nameof(appDataStorageService));
    _appDataCenterService = appDataCenterService ?? throw new ArgumentNullException(nameof(appDataCenterService));
    _mqttServers = new ConcurrentDictionary<int, MqttServer>();

    // Unsubscribed again in Dispose().
    _appDataCenterService.DataLoaderService.OnLoadDataCompleted += OnLoadDataCompleted;
}
|
2025-07-26 10:05:43 +08:00
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Handles the data loader's load-completed event; a successful load requests a service start.
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Event data carrying the success flag of the load.</param>
private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
{
    // Nothing to do when the load did not succeed.
    if (!e.IsSuccess)
    {
        return;
    }

    Start();
}
|
2025-07-26 10:05:43 +08:00
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Requests a start of the MQTT background service by releasing one permit on the
/// reload semaphore, then logs that the request was sent.
/// </summary>
/// <param name="cancellationToken">Accepted for signature symmetry; not read by this method.</param>
private void Start(CancellationToken cancellationToken = default)
{
    // One permit corresponds to one load-and-connect pass of the execute loop.
    _reloadSemaphore.Release();

    _logger.LogInformation("MQTT后台服务启动请求已发送");
}
|
2025-07-26 10:05:43 +08:00
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Stops the MQTT background service.
/// </summary>
/// <remarks>
/// Overrides <c>BackgroundService.StopAsync</c> instead of hiding it, so the stop request
/// reaches the base class whether it arrives through this type or through the hosting
/// infrastructure.
/// </remarks>
/// <param name="cancellationToken">Indicates that the shutdown should no longer be graceful.</param>
/// <returns>A task that completes when the base class has finished stopping the execute loop.</returns>
public override async Task StopAsync(CancellationToken cancellationToken = default)
{
    _logger.LogInformation("MQTT后台服务停止请求已发送");

    // The base implementation signals the stopping token and waits for ExecuteAsync to finish.
    await base.StopAsync(cancellationToken);
}
|
2025-07-26 10:05:43 +08:00
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Adds an MQTT server configuration: stores it locally, registers it with the
/// service manager and requests a reload.
/// </summary>
/// <param name="mqttServer">Server configuration to add; replaces any tracked entry with the same Id.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="mqttServer"/> is <c>null</c>.</exception>
public void AddMqttServer(MqttServer mqttServer)
{
    if (mqttServer == null)
    {
        throw new ArgumentNullException(nameof(mqttServer));
    }

    // Insert the entry, or overwrite the one already stored under this Id.
    _mqttServers[mqttServer.Id] = mqttServer;
    _mqttServiceManager.AddMqttServer(mqttServer);

    // Wake the execute loop so the change is picked up.
    _reloadSemaphore.Release();

    _logger.LogInformation("已添加MQTT服务器 {ServerName} 到监控列表", mqttServer.ServerName);
}
|
2025-07-26 10:05:43 +08:00
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Removes an MQTT server configuration from the tracked set and, when it was tracked,
/// from the service manager as well.
/// </summary>
/// <param name="mqttServerId">Id of the server to remove.</param>
/// <param name="cancellationToken">Token forwarded to the service manager's removal call.</param>
/// <returns>A task that completes once the removal has been processed.</returns>
public async Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
{
    var wasTracked = _mqttServers.TryRemove(mqttServerId, out var removedServer);
    if (!wasTracked)
    {
        // Unknown Id: nothing to remove and nothing to log.
        return;
    }

    await _mqttServiceManager.RemoveMqttServerAsync(mqttServerId, cancellationToken);

    // Fall back to the numeric Id when no server name is available.
    var displayName = removedServer?.ServerName ?? mqttServerId.ToString();
    _logger.LogInformation("已移除MQTT服务器 {ServerName} 的监控", displayName);
}
|
2025-09-06 15:19:04 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Updates an MQTT server configuration in the tracked set and requests a reload.
/// </summary>
/// <param name="mqttServer">New configuration; stored under its Id, replacing any existing entry.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="mqttServer"/> is <c>null</c>.</exception>
public void UpdateMqttServer(MqttServer mqttServer)
{
    if (mqttServer == null)
    {
        throw new ArgumentNullException(nameof(mqttServer));
    }

    // Insert the entry, or overwrite the one already stored under this Id.
    _mqttServers[mqttServer.Id] = mqttServer;

    // Wake the execute loop so the change is picked up.
    _reloadSemaphore.Release();

    _logger.LogInformation("已更新MQTT服务器 {ServerName} 的配置", mqttServer.ServerName);
}
|
2025-09-06 15:19:04 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Gets all MQTT server configurations currently tracked by this service.
/// </summary>
/// <returns>A new list holding a copy of the tracked server references.</returns>
public IEnumerable<MqttServer> GetAllMqttServers()
    => new List<MqttServer>(_mqttServers.Values);
|
2025-09-06 15:19:04 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Publishes the data of a single variable by forwarding the call to the MQTT service manager.
/// </summary>
/// <param name="variableMqtt">Variable/MQTT association to publish.</param>
/// <param name="cancellationToken">Token forwarded to the service manager.</param>
/// <returns>A task that completes when the service manager's publish call has completed.</returns>
public async Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default)
    => await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken);
|
|
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Publishes the data of a batch of variables by forwarding the call to the MQTT service manager.
/// </summary>
/// <param name="variableMqtts">Variable/MQTT associations to publish.</param>
/// <param name="cancellationToken">Token forwarded to the service manager.</param>
/// <returns>A task that completes when the service manager's publish call has completed.</returns>
public async Task PublishVariablesDataAsync(List<VariableMqtt> variableMqtts, CancellationToken cancellationToken = default)
    => await _mqttServiceManager.PublishVariablesDataAsync(variableMqtts, cancellationToken);
|
2025-07-26 10:05:43 +08:00
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Core loop of the background service. Each permit released on the reload semaphore
/// triggers one pass: load the MQTT configurations, then connect the active servers.
/// </summary>
/// <remarks>
/// A failing connect pass is logged and the loop keeps waiting for the next reload
/// request, so one connection error does not end the service for the rest of the
/// process lifetime.
/// </remarks>
/// <param name="stoppingToken">Token signalled when the service should stop.</param>
/// <returns>A task that completes when the loop has exited.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("MQTT后台服务正在启动");

    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Waits for the next (re)load request. This wait also keeps the service
            // idle between passes, so no separate polling loop is required.
            await _reloadSemaphore.WaitAsync(stoppingToken);

            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            // Load the MQTT configurations; on failure skip connecting and wait for the next request.
            if (!LoadMqttConfigurations())
            {
                _logger.LogInformation("加载MQTT配置过程中发生了错误,停止后面的操作");
                continue;
            }

            try
            {
                // Connect the MQTT servers.
                await ConnectMqttServersAsync(stoppingToken);
                _logger.LogInformation("MQTT后台服务已启动");
            }
            catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
            {
                // Connection problems while the service is still supposed to run are
                // logged here; exceptions during shutdown fall through to the outer handlers.
                _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {ErrorMessage}", ex.Message);
            }
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogInformation("MQTT后台服务正在停止");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {ErrorMessage}", ex.Message);
    }
    finally
    {
        _logger.LogInformation("MQTT后台服务已停止");
    }
}
|
2025-07-26 10:05:43 +08:00
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Rebuilds the tracked MQTT server set from the active entries of the application data
/// storage and registers every entry with the MQTT service manager.
/// </summary>
/// <returns>
/// <c>true</c> when all active entries were loaded; <c>false</c> when an exception occurred
/// (the exception is logged, not rethrown).
/// </returns>
private bool LoadMqttConfigurations()
{
    try
    {
        _logger.LogInformation("开始加载MQTT配置...");
        _mqttServers.Clear();

        // Snapshot the active entries so the storage collection is enumerated only once.
        var activeEntries = _appDataStorageService.MqttServers.Values
            .Where(entry => entry.IsActive)
            .ToList();

        foreach (var entry in activeEntries)
        {
            // Copy the stored entry into a fresh MqttServer instance.
            var mqttServer = new MqttServer
            {
                Id = entry.Id,
                ServerName = entry.ServerName,
                ServerUrl = entry.ServerUrl,
                Port = entry.Port,
                Username = entry.Username,
                Password = entry.Password,
                IsActive = entry.IsActive,
                SubscribeTopic = entry.SubscribeTopic,
                PublishTopic = entry.PublishTopic,
                ClientId = entry.ClientId,
                CreatedAt = entry.CreatedAt,
                ConnectedAt = entry.ConnectedAt,
                ConnectionDuration = entry.ConnectionDuration,
                MessageFormat = entry.MessageFormat
            };

            _mqttServers.TryAdd(mqttServer.Id, mqttServer);
            _mqttServiceManager.AddMqttServer(mqttServer);
        }

        // Constant message templates with structured arguments instead of interpolated strings.
        _logger.LogInformation("成功加载 {Count} 个MQTT配置", activeEntries.Count);
        return true;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "加载MQTT配置时发生错误: {ErrorMessage}", ex.Message);
        return false;
    }
}
|
|
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Asks the MQTT service manager to connect every tracked server that is marked active
/// and waits for all connect calls to finish.
/// </summary>
/// <param name="stoppingToken">Token forwarded to each connect call.</param>
/// <returns>A task that completes when all connect calls have completed.</returns>
private async Task ConnectMqttServersAsync(CancellationToken stoppingToken)
{
    var pendingConnections =
        from server in _mqttServers.Values
        where server.IsActive
        select _mqttServiceManager.ConnectMqttServerAsync(server.Id, stoppingToken);

    await Task.WhenAll(pendingConnections);
}
|
|
|
|
|
|
|
2025-09-06 15:19:04 +08:00
|
|
|
|
/// <summary>
/// Reacts to a change of the MQTT server list by logging it and requesting a reload.
/// </summary>
/// <param name="mqtts">The changed server list; not read by this method.</param>
private void HandleMqttListChanged(List<MqttServer> mqtts)
{
    _logger.LogInformation("MQTT列表发生了变化,正在重新加载数据...");

    // One released permit triggers one reload pass of the execute loop.
    _reloadSemaphore.Release();
}
|
2025-09-06 19:59:21 +08:00
|
|
|
|
|
|
|
|
|
|
/// <summary>
/// Releases the resources held by the service: unsubscribes from the data loader event,
/// disposes the base service and then disposes the reload semaphore.
/// </summary>
public override void Dispose()
{
    _logger.LogInformation("正在释放MQTT后台服务资源...");

    // Stop receiving start requests before anything is torn down.
    _appDataCenterService.DataLoaderService.OnLoadDataCompleted -= OnLoadDataCompleted;

    // Dispose the base first: it cancels the stopping token, which releases an execute
    // loop that is still waiting on the semaphore. Disposing the semaphore while that
    // wait is pending is not supported by SemaphoreSlim.
    base.Dispose();

    // The field is readonly and always initialized, so no null-conditional is needed.
    _reloadSemaphore.Dispose();

    _logger.LogInformation("MQTT后台服务资源已释放");
}
|
2025-07-26 10:05:43 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|