263 lines
9.7 KiB
C#
263 lines
9.7 KiB
C#
using DMS.Infrastructure.Interfaces.Services;
|
||
using Microsoft.Extensions.Hosting;
|
||
using Microsoft.Extensions.Logging;
|
||
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Collections.Generic;
|
||
using System.Linq;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
using DMS.Application.DTOs.Events;
|
||
using DMS.Core.Models;
|
||
using DMS.Application.Interfaces;
|
||
using DMS.Core.Interfaces.Services;
|
||
|
||
namespace DMS.Infrastructure.Services
{
    /// <summary>
    /// MQTT background service: keeps a local registry of MQTT server configurations,
    /// (re)loads them from <see cref="IAppDataCenterService"/> whenever a reload is
    /// signalled, and asks <see cref="IMqttServiceManager"/> to connect to them.
    /// </summary>
    /// <remarks>
    /// Reloads are driven by <c>_reloadSemaphore</c>: every public mutation and the
    /// data-loaded event release it, and <see cref="ExecuteAsync"/> waits on it.
    /// </remarks>
    public class MqttBackgroundService : BackgroundService, IMqttBackgroundService
    {
        private readonly ILogger<MqttBackgroundService> _logger;
        private readonly IMqttServiceManager _mqttServiceManager;
        private readonly IAppDataCenterService _appDataCenterService;

        // Local registry of server configurations, keyed by MqttServer.Id.
        private readonly ConcurrentDictionary<int, MqttServer> _mqttServers;

        // Starts at 0 so ExecuteAsync stays idle until the first reload is signalled.
        private readonly SemaphoreSlim _reloadSemaphore = new(0);

        /// <summary>
        /// Creates the service and subscribes to the data center's load-completed event.
        /// </summary>
        /// <param name="logger">Logger for this service.</param>
        /// <param name="mqttServiceManager">Manager that owns the actual MQTT connections.</param>
        /// <param name="appDataCenterService">Source of the MQTT server configurations.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public MqttBackgroundService(
            ILogger<MqttBackgroundService> logger,
            IMqttServiceManager mqttServiceManager,
            IAppDataCenterService appDataCenterService)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _mqttServiceManager = mqttServiceManager ?? throw new ArgumentNullException(nameof(mqttServiceManager));
            _appDataCenterService = appDataCenterService ?? throw new ArgumentNullException(nameof(appDataCenterService));
            _mqttServers = new ConcurrentDictionary<int, MqttServer>();

            // Unsubscribed again in Dispose.
            _appDataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
        }

        /// <summary>
        /// Event handler: signals a (re)load once the data center reports a successful load.
        /// </summary>
        private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
        {
            if (e.IsSuccess)
            {
                Start();
            }
        }

        /// <summary>
        /// Signals the background loop to load the configuration and connect.
        /// </summary>
        /// <param name="cancellationToken">Currently unused.</param>
        private void Start(CancellationToken cancellationToken = default)
        {
            _reloadSemaphore.Release();
            _logger.LogInformation("MQTT后台服务启动请求已发送");
        }

        /// <summary>
        /// Stops the MQTT background service.
        /// </summary>
        /// <remarks>
        /// Overrides <see cref="BackgroundService.StopAsync"/> instead of hiding it, so the
        /// same stop logic runs whether the call comes from the host or from a direct caller:
        /// the base implementation cancels the stopping token and waits for
        /// <see cref="ExecuteAsync"/> to finish.
        /// </remarks>
        /// <param name="cancellationToken">Signals that the shutdown should no longer be graceful.</param>
        public override async Task StopAsync(CancellationToken cancellationToken = default)
        {
            _logger.LogInformation("MQTT后台服务停止请求已发送");
            await base.StopAsync(cancellationToken);
        }

        /// <summary>
        /// Registers an MQTT server configuration and signals a reload.
        /// </summary>
        /// <param name="mqttServer">Configuration to add; replaces an existing entry with the same Id.</param>
        /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is <c>null</c>.</exception>
        public void AddMqttServer(MqttServer mqttServer)
        {
            if (mqttServer is null)
            {
                throw new ArgumentNullException(nameof(mqttServer));
            }

            // NOTE(review): the reload triggered below clears the registry and rebuilds it
            // from IAppDataCenterService.MqttServers - confirm the server is present there too.
            _mqttServers.AddOrUpdate(mqttServer.Id, mqttServer, (_, _) => mqttServer);
            _mqttServiceManager.AddMqttServer(mqttServer);
            _reloadSemaphore.Release();
            _logger.LogInformation("已添加MQTT服务器 {ServerName} 到监控列表", mqttServer.ServerName);
        }

        /// <summary>
        /// Removes an MQTT server configuration from the registry and from the service manager.
        /// Does nothing when the Id is not registered.
        /// </summary>
        /// <param name="mqttServerId">Id of the server to remove.</param>
        /// <param name="cancellationToken">Forwarded to the service manager.</param>
        public async Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
        {
            if (!_mqttServers.TryRemove(mqttServerId, out var mqttServer))
            {
                return;
            }

            await _mqttServiceManager.RemoveMqttServerAsync(mqttServerId, cancellationToken);
            _logger.LogInformation("已移除MQTT服务器 {ServerName} 的监控", mqttServer?.ServerName ?? mqttServerId.ToString());
        }

        /// <summary>
        /// Replaces (or adds) an MQTT server configuration in the registry and signals a reload.
        /// </summary>
        /// <param name="mqttServer">Updated configuration.</param>
        /// <exception cref="ArgumentNullException"><paramref name="mqttServer"/> is <c>null</c>.</exception>
        public void UpdateMqttServer(MqttServer mqttServer)
        {
            if (mqttServer is null)
            {
                throw new ArgumentNullException(nameof(mqttServer));
            }

            _mqttServers.AddOrUpdate(mqttServer.Id, mqttServer, (_, _) => mqttServer);
            _reloadSemaphore.Release();
            _logger.LogInformation("已更新MQTT服务器 {ServerName} 的配置", mqttServer.ServerName);
        }

        /// <summary>
        /// Returns a snapshot of all registered MQTT server configurations.
        /// </summary>
        public IEnumerable<MqttServer> GetAllMqttServers()
        {
            return _mqttServers.Values.ToList();
        }

        /// <summary>
        /// Publishes a single variable value by delegating to the service manager.
        /// </summary>
        /// <param name="variableMqtt">Variable/server pairing to publish.</param>
        /// <param name="cancellationToken">Forwarded to the service manager.</param>
        public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default)
        {
            await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken);
        }

        /// <summary>
        /// Publishes a batch of variable values by delegating to the service manager.
        /// </summary>
        /// <param name="variableMqtts">Variable/server pairings to publish.</param>
        /// <param name="cancellationToken">Forwarded to the service manager.</param>
        public async Task PublishVariablesDataAsync(List<VariableMqtt> variableMqtts, CancellationToken cancellationToken = default)
        {
            await _mqttServiceManager.PublishVariablesDataAsync(variableMqtts, cancellationToken);
        }

        /// <summary>
        /// Core loop of the background service: waits for a reload signal, reloads the
        /// configuration, connects to the active servers, then waits for the next signal.
        /// </summary>
        /// <param name="stoppingToken">Cancelled when the service is being stopped.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("MQTT后台服务正在启动");

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    // Blocks until somebody signals a (re)load; no polling needed.
                    await _reloadSemaphore.WaitAsync(stoppingToken);

                    if (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }

                    // Coalesce signals that queued up while we were busy: one reload picks up
                    // the latest state, so N pending signals need not cause N reconnect cycles.
                    while (_reloadSemaphore.Wait(0))
                    {
                    }

                    // Load the MQTT configuration.
                    if (!LoadMqttConfigurations())
                    {
                        _logger.LogInformation("加载MQTT配置过程中发生了错误,停止后面的操作");
                        continue;
                    }

                    try
                    {
                        // Connect to the MQTT servers.
                        await ConnectMqttServersAsync(stoppingToken);
                        _logger.LogInformation("MQTT后台服务已启动");
                    }
                    catch (Exception ex) when (!(ex is OperationCanceledException && stoppingToken.IsCancellationRequested))
                    {
                        // A failing broker must not terminate the whole service; log it and
                        // stay alive so the next reload signal can retry.
                        _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {Message}", ex.Message);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("MQTT后台服务正在停止");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "MQTT后台服务运行中发生了错误: {Message}", ex.Message);
            }
            finally
            {
                _logger.LogInformation("MQTT后台服务已停止");
            }
        }

        /// <summary>
        /// Rebuilds the local registry from the active MQTT servers held by the data center
        /// and registers each of them with the service manager.
        /// </summary>
        /// <returns><c>true</c> on success; <c>false</c> when an exception was caught and logged.</returns>
        private bool LoadMqttConfigurations()
        {
            try
            {
                _logger.LogInformation("开始加载MQTT配置...");
                _mqttServers.Clear();

                // Snapshot of every active MQTT server known to the data center.
                var activeServers = _appDataCenterService.MqttServers.Values
                    .Where(m => m.IsActive)
                    .ToList();

                foreach (var source in activeServers)
                {
                    // Copy the data-center entry into a fresh MqttServer instance.
                    var mqttServer = new MqttServer
                    {
                        Id = source.Id,
                        ServerName = source.ServerName,
                        ServerUrl = source.ServerUrl,
                        Port = source.Port,
                        Username = source.Username,
                        Password = source.Password,
                        IsActive = source.IsActive,
                        SubscribeTopic = source.SubscribeTopic,
                        PublishTopic = source.PublishTopic,
                        ClientId = source.ClientId,
                        CreatedAt = source.CreatedAt,
                        ConnectedAt = source.ConnectedAt,
                        ConnectionDuration = source.ConnectionDuration,
                        MessageFormat = source.MessageFormat
                    };

                    _mqttServers.TryAdd(mqttServer.Id, mqttServer);
                    _mqttServiceManager.AddMqttServer(mqttServer);
                }

                _logger.LogInformation("成功加载 {Count} 个MQTT配置", activeServers.Count);
                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "加载MQTT配置时发生错误: {Message}", ex.Message);
                return false;
            }
        }

        /// <summary>
        /// Starts a connection attempt for every active registered server and awaits all of them.
        /// </summary>
        /// <param name="stoppingToken">Forwarded to each connection attempt.</param>
        private async Task ConnectMqttServersAsync(CancellationToken stoppingToken)
        {
            var connectTasks = _mqttServers.Values
                .Where(m => m.IsActive)
                .Select(mqtt => _mqttServiceManager.ConnectMqttServerAsync(mqtt.Id, stoppingToken));

            await Task.WhenAll(connectTasks);
        }

        /// <summary>
        /// Signals a reload after the MQTT server list changed.
        /// </summary>
        /// <param name="mqtts">The changed list; currently unused.</param>
        private void HandleMqttListChanged(List<MqttServer> mqtts)
        {
            _logger.LogInformation("MQTT列表发生了变化,正在重新加载数据...");
            _reloadSemaphore.Release();
        }

        /// <summary>
        /// Releases the resources held by the service.
        /// </summary>
        public override void Dispose()
        {
            _logger.LogInformation("正在释放MQTT后台服务资源...");

            _appDataCenterService.OnLoadDataCompleted -= OnLoadDataCompleted;

            // Let the base class cancel the stopping token first so the loop is no longer
            // asked to wait on the semaphore, then dispose the semaphore itself.
            base.Dispose();
            _reloadSemaphore.Dispose();

            _logger.LogInformation("MQTT后台服务资源已释放");
        }
    }
}