using DMS.Infrastructure.Interfaces.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DMS.Application.DTOs.Events;
using DMS.Core.Models;
using DMS.Application.Interfaces;
namespace DMS.Infrastructure.Services
{
///
/// MQTT后台服务,负责管理MQTT连接和数据传输
///
public class MqttBackgroundService : BackgroundService, IMqttBackgroundService
{
private readonly ILogger _logger;
private readonly IMqttServiceManager _mqttServiceManager;
private readonly IDataCenterService _dataCenterService;
private readonly ConcurrentDictionary _mqttServers;
private readonly SemaphoreSlim _reloadSemaphore = new(0);
public MqttBackgroundService(
ILogger logger,
IMqttServiceManager mqttServiceManager,
IDataCenterService dataCenterService)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_mqttServiceManager = mqttServiceManager ?? throw new ArgumentNullException(nameof(mqttServiceManager));
_dataCenterService = dataCenterService ?? throw new ArgumentNullException(nameof(dataCenterService));
_mqttServers = new ConcurrentDictionary();
_dataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
}
private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
{
if (e.IsSuccess)
{
Start();
}
}
///
/// 启动MQTT后台服务
///
private void Start(CancellationToken cancellationToken = default)
{
_reloadSemaphore.Release();
_logger.LogInformation("MQTT后台服务启动请求已发送");
}
///
/// 停止MQTT后台服务
///
public async Task StopAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("MQTT后台服务停止请求已发送");
}
///
/// 添加MQTT服务器配置
///
public void AddMqttServer(MqttServer mqttServer)
{
if (mqttServer == null)
throw new ArgumentNullException(nameof(mqttServer));
_mqttServers.AddOrUpdate(mqttServer.Id, mqttServer, (key, oldValue) => mqttServer);
_mqttServiceManager.AddMqttServer(mqttServer);
_reloadSemaphore.Release();
_logger.LogInformation("已添加MQTT服务器 {ServerName} 到监控列表", mqttServer.ServerName);
}
///
/// 移除MQTT服务器配置
///
public async Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
{
if (_mqttServers.TryRemove(mqttServerId, out var mqttServer))
{
await _mqttServiceManager.RemoveMqttServerAsync(mqttServerId, cancellationToken);
_logger.LogInformation("已移除MQTT服务器 {ServerName} 的监控", mqttServer?.ServerName ?? mqttServerId.ToString());
}
}
///
/// 更新MQTT服务器配置
///
public void UpdateMqttServer(MqttServer mqttServer)
{
if (mqttServer == null)
throw new ArgumentNullException(nameof(mqttServer));
_mqttServers.AddOrUpdate(mqttServer.Id, mqttServer, (key, oldValue) => mqttServer);
_reloadSemaphore.Release();
_logger.LogInformation("已更新MQTT服务器 {ServerName} 的配置", mqttServer.ServerName);
}
///
/// 获取所有MQTT服务器配置
///
public IEnumerable GetAllMqttServers()
{
return _mqttServers.Values.ToList();
}
///
/// 发布变量数据到MQTT服务器
///
public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default)
{
await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken);
}
///
/// 发布批量变量数据到MQTT服务器
///
public async Task PublishVariablesDataAsync(List variableMqtts, CancellationToken cancellationToken = default)
{
await _mqttServiceManager.PublishVariablesDataAsync(variableMqtts, cancellationToken);
}
///
/// 后台服务的核心执行逻辑
///
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("MQTT后台服务正在启动");
try
{
while (!stoppingToken.IsCancellationRequested )
{
await _reloadSemaphore.WaitAsync(stoppingToken);
if (stoppingToken.IsCancellationRequested ) break;
// 加载MQTT配置
if (!LoadMqttConfigurations())
{
_logger.LogInformation("加载MQTT配置过程中发生了错误,停止后面的操作");
continue;
}
// 连接MQTT服务器
await ConnectMqttServersAsync(stoppingToken);
_logger.LogInformation("MQTT后台服务已启动");
// 保持运行状态
while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
{
await Task.Delay(1000, stoppingToken);
}
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("MQTT后台服务正在停止");
}
catch (Exception ex)
{
_logger.LogError(ex, $"MQTT后台服务运行中发生了错误: {ex.Message}");
}
finally
{
_logger.LogInformation("MQTT后台服务已停止");
}
}
///
/// 加载MQTT配置
///
private bool LoadMqttConfigurations()
{
try
{
_logger.LogInformation("开始加载MQTT配置...");
_mqttServers.Clear();
// 从数据服务中心获取所有激活的MQTT服务器
var mqttServerDtos = _dataCenterService.MqttServers.Values
.Where(m => m.IsActive)
.ToList();
foreach (var mqttServerDto in mqttServerDtos)
{
// 将 MqttServerDto 转换为 MqttServer
var mqttServer = new MqttServer
{
Id = mqttServerDto.Id,
ServerName = mqttServerDto.ServerName,
ServerUrl = mqttServerDto.ServerUrl,
Port = mqttServerDto.Port,
Username = mqttServerDto.Username,
Password = mqttServerDto.Password,
IsActive = mqttServerDto.IsActive,
SubscribeTopic = mqttServerDto.SubscribeTopic,
PublishTopic = mqttServerDto.PublishTopic,
ClientId = mqttServerDto.ClientId,
CreatedAt = mqttServerDto.CreatedAt,
ConnectedAt = mqttServerDto.ConnectedAt,
ConnectionDuration = mqttServerDto.ConnectionDuration,
MessageFormat = mqttServerDto.MessageFormat
};
_mqttServers.TryAdd(mqttServer.Id, mqttServer);
_mqttServiceManager.AddMqttServer(mqttServer);
}
_logger.LogInformation($"成功加载 {mqttServerDtos.Count} 个MQTT配置");
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, $"加载MQTT配置时发生错误: {ex.Message}");
return false;
}
}
///
/// 连接MQTT服务器列表
///
private async Task ConnectMqttServersAsync(CancellationToken stoppingToken)
{
var connectTasks = _mqttServers.Values
.Where(m => m.IsActive)
.Select(mqtt => _mqttServiceManager.ConnectMqttServerAsync(mqtt.Id, stoppingToken));
await Task.WhenAll(connectTasks);
}
///
/// 处理MQTT列表变化
///
private void HandleMqttListChanged(List mqtts)
{
_logger.LogInformation("MQTT列表发生了变化,正在重新加载数据...");
_reloadSemaphore.Release();
}
}
}