diff --git a/DMS.Application/Interfaces/IDataCenterService.cs b/DMS.Application/Interfaces/IDataCenterService.cs
index 47fb891..3feceef 100644
--- a/DMS.Application/Interfaces/IDataCenterService.cs
+++ b/DMS.Application/Interfaces/IDataCenterService.cs
@@ -333,7 +333,7 @@ public interface IDataCenterService
///
/// 当数据加载完成时触发
///
- event EventHandler DataLoadCompleted;
+ event EventHandler OnLoadDataCompleted;
///
/// 当设备数据发生变化时触发
diff --git a/DMS.Application/Services/DataCenterService.cs b/DMS.Application/Services/DataCenterService.cs
index f41d5c9..6cd0752 100644
--- a/DMS.Application/Services/DataCenterService.cs
+++ b/DMS.Application/Services/DataCenterService.cs
@@ -63,7 +63,7 @@ public class DataCenterService : IDataCenterService
///
/// 当数据加载完成时触发
///
- public event EventHandler DataLoadCompleted;
+ public event EventHandler OnLoadDataCompleted;
///
/// 当设备数据发生变化时触发
@@ -747,7 +747,7 @@ public class DataCenterService : IDataCenterService
///
protected virtual void OnDataLoadCompleted(DataLoadCompletedEventArgs e)
{
- DataLoadCompleted?.Invoke(this, e);
+ OnLoadDataCompleted?.Invoke(this, e);
}
///
diff --git a/DMS.Application/Services/Processors/MqttPublishProcessor.cs b/DMS.Application/Services/Processors/MqttPublishProcessor.cs
index 0516302..dbefc74 100644
--- a/DMS.Application/Services/Processors/MqttPublishProcessor.cs
+++ b/DMS.Application/Services/Processors/MqttPublishProcessor.cs
@@ -10,11 +10,11 @@ namespace DMS.Application.Services.Processors;
///
public class MqttPublishProcessor : IVariableProcessor
{
- // private readonly MqttBackgroundService _mqttBackgroundService;
- //
- // public MqttPublishProcessor(MqttBackgroundService mqttBackgroundService)
+ // private readonly IMqttServiceManager _mqttServiceManager;
+
+ // public MqttPublishProcessor(IMqttServiceManager mqttServiceManager)
// {
- // _mqttBackgroundService = mqttBackgroundService;
+ // // _mqttServiceManager = mqttServiceManager;
// }
///
@@ -24,17 +24,24 @@ public class MqttPublishProcessor : IVariableProcessor
public async Task ProcessAsync(VariableContext context)
{
// var variable = context.Data;
- // if (variable?.VariableMqtts == null || variable.VariableMqtts.Count == 0)
+ // if (variable?.MqttAliases == null || variable.MqttAliases.Count == 0)
// {
// return; // 没有关联的MQTT配置,直接返回
// }
//
// // 遍历所有关联的MQTT配置,并将其推入发送队列
- // foreach (var variableMqtt in variable.VariableMqtts)
+ // foreach (var variableMqttAlias in variable.MqttAliases)
// {
- // // 确保VariableMqtt对象中包含了最新的Variable数据
- // variableMqtt.Variable = variable;
- // await _mqttBackgroundService.SendVariableAsync(variableMqtt);
+ // // 创建VariableMqtt对象
+ // var variableMqtt = new VariableMqtt
+ // {
+ // Variable = variable,
+ // Mqtt = variableMqttAlias.MqttServer,
+ // MqttId = variableMqttAlias.MqttServerId
+ // };
+ //
+ // // 发布变量数据到MQTT服务器
+ // await _mqttServiceManager.PublishVariableDataAsync(variableMqtt);
// }
}
}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Interfaces/Services/IMqttBackgroundService.cs b/DMS.Infrastructure/Interfaces/Services/IMqttBackgroundService.cs
new file mode 100644
index 0000000..811ff3a
--- /dev/null
+++ b/DMS.Infrastructure/Interfaces/Services/IMqttBackgroundService.cs
@@ -0,0 +1,54 @@
+using DMS.Core.Models;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DMS.Infrastructure.Interfaces.Services
+{
+ ///
+ /// MQTT后台服务接口,负责管理MQTT连接和数据传输
+ ///
+ public interface IMqttBackgroundService
+ {
+ ///
+ /// 启动MQTT后台服务
+ ///
+ Task StartAsync(CancellationToken cancellationToken = default);
+
+ ///
+ /// 停止MQTT后台服务
+ ///
+ Task StopAsync(CancellationToken cancellationToken = default);
+
+ ///
+ /// 添加MQTT服务器配置
+ ///
+ void AddMqttServer(MqttServer mqttServer);
+
+ ///
+ /// 移除MQTT服务器配置
+ ///
+ Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default);
+
+ ///
+ /// 更新MQTT服务器配置
+ ///
+ void UpdateMqttServer(MqttServer mqttServer);
+
+ ///
+ /// 获取所有MQTT服务器配置
+ ///
+ IEnumerable GetAllMqttServers();
+
+ ///
+ /// 发布变量数据到MQTT服务器
+ ///
+ Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default);
+
+ ///
+ /// 发布批量变量数据到MQTT服务器
+ ///
+ Task PublishVariablesDataAsync(List variableMqtts, CancellationToken cancellationToken = default);
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Interfaces/Services/IMqttService.cs b/DMS.Infrastructure/Interfaces/Services/IMqttService.cs
new file mode 100644
index 0000000..382e827
--- /dev/null
+++ b/DMS.Infrastructure/Interfaces/Services/IMqttService.cs
@@ -0,0 +1,52 @@
+using DMS.Core.Models;
+using MQTTnet.Client;
+using System;
+using System.Threading.Tasks;
+
+namespace DMS.Infrastructure.Interfaces.Services
+{
+ ///
+ /// MQTT服务接口,定义MQTT客户端的基本操作
+ ///
+ public interface IMqttService
+ {
+ ///
+ /// 获取MQTT客户端连接状态
+ ///
+ bool IsConnected { get; }
+
+ ///
+ /// 异步连接到MQTT服务器
+ ///
+ /// MQTT服务器URL
+ /// 端口号
+ /// 客户端ID
+ /// 用户名
+ /// 密码
+ Task ConnectAsync(string serverUrl, int port, string clientId, string username, string password);
+
+ ///
+ /// 异步断开MQTT连接
+ ///
+ Task DisconnectAsync();
+
+ ///
+ /// 异步发布消息
+ ///
+ /// 主题
+ /// 消息内容
+ Task PublishAsync(string topic, string payload);
+
+ ///
+ /// 异步订阅主题
+ ///
+ /// 要订阅的主题
+ Task SubscribeAsync(string topic);
+
+ ///
+ /// 设置消息接收处理程序
+ ///
+ /// 消息处理回调函数
+ void SetMessageHandler(Func handler);
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Interfaces/Services/IMqttServiceFactory.cs b/DMS.Infrastructure/Interfaces/Services/IMqttServiceFactory.cs
new file mode 100644
index 0000000..417f23e
--- /dev/null
+++ b/DMS.Infrastructure/Interfaces/Services/IMqttServiceFactory.cs
@@ -0,0 +1,23 @@
+using DMS.Core.Models;
+
+namespace DMS.Infrastructure.Interfaces.Services
+{
+ ///
+ /// MQTT服务工厂接口,用于创建MQTT服务实例
+ ///
+ public interface IMqttServiceFactory
+ {
+ ///
+ /// 创建MQTT服务实例
+ ///
+ /// IMqttService实例
+ IMqttService CreateService();
+
+ ///
+ /// 根据MQTT服务器配置创建MQTT服务实例
+ ///
+ /// MQTT服务器配置
+ /// IMqttService实例
+ IMqttService CreateService(MqttServer mqttServer);
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Interfaces/Services/IMqttServiceManager.cs b/DMS.Infrastructure/Interfaces/Services/IMqttServiceManager.cs
new file mode 100644
index 0000000..989b342
--- /dev/null
+++ b/DMS.Infrastructure/Interfaces/Services/IMqttServiceManager.cs
@@ -0,0 +1,69 @@
+using DMS.Core.Models;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace DMS.Infrastructure.Interfaces.Services
+{
+ ///
+ /// MQTT服务管理器接口,负责管理MQTT连接和变量监控
+ ///
+ public interface IMqttServiceManager
+ {
+ ///
+ /// 初始化服务管理器
+ ///
+ Task InitializeAsync(CancellationToken cancellationToken = default);
+
+ ///
+ /// 添加MQTT服务器到监控列表
+ ///
+ void AddMqttServer(MqttServer mqttServer);
+
+ ///
+ /// 移除MQTT服务器监控
+ ///
+ Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default);
+
+ ///
+ /// 更新MQTT服务器变量别名
+ ///
+ void UpdateVariableMqttAliases(int mqttServerId, List variableMqttAliases);
+
+ ///
+ /// 获取MQTT服务器连接状态
+ ///
+ bool IsMqttServerConnected(int mqttServerId);
+
+ ///
+ /// 重新连接MQTT服务器
+ ///
+ Task ReconnectMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default);
+
+ ///
+ /// 获取所有监控的MQTT服务器ID
+ ///
+ IEnumerable GetMonitoredMqttServerIds();
+
+ ///
+ /// 连接MQTT服务器
+ ///
+ Task ConnectMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default);
+
+ ///
+ /// 断开MQTT服务器连接
+ ///
+ Task DisconnectMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default);
+
+ ///
+ /// 发布变量数据到MQTT服务器
+ ///
+ Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default);
+
+ ///
+ /// 发布批量变量数据到MQTT服务器
+ ///
+ Task PublishVariablesDataAsync(List variableMqtts, CancellationToken cancellationToken = default);
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Services/MqttBackgroundService.cs b/DMS.Infrastructure/Services/MqttBackgroundService.cs
index 0249aa7..1a8bc34 100644
--- a/DMS.Infrastructure/Services/MqttBackgroundService.cs
+++ b/DMS.Infrastructure/Services/MqttBackgroundService.cs
@@ -1,345 +1,247 @@
-using System.Collections.Concurrent;
-using System.Text;
-using System.Threading.Channels;
-using DMS.Core.Models;
+using DMS.Infrastructure.Interfaces.Services;
using Microsoft.Extensions.Hosting;
-using MQTTnet;
-using MQTTnet.Client;
-using MQTTnet.Client.Connecting;
-using MQTTnet.Client.Disconnecting;
-using MQTTnet.Client.Options;
using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using DMS.Application.DTOs.Events;
+using DMS.Core.Models;
using DMS.Application.Interfaces;
-using DMS.Core.Interfaces;
-namespace DMS.Infrastructure.Services;
-
-///
-/// MQTT后台服务,继承自BackgroundService,用于在后台管理MQTT连接和数据发布。
-///
-public class MqttBackgroundService : BackgroundService
+namespace DMS.Infrastructure.Services
{
- private readonly IRepositoryManager _repositoryManager;
- private readonly ILogger _logger;
-
- private readonly ConcurrentDictionary _mqttClients;
- private readonly ConcurrentDictionary _mqttConfigDic;
- private readonly ConcurrentDictionary _reconnectAttempts;
-
- private readonly SemaphoreSlim _reloadSemaphore = new(0);
- private readonly Channel _messageChannel;
-
///
- /// 构造函数,注入DataServices。
+ /// MQTT后台服务,负责管理MQTT连接和数据传输
///
- public MqttBackgroundService(IRepositoryManager repositoryManager, ILogger logger)
+ public class MqttBackgroundService : BackgroundService, IMqttBackgroundService
{
- _repositoryManager = repositoryManager;
- _logger = logger;
- _mqttClients = new ConcurrentDictionary();
- _mqttConfigDic = new ConcurrentDictionary();
- _reconnectAttempts = new ConcurrentDictionary();
- _messageChannel = Channel.CreateUnbounded();
+ private readonly ILogger _logger;
+ private readonly IMqttServiceManager _mqttServiceManager;
+ private readonly IDataCenterService _dataCenterService;
+ private readonly ConcurrentDictionary _mqttServers;
+ private readonly SemaphoreSlim _reloadSemaphore = new(0);
- // _deviceDataService.OnMqttListChanged += HandleMqttListChanged;
- }
-
- ///
- /// 将待发送的变量数据异步推入队列。
- ///
- /// 包含MQTT别名和变量数据的对象。
- public async Task SendVariableAsync(VariableMqtt data)
- {
- await _messageChannel.Writer.WriteAsync(data);
- }
-
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- _logger.LogInformation("Mqtt后台服务正在启动。");
- _reloadSemaphore.Release();
-
- var processQueueTask = ProcessMessageQueueAsync(stoppingToken);
-
- try
+ public MqttBackgroundService(
+ ILogger logger,
+ IMqttServiceManager mqttServiceManager,
+ IDataCenterService dataCenterService)
{
- while (!stoppingToken.IsCancellationRequested)
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _mqttServiceManager = mqttServiceManager ?? throw new ArgumentNullException(nameof(mqttServiceManager));
+ _dataCenterService = dataCenterService ?? throw new ArgumentNullException(nameof(dataCenterService));
+ _mqttServers = new ConcurrentDictionary();
+
+ _dataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
+ }
+
+ private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
+ {
+ if (e.IsSuccess)
{
- await _reloadSemaphore.WaitAsync(stoppingToken);
+ Start();
+ }
+
+ }
- if (stoppingToken.IsCancellationRequested) break;
+ ///
+ /// 启动MQTT后台服务
+ ///
+ private void Start(CancellationToken cancellationToken = default)
+ {
+ _reloadSemaphore.Release();
+ _logger.LogInformation("MQTT后台服务启动请求已发送");
+ }
- // if (_deviceDataService.Mqtts == null || _deviceDataService.Mqtts.Count == 0)
- // {
- // _logger.LogInformation("没有可用的Mqtt配置,等待Mqtt列表更新...");
- // continue;
- // }
+ ///
+ /// 停止MQTT后台服务
+ ///
+ public async Task StopAsync(CancellationToken cancellationToken = default)
+ {
+ _logger.LogInformation("MQTT后台服务停止请求已发送");
+ }
- if (!LoadMqttConfigurations())
- {
- _logger.LogInformation("加载Mqtt配置过程中发生了错误,停止后面的操作。");
- continue;
- }
+ ///
+ /// 添加MQTT服务器配置
+ ///
+ public void AddMqttServer(MqttServer mqttServer)
+ {
+ if (mqttServer == null)
+ throw new ArgumentNullException(nameof(mqttServer));
- await ConnectMqttList(stoppingToken);
- _logger.LogInformation("Mqtt后台服务已启动。");
+ _mqttServers.AddOrUpdate(mqttServer.Id, mqttServer, (key, oldValue) => mqttServer);
+ _mqttServiceManager.AddMqttServer(mqttServer);
+ _reloadSemaphore.Release();
+ _logger.LogInformation("已添加MQTT服务器 {ServerName} 到监控列表", mqttServer.ServerName);
+ }
- while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
- {
- await Task.Delay(1000, stoppingToken);
- }
+ ///
+ /// 移除MQTT服务器配置
+ ///
+ public async Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
+ {
+ if (_mqttServers.TryRemove(mqttServerId, out var mqttServer))
+ {
+ await _mqttServiceManager.RemoveMqttServerAsync(mqttServerId, cancellationToken);
+ _logger.LogInformation("已移除MQTT服务器 {ServerName} 的监控", mqttServer?.ServerName ?? mqttServerId.ToString());
}
}
- catch (OperationCanceledException)
- {
- _logger.LogInformation("Mqtt后台服务正在停止。");
- }
- catch (Exception e)
- {
- _logger.LogError(e, $"Mqtt后台服务运行中发生了错误:{e.Message}");
- }
- finally
- {
- _messageChannel.Writer.Complete();
- await processQueueTask; // 等待消息队列处理完成
- await DisconnectAll(stoppingToken);
- // _deviceDataService.OnMqttListChanged -= HandleMqttListChanged;
- _logger.LogInformation("Mqtt后台服务已停止。");
- }
- }
- private async Task ProcessMessageQueueAsync(CancellationToken stoppingToken)
- {
- _logger.LogInformation("MQTT消息发送队列处理器已启动。");
- var batch = new List();
- var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
-
- while (!stoppingToken.IsCancellationRequested)
+ ///
+ /// 更新MQTT服务器配置
+ ///
+ public void UpdateMqttServer(MqttServer mqttServer)
{
+ if (mqttServer == null)
+ throw new ArgumentNullException(nameof(mqttServer));
+
+ _mqttServers.AddOrUpdate(mqttServer.Id, mqttServer, (key, oldValue) => mqttServer);
+ _reloadSemaphore.Release();
+ _logger.LogInformation("已更新MQTT服务器 {ServerName} 的配置", mqttServer.ServerName);
+ }
+
+ ///
+ /// 获取所有MQTT服务器配置
+ ///
+ public IEnumerable GetAllMqttServers()
+ {
+ return _mqttServers.Values.ToList();
+ }
+
+ ///
+ /// 发布变量数据到MQTT服务器
+ ///
+ public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default)
+ {
+ await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken);
+ }
+
+ ///
+ /// 发布批量变量数据到MQTT服务器
+ ///
+ public async Task PublishVariablesDataAsync(List variableMqtts, CancellationToken cancellationToken = default)
+ {
+ await _mqttServiceManager.PublishVariablesDataAsync(variableMqtts, cancellationToken);
+ }
+
+ ///
+ /// 后台服务的核心执行逻辑
+ ///
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ _logger.LogInformation("MQTT后台服务正在启动");
+
try
{
- // 等待信号:要么是新消息到达,要么是1秒定时器触发
- await Task.WhenAny(
- _messageChannel.Reader.WaitToReadAsync(stoppingToken).AsTask(),
- timer.WaitForNextTickAsync(stoppingToken).AsTask()
- );
-
- // 尽可能多地读取消息,直到达到批次上限
- while (batch.Count < 50 && _messageChannel.Reader.TryRead(out var message))
+ while (!stoppingToken.IsCancellationRequested )
{
- batch.Add(message);
- }
+ await _reloadSemaphore.WaitAsync(stoppingToken);
- if (batch.Any())
- {
- await SendBatchAsync(batch, stoppingToken);
- batch.Clear();
+ if (stoppingToken.IsCancellationRequested ) break;
+
+ // 加载MQTT配置
+ if (!LoadMqttConfigurations())
+ {
+ _logger.LogInformation("加载MQTT配置过程中发生了错误,停止后面的操作");
+ continue;
+ }
+
+ // 连接MQTT服务器
+ await ConnectMqttServersAsync(stoppingToken);
+ _logger.LogInformation("MQTT后台服务已启动");
+
+ // 保持运行状态
+ while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
+ {
+ await Task.Delay(1000, stoppingToken);
+ }
}
}
catch (OperationCanceledException)
{
- _logger.LogInformation("MQTT消息发送队列处理器已停止。");
- break;
+ _logger.LogInformation("MQTT后台服务正在停止");
}
catch (Exception ex)
{
- _logger.LogError(ex, $"处理MQTT消息队列时发生错误: {ex.Message}");
- await Task.Delay(5000, stoppingToken); // 发生未知错误时,延迟一段时间再重试
+ _logger.LogError(ex, $"MQTT后台服务运行中发生了错误: {ex.Message}");
+ }
+ finally
+ {
+ _logger.LogInformation("MQTT后台服务已停止");
}
}
- }
- private async Task SendBatchAsync(List batch, CancellationToken stoppingToken)
- {
- _logger.LogInformation($"准备发送一批 {batch.Count} 条MQTT消息。");
- // 按MQTT服务器ID进行分组
- var groupedByMqtt = batch.GroupBy(vm => vm.Mqtt.Id);
-
- foreach (var group in groupedByMqtt)
+ ///
+ /// 加载MQTT配置
+ ///
+ private bool LoadMqttConfigurations()
{
- var mqttId = group.Key;
- if (!_mqttClients.TryGetValue(mqttId, out var client) || !client.IsConnected)
- {
- _logger.LogWarning($"MQTT客户端 (ID: {mqttId}) 未连接或不存在,跳过 {group.Count()} 条消息。");
- continue;
- }
-
- var messages = group.Select(vm => new MqttApplicationMessageBuilder()
- .WithTopic(vm.Mqtt.PublishTopic)
- .WithPayload(vm.Variable?.DataValue?.ToString() ?? string.Empty)
- .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
- .Build())
- .ToList();
try
{
- foreach (var message in messages)
+ _logger.LogInformation("开始加载MQTT配置...");
+ _mqttServers.Clear();
+
+ // 从数据服务中心获取所有激活的MQTT服务器
+ var mqttServerDtos = _dataCenterService.MqttServers.Values
+ .Where(m => m.IsActive)
+ .ToList();
+
+ foreach (var mqttServerDto in mqttServerDtos)
{
- await client.PublishAsync(message, stoppingToken);
+ // 将 MqttServerDto 转换为 MqttServer
+ var mqttServer = new MqttServer
+ {
+ Id = mqttServerDto.Id,
+ ServerName = mqttServerDto.ServerName,
+ ServerUrl = mqttServerDto.ServerUrl,
+ Port = mqttServerDto.Port,
+ Username = mqttServerDto.Username,
+ Password = mqttServerDto.Password,
+ IsActive = mqttServerDto.IsActive,
+ SubscribeTopic = mqttServerDto.SubscribeTopic,
+ PublishTopic = mqttServerDto.PublishTopic,
+ ClientId = mqttServerDto.ClientId,
+ CreatedAt = mqttServerDto.CreatedAt,
+ ConnectedAt = mqttServerDto.ConnectedAt,
+ ConnectionDuration = mqttServerDto.ConnectionDuration,
+ MessageFormat = mqttServerDto.MessageFormat
+ };
+
+ _mqttServers.TryAdd(mqttServer.Id, mqttServer);
+ _mqttServiceManager.AddMqttServer(mqttServer);
}
- _logger.LogInformation($"成功向MQTT客户端 (ID: {mqttId}) 发送 {messages.Count} 条消息。");
+
+ _logger.LogInformation($"成功加载 {mqttServerDtos.Count} 个MQTT配置");
+ return true;
}
catch (Exception ex)
{
- _logger.LogError(ex, $"向MQTT客户端 (ID: {mqttId}) 批量发送消息时发生错误: {ex.Message}");
+ _logger.LogError(ex, $"加载MQTT配置时发生错误: {ex.Message}");
+ return false;
}
}
- }
- private async Task DisconnectAll(CancellationToken stoppingToken)
- {
- var disconnectTasks = _mqttClients.Values.Select(client => client.DisconnectAsync(new MqttClientDisconnectOptions(), stoppingToken));
- await Task.WhenAll(disconnectTasks);
- _mqttClients.Clear();
- }
-
- private bool LoadMqttConfigurations()
- {
- try
+ ///
+ /// 连接MQTT服务器列表
+ ///
+ private async Task ConnectMqttServersAsync(CancellationToken stoppingToken)
{
- _logger.LogInformation("开始加载Mqtt配置文件...");
- _mqttConfigDic.Clear();
- // var mqttConfigList = _deviceDataService.Mqtts.Where(m => m.IsActive).ToList();
- //
- // foreach (var mqtt in mqttConfigList)
- // {
- // // mqtt.OnMqttIsActiveChanged += OnMqttIsActiveChangedHandler; // 移除此行,因为MqttServer没有这个事件
- // _mqttConfigDic.TryAdd(mqtt.Id, mqtt);
- // // mqtt.ConnectMessage = "配置加载成功."; // 移除此行,因为MqttServer没有这个属性
- // }
- //
- // _logger.LogInformation($"Mqtt配置文件加载成功,开启的Mqtt客户端:{mqttConfigList.Count}个。");
- return true;
- }
- catch (Exception e)
- {
- _logger.LogError(e, $"Mqtt后台服务在加载配置的过程中发生了错误:{e.Message}");
- return false;
- }
- }
+ var connectTasks = _mqttServers.Values
+ .Where(m => m.IsActive)
+ .Select(mqtt => _mqttServiceManager.ConnectMqttServerAsync(mqtt.Id, stoppingToken));
- // private async void OnMqttIsActiveChangedHandler(MqttServer mqtt) // 移除此方法,因为MqttServer没有这个事件
- // {
- // try
- // {
- // if (mqtt.IsActive)
- // {
- // await ConnectMqtt(mqtt, CancellationToken.None);
- // }
- // else
- // {
- // if (_mqttClients.TryRemove(mqtt.Id, out var client) && client.IsConnected)
- // {
- // await client.DisconnectAsync();
- // _logger.LogInformation($"{mqtt.Name}的客户端,与服务器断开连接.");
- // }
- // mqtt.IsConnected = false;
- // mqtt.ConnectMessage = "已断开连接.";
- // }
- //
- // await _repositoryManager.MqttServers.UpdateAsync(mqtt);
- // _logger.LogInformation($"Mqtt客户端:{mqtt.Name},激活状态修改成功。");
- // }
- // catch (Exception e)
- // {
- // _logger.LogError(e, $"{mqtt.Name}客户端,开启或关闭的过程中发生了错误:{e.Message}");
- // }
- // }
-
- private async Task ConnectMqttList(CancellationToken stoppingToken)
- {
- var connectTasks = _mqttConfigDic.Values.Select(mqtt => ConnectMqtt(mqtt, stoppingToken));
- await Task.WhenAll(connectTasks);
- }
-
- private async Task ConnectMqtt(MqttServer mqtt, CancellationToken stoppingToken)
- {
- if (_mqttClients.TryGetValue(mqtt.Id, out var existingClient) && existingClient.IsConnected)
- {
- _logger.LogInformation($"{mqtt.ServerName}的Mqtt服务器连接已存在。");
- return;
+ await Task.WhenAll(connectTasks);
}
- _logger.LogInformation($"开始连接:{mqtt.ServerName}的服务器...");
- // mqtt.ConnectMessage = "开始连接服务器..."; // 移除此行,因为MqttServer没有这个属性
-
- var factory = new MqttFactory();
- var client = factory.CreateMqttClient();
-
- var options = new MqttClientOptionsBuilder()
- .WithClientId(mqtt.ClientId)
- .WithTcpServer(mqtt.ServerUrl, mqtt.Port)
- .WithCredentials(mqtt.Username, mqtt.Password)
- .WithCleanSession()
- .Build();
-
- client.UseConnectedHandler(async e => await HandleConnected(e, client, mqtt));
- client.UseApplicationMessageReceivedHandler(e => HandleMessageReceived(e, mqtt));
- client.UseDisconnectedHandler(async e => await HandleDisconnected(e, options, client, mqtt, stoppingToken));
-
- try
+ ///
+ /// 处理MQTT列表变化
+ ///
+ private void HandleMqttListChanged(List mqtts)
{
- await client.ConnectAsync(options, stoppingToken);
- _mqttClients.AddOrUpdate(mqtt.Id, client, (id, oldClient) => client);
+ _logger.LogInformation("MQTT列表发生了变化,正在重新加载数据...");
+ _reloadSemaphore.Release();
}
- catch (Exception ex)
- {
- // mqtt.ConnectMessage = $"连接MQTT服务器失败: {ex.Message}"; // 移除此行,因为MqttServer没有这个属性
- _logger.LogError(ex, $"连接MQTT服务器失败: {mqtt.ServerName}");
- }
- }
-
- private static void HandleMessageReceived(MqttApplicationMessageReceivedEventArgs e, MqttServer mqtt)
- {
- var topic = e.ApplicationMessage.Topic;
- var payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
- // _logger.LogInformation($"MQTT客户端 {mqtt.ServerName} 收到消息: 主题={topic}, 消息={payload}");
- }
-
- private async Task HandleDisconnected(MqttClientDisconnectedEventArgs args, IMqttClientOptions options, IMqttClient client, MqttServer mqtt, CancellationToken stoppingToken)
- {
- _logger.LogWarning($"与MQTT服务器断开连接: {mqtt.ServerName}");
- // mqtt.ConnectMessage = "断开连接."; // 移除此行,因为MqttServer没有这个属性
- // mqtt.IsConnected = false; // 移除此行,因为MqttServer没有这个属性
-
- if (stoppingToken.IsCancellationRequested || !mqtt.IsActive) return;
-
- _reconnectAttempts.AddOrUpdate(mqtt.Id, 1, (id, count) => count + 1);
- var attempt = _reconnectAttempts[mqtt.Id];
-
- var delay = TimeSpan.FromSeconds(Math.Min(60, Math.Pow(2, attempt)));
- _logger.LogInformation($"与MQTT服务器:{mqtt.ServerName} 的连接已断开。将在 {delay.TotalSeconds} 秒后尝试第 {attempt} 次重新连接...");
- // mqtt.ConnectMessage = $"连接已断开,{delay.TotalSeconds}秒后尝试重连..."; // 移除此行,因为MqttServer没有这个属性
-
- await Task.Delay(delay, stoppingToken);
-
- try
- {
- // mqtt.ConnectMessage = "开始重新连接服务器..."; // 移除此行,因为MqttServer没有这个属性
- await client.ConnectAsync(options, stoppingToken);
- }
- catch (Exception ex)
- {
- // mqtt.ConnectMessage = "重新连接失败."; // 移除此行,因为MqttServer没有这个属性
- _logger.LogError(ex, $"重新与Mqtt服务器连接失败: {mqtt.ServerName}");
- }
- }
-
- private async Task HandleConnected(MqttClientConnectedEventArgs args, IMqttClient client, MqttServer mqtt)
- {
- _reconnectAttempts.TryRemove(mqtt.Id, out _);
- _logger.LogInformation($"已连接到MQTT服务器: {mqtt.ServerName}");
- // mqtt.IsConnected = true; // 移除此行,因为MqttServer没有这个属性
- // mqtt.ConnectMessage = "连接成功."; // 移除此行,因为MqttServer没有这个属性
-
- if (!string.IsNullOrEmpty(mqtt.SubscribeTopic))
- {
- await client.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(mqtt.SubscribeTopic).Build());
- _logger.LogInformation($"MQTT客户端 {mqtt.ServerName} 已订阅主题: {mqtt.SubscribeTopic}");
- }
- }
-
- private void HandleMqttListChanged(List mqtts)
- {
- _logger.LogInformation("Mqtt列表发生了变化,正在重新加载数据...");
- _reloadSemaphore.Release();
}
}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Services/MqttDeviceContext.cs b/DMS.Infrastructure/Services/MqttDeviceContext.cs
new file mode 100644
index 0000000..84c338b
--- /dev/null
+++ b/DMS.Infrastructure/Services/MqttDeviceContext.cs
@@ -0,0 +1,48 @@
+using DMS.Core.Models;
+using DMS.Infrastructure.Interfaces.Services;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+
+namespace DMS.Infrastructure.Services
+{
+ ///
+ /// MQTT设备上下文,用于存储单个MQTT服务器的连接信息和状态
+ ///
+ public class MqttDeviceContext
+ {
+ ///
+ /// MQTT服务器配置
+ ///
+ public MqttServer MqttServer { get; set; }
+
+ ///
+ /// MQTT服务实例
+ ///
+ public IMqttService MqttService { get; set; }
+
+ ///
+ /// 连接状态
+ ///
+ public bool IsConnected { get; set; }
+
+ ///
+ /// 重连尝试次数
+ ///
+ public int ReconnectAttempts { get; set; }
+
+ ///
+ /// 与该MQTT服务器关联的所有变量MQTT别名
+ ///
+ public ConcurrentDictionary VariableMqttAliases { get; set; }
+
+ ///
+ /// 构造函数
+ ///
+ public MqttDeviceContext()
+ {
+ VariableMqttAliases = new ConcurrentDictionary();
+ ReconnectAttempts = 0;
+ }
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Services/MqttService.cs b/DMS.Infrastructure/Services/MqttService.cs
new file mode 100644
index 0000000..f829ac7
--- /dev/null
+++ b/DMS.Infrastructure/Services/MqttService.cs
@@ -0,0 +1,194 @@
+using DMS.Infrastructure.Interfaces.Services;
+using Microsoft.Extensions.Logging;
+using MQTTnet;
+using MQTTnet.Client;
+using MQTTnet.Client.Connecting;
+using MQTTnet.Client.Disconnecting;
+using MQTTnet.Client.Options;
+using MQTTnet.Client.Receiving;
+using System;
+using System.Threading.Tasks;
+
+namespace DMS.Infrastructure.Services
+{
+ ///
+ /// MQTT服务实现类,用于与MQTT服务器进行通信
+ ///
+ public class MqttService : IMqttService
+ {
+ private IMqttClient _mqttClient;
+ private readonly ILogger _logger;
+ private readonly IMqttClientOptions _options;
+ private Func _messageHandler;
+ private string _clientId;
+
+ public bool IsConnected => _mqttClient?.IsConnected ?? false;
+
+ ///
+ /// 构造函数,注入日志记录器
+ ///
+ /// 日志记录器实例
+ public MqttService(ILogger logger)
+ {
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ var factory = new MqttFactory();
+ _mqttClient = factory.CreateMqttClient();
+ _clientId = Guid.NewGuid().ToString();
+ }
+
+ ///
+ /// 异步连接到MQTT服务器
+ ///
+ public async Task ConnectAsync(string serverUrl, int port, string clientId, string username, string password)
+ {
+ try
+ {
+ _clientId = clientId ?? Guid.NewGuid().ToString();
+
+ var options = new MqttClientOptionsBuilder()
+ .WithTcpServer(serverUrl, port)
+ .WithClientId(_clientId)
+ .WithCredentials(username, password)
+ .WithCleanSession()
+ .Build();
+
+ _mqttClient.UseConnectedHandler(async e => await HandleConnectedAsync(e));
+ _mqttClient.UseDisconnectedHandler(async e => await HandleDisconnectedAsync(e));
+ _mqttClient.UseApplicationMessageReceivedHandler(async e => await HandleMessageReceivedAsync(e));
+
+ await _mqttClient.ConnectAsync(options);
+ _logger.LogInformation($"成功连接到MQTT服务器: {serverUrl}:{port} (ClientID: {_clientId})");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"连接MQTT服务器时发生错误: {ex.Message} (ClientID: {_clientId})");
+ throw;
+ }
+ }
+
+ ///
+ /// 异步断开MQTT连接
+ ///
+ public async Task DisconnectAsync()
+ {
+ try
+ {
+ if (_mqttClient != null && _mqttClient.IsConnected)
+ {
+ await _mqttClient.DisconnectAsync();
+ _logger.LogInformation($"已断开MQTT服务器连接 (ClientID: {_clientId})");
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"断开MQTT服务器连接时发生错误: {ex.Message} (ClientID: {_clientId})");
+ throw;
+ }
+ }
+
+ ///
+ /// 异步发布消息
+ ///
+ public async Task PublishAsync(string topic, string payload)
+ {
+ if (!IsConnected)
+ {
+ throw new InvalidOperationException("MQTT客户端未连接");
+ }
+
+ try
+ {
+ var message = new MqttApplicationMessageBuilder()
+ .WithTopic(topic)
+ .WithPayload(payload ?? string.Empty)
+ .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
+ .Build();
+
+ await _mqttClient.PublishAsync(message);
+ _logger.LogDebug($"成功发布消息到主题 {topic} (ClientID: {_clientId})");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"发布消息到主题 {topic} 时发生错误: {ex.Message} (ClientID: {_clientId})");
+ throw;
+ }
+ }
+
+ ///
+ /// 异步订阅主题
+ ///
+ public async Task SubscribeAsync(string topic)
+ {
+ if (!IsConnected)
+ {
+ throw new InvalidOperationException("MQTT客户端未连接");
+ }
+
+ try
+ {
+ await _mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(topic).Build());
+ _logger.LogInformation($"成功订阅主题: {topic} (ClientID: {_clientId})");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"订阅主题 {topic} 时发生错误: {ex.Message} (ClientID: {_clientId})");
+ throw;
+ }
+ }
+
+ ///
+ /// 设置消息接收处理程序
+ ///
+ public void SetMessageHandler(Func handler)
+ {
+ _messageHandler = handler;
+ }
+
+ #region 私有处理方法
+
+ private async Task HandleConnectedAsync(MqttClientConnectedEventArgs args)
+ {
+ _logger.LogInformation($"MQTT客户端连接成功 (ClientID: {_clientId})");
+ }
+
+ private async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs args)
+ {
+ _logger.LogWarning($"MQTT客户端断开连接: {args.Reason} (ClientID: {_clientId})");
+ }
+
+ private async Task HandleMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
+ {
+ var topic = args.ApplicationMessage.Topic;
+ var payload = System.Text.Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
+
+ _logger.LogDebug($"收到MQTT消息 - 主题: {topic}, 内容: {payload} (ClientID: {_clientId})");
+
+ if (_messageHandler != null)
+ {
+ await _messageHandler(topic, payload);
+ }
+ }
+
+ #endregion
+
+ ///
+ /// 释放资源
+ ///
+ public void Dispose()
+ {
+ try
+ {
+ if (_mqttClient != null && _mqttClient.IsConnected)
+ {
+ _mqttClient.DisconnectAsync().Wait(TimeSpan.FromSeconds(5));
+ }
+ _mqttClient?.Dispose();
+ _logger.LogInformation($"MQTT服务资源已释放 (ClientID: {_clientId})");
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, $"释放MQTT服务资源时发生错误: {ex.Message} (ClientID: {_clientId})");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Services/MqttServiceFactory.cs b/DMS.Infrastructure/Services/MqttServiceFactory.cs
new file mode 100644
index 0000000..8f3b232
--- /dev/null
+++ b/DMS.Infrastructure/Services/MqttServiceFactory.cs
@@ -0,0 +1,45 @@
+using DMS.Infrastructure.Interfaces.Services;
+using Microsoft.Extensions.Logging;
+using System;
+
+namespace DMS.Infrastructure.Services
+{
+ ///
+ /// MQTT服务工厂实现类,用于创建MQTT服务实例
+ ///
+ public class MqttServiceFactory : IMqttServiceFactory
+ {
+ private readonly ILogger _logger;
+
+ ///
+ /// 构造函数,注入日志记录器
+ ///
+ /// MQTT服务日志记录器
+ public MqttServiceFactory(ILogger logger)
+ {
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ ///
+ /// 创建MQTT服务实例
+ ///
+ /// IMqttService实例
+ public IMqttService CreateService()
+ {
+ return new MqttService(_logger);
+ }
+
+ ///
+ /// 根据MQTT服务器配置创建MQTT服务实例
+ ///
+ /// MQTT服务器配置
+ /// IMqttService实例
+ public IMqttService CreateService(Core.Models.MqttServer mqttServer)
+ {
+ if (mqttServer == null)
+ throw new ArgumentNullException(nameof(mqttServer));
+
+ return new MqttService(_logger);
+ }
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Services/MqttServiceManager.cs b/DMS.Infrastructure/Services/MqttServiceManager.cs
new file mode 100644
index 0000000..68a5cd9
--- /dev/null
+++ b/DMS.Infrastructure/Services/MqttServiceManager.cs
@@ -0,0 +1,332 @@
+using DMS.Infrastructure.Interfaces.Services;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using DMS.Core.Models;
+using DMS.Application.Interfaces;
+
+namespace DMS.Infrastructure.Services
+{
+ ///
+ /// MQTT服务管理器,负责管理MQTT连接和变量监控
+ ///
+ public class MqttServiceManager : IMqttServiceManager
+ {
+ private readonly ILogger _logger;
+ private readonly IDataProcessingService _dataProcessingService;
+ private readonly IDataCenterService _dataCenterService;
+ private readonly IMqttServiceFactory _mqttServiceFactory;
+ private readonly ConcurrentDictionary _mqttContexts;
+ private readonly SemaphoreSlim _semaphore;
+ private bool _disposed = false;
+
+ public MqttServiceManager(
+ ILogger logger,
+ IDataProcessingService dataProcessingService,
+ IDataCenterService dataCenterService,
+ IMqttServiceFactory mqttServiceFactory)
+ {
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _dataProcessingService = dataProcessingService ?? throw new ArgumentNullException(nameof(dataProcessingService));
+ _dataCenterService = dataCenterService ?? throw new ArgumentNullException(nameof(dataCenterService));
+ _mqttServiceFactory = mqttServiceFactory ?? throw new ArgumentNullException(nameof(mqttServiceFactory));
+ _mqttContexts = new ConcurrentDictionary();
+ _semaphore = new SemaphoreSlim(10, 10); // 默认最大并发连接数为10
+ }
+
+ ///
+ /// 初始化服务管理器
+ ///
+ public async Task InitializeAsync(CancellationToken cancellationToken = default)
+ {
+ _logger.LogInformation("MQTT服务管理器正在初始化...");
+ // 初始化逻辑可以在需要时添加
+ _logger.LogInformation("MQTT服务管理器初始化完成");
+ }
+
+ ///
+ /// 添加MQTT服务器到监控列表
+ ///
+ public void AddMqttServer(MqttServer mqttServer)
+ {
+ if (mqttServer == null)
+ throw new ArgumentNullException(nameof(mqttServer));
+
+ var context = new MqttDeviceContext
+ {
+ MqttServer = mqttServer,
+ MqttService = _mqttServiceFactory.CreateService(),
+ IsConnected = false
+ };
+
+ _mqttContexts.AddOrUpdate(mqttServer.Id, context, (key, oldValue) => context);
+ _logger.LogInformation("已添加MQTT服务器 {MqttServerId} 到监控列表", mqttServer.Id);
+ }
+
+ ///
+ /// 移除MQTT服务器监控
+ ///
+ public async Task RemoveMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
+ {
+ if (_mqttContexts.TryRemove(mqttServerId, out var context))
+ {
+ await DisconnectMqttServerAsync(mqttServerId, cancellationToken);
+ _logger.LogInformation("已移除MQTT服务器 {MqttServerId} 的监控", mqttServerId);
+ }
+ }
+
+ ///
+ /// 更新MQTT服务器变量别名
+ ///
+ public void UpdateVariableMqttAliases(int mqttServerId, List variableMqttAliases)
+ {
+ if (_mqttContexts.TryGetValue(mqttServerId, out var context))
+ {
+ context.VariableMqttAliases.Clear();
+ foreach (var alias in variableMqttAliases)
+ {
+ context.VariableMqttAliases.AddOrUpdate(alias.Id, alias, (key, oldValue) => alias);
+ }
+ _logger.LogInformation("已更新MQTT服务器 {MqttServerId} 的变量别名列表,共 {Count} 个别名", mqttServerId, variableMqttAliases.Count);
+ }
+ }
+
+ ///
+ /// 获取MQTT服务器连接状态
+ ///
+ public bool IsMqttServerConnected(int mqttServerId)
+ {
+ return _mqttContexts.TryGetValue(mqttServerId, out var context) && context.IsConnected;
+ }
+
+ ///
+ /// 重新连接MQTT服务器
+ ///
+ public async Task ReconnectMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
+ {
+ if (_mqttContexts.TryGetValue(mqttServerId, out var context))
+ {
+ await DisconnectMqttServerAsync(mqttServerId, cancellationToken);
+ await ConnectMqttServerAsync(mqttServerId, cancellationToken);
+ }
+ }
+
+ ///
+ /// 获取所有监控的MQTT服务器ID
+ ///
+ public IEnumerable GetMonitoredMqttServerIds()
+ {
+ return _mqttContexts.Keys.ToList();
+ }
+
+ ///
+ /// 连接MQTT服务器
+ ///
+ public async Task ConnectMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
+ {
+ if (!_mqttContexts.TryGetValue(mqttServerId, out var context))
+ {
+ _logger.LogWarning("未找到MQTT服务器 {MqttServerId}", mqttServerId);
+ return;
+ }
+
+ await _semaphore.WaitAsync(cancellationToken);
+ try
+ {
+ _logger.LogInformation("正在连接MQTT服务器 {ServerName} ({ServerUrl}:{Port})",
+ context.MqttServer.ServerName, context.MqttServer.ServerUrl, context.MqttServer.Port);
+
+ var stopwatch = Stopwatch.StartNew();
+
+ // 设置连接超时
+ using var timeoutToken = new CancellationTokenSource(5000); // 5秒超时
+ using var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutToken.Token);
+
+ await context.MqttService.ConnectAsync(
+ context.MqttServer.ServerUrl,
+ context.MqttServer.Port,
+ context.MqttServer.ClientId,
+ context.MqttServer.Username,
+ context.MqttServer.Password);
+
+ stopwatch.Stop();
+ _logger.LogInformation("MQTT服务器 {ServerName} 连接耗时 {ElapsedMs} ms",
+ context.MqttServer.ServerName, stopwatch.ElapsedMilliseconds);
+
+ if (context.MqttService.IsConnected)
+ {
+ context.IsConnected = true;
+ context.ReconnectAttempts = 0; // 重置重连次数
+
+ // 订阅主题
+ if (!string.IsNullOrEmpty(context.MqttServer.SubscribeTopic))
+ {
+ await context.MqttService.SubscribeAsync(context.MqttServer.SubscribeTopic);
+ }
+
+ _logger.LogInformation("MQTT服务器 {ServerName} 连接成功", context.MqttServer.ServerName);
+ }
+ else
+ {
+ _logger.LogWarning("MQTT服务器 {ServerName} 连接失败", context.MqttServer.ServerName);
+ }
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "连接MQTT服务器 {ServerName} 时发生错误: {ErrorMessage}",
+ context.MqttServer.ServerName, ex.Message);
+ context.IsConnected = false;
+ context.ReconnectAttempts++;
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+ }
+
+ ///
+ /// 断开MQTT服务器连接
+ ///
+ public async Task DisconnectMqttServerAsync(int mqttServerId, CancellationToken cancellationToken = default)
+ {
+ if (!_mqttContexts.TryGetValue(mqttServerId, out var context))
+ return;
+
+ try
+ {
+ _logger.LogInformation("正在断开MQTT服务器 {ServerName} 的连接", context.MqttServer.ServerName);
+ await context.MqttService.DisconnectAsync();
+ context.IsConnected = false;
+ _logger.LogInformation("MQTT服务器 {ServerName} 连接已断开", context.MqttServer.ServerName);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "断开MQTT服务器 {ServerName} 连接时发生错误: {ErrorMessage}",
+ context.MqttServer.ServerName, ex.Message);
+ }
+ }
+
+ ///
+ /// 发布变量数据到MQTT服务器
+ ///
+ public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default)
+ {
+ if (variableMqtt?.Mqtt == null || variableMqtt.Variable == null)
+ {
+ _logger.LogWarning("无效的VariableMqtt对象,跳过发布");
+ return;
+ }
+
+ if (!_mqttContexts.TryGetValue(variableMqtt.Mqtt.Id, out var context))
+ {
+ _logger.LogWarning("未找到MQTT服务器 {MqttServerId}", variableMqtt.Mqtt.Id);
+ return;
+ }
+
+ if (!context.IsConnected)
+ {
+ _logger.LogWarning("MQTT服务器 {ServerName} 未连接,跳过发布", context.MqttServer.ServerName);
+ return;
+ }
+
+ try
+ {
+ var topic = context.MqttServer.PublishTopic;
+ var payload = variableMqtt.Variable.DataValue?.ToString() ?? string.Empty;
+
+ await context.MqttService.PublishAsync(topic, payload);
+ _logger.LogDebug("成功向MQTT服务器 {ServerName} 发布变量 {VariableName} 的数据",
+ context.MqttServer.ServerName, variableMqtt.Variable.Name);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "向MQTT服务器 {ServerName} 发布变量 {VariableName} 数据时发生错误: {ErrorMessage}",
+ context.MqttServer.ServerName, variableMqtt.Variable.Name, ex.Message);
+ }
+ }
+
+ ///
+ /// 发布批量变量数据到MQTT服务器
+ ///
+ public async Task PublishVariablesDataAsync(List variableMqtts, CancellationToken cancellationToken = default)
+ {
+ if (variableMqtts == null || !variableMqtts.Any())
+ {
+ _logger.LogWarning("变量MQTT列表为空,跳过批量发布");
+ return;
+ }
+
+ // 按MQTT服务器ID进行分组
+ var groupedByMqtt = variableMqtts.GroupBy(vm => vm.Mqtt.Id);
+
+ foreach (var group in groupedByMqtt)
+ {
+ var mqttId = group.Key;
+ if (!_mqttContexts.TryGetValue(mqttId, out var context))
+ {
+ _logger.LogWarning("未找到MQTT服务器 {MqttServerId},跳过 {Count} 条消息", mqttId, group.Count());
+ continue;
+ }
+
+ if (!context.IsConnected)
+ {
+ _logger.LogWarning("MQTT服务器 {ServerName} 未连接,跳过 {Count} 条消息",
+ context.MqttServer.ServerName, group.Count());
+ continue;
+ }
+
+ try
+ {
+ foreach (var variableMqtt in group)
+ {
+ var topic = context.MqttServer.PublishTopic;
+ var payload = variableMqtt.Variable?.DataValue?.ToString() ?? string.Empty;
+
+ await context.MqttService.PublishAsync(topic, payload);
+ }
+
+ _logger.LogInformation("成功向MQTT服务器 {ServerName} 发布 {Count} 条变量数据",
+ context.MqttServer.ServerName, group.Count());
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "向MQTT服务器 {ServerName} 批量发布变量数据时发生错误: {ErrorMessage}",
+ context.MqttServer.ServerName, ex.Message);
+ }
+ }
+ }
+
+ ///
+ /// 释放资源
+ ///
+ public void Dispose()
+ {
+ if (!_disposed)
+ {
+ _disposed = true;
+ _semaphore?.Dispose();
+
+ // 断开所有MQTT连接
+ foreach (var context in _mqttContexts.Values)
+ {
+ try
+ {
+ context.MqttService?.DisconnectAsync().Wait(TimeSpan.FromSeconds(5));
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "断开MQTT服务器 {ServerName} 连接时发生错误",
+ context.MqttServer?.ServerName ?? "Unknown");
+ }
+ }
+
+ _logger.LogInformation("MQTT服务管理器已释放资源");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/DMS.Infrastructure/Services/OpcUaBackgroundService.cs b/DMS.Infrastructure/Services/OpcUaBackgroundService.cs
index 4e20f45..b8275f5 100644
--- a/DMS.Infrastructure/Services/OpcUaBackgroundService.cs
+++ b/DMS.Infrastructure/Services/OpcUaBackgroundService.cs
@@ -81,10 +81,10 @@ public class OpcUaBackgroundService : BackgroundService
_opcUaPollVariablesByDeviceId = new ConcurrentDictionary>();
_opcUaVariablesByDeviceId = new ConcurrentDictionary>();
- _dataCenterService.DataLoadCompleted += DataLoadCompleted;
+ _dataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
}
- private void DataLoadCompleted(object? sender, DataLoadCompletedEventArgs e)
+ private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
{
_reloadSemaphore.Release();
}
diff --git a/DMS.Infrastructure/Services/OptimizedOpcUaBackgroundService.cs b/DMS.Infrastructure/Services/OptimizedOpcUaBackgroundService.cs
index 0c0a477..cea7c74 100644
--- a/DMS.Infrastructure/Services/OptimizedOpcUaBackgroundService.cs
+++ b/DMS.Infrastructure/Services/OptimizedOpcUaBackgroundService.cs
@@ -32,10 +32,10 @@ namespace DMS.Infrastructure.Services
_opcUaServiceManager = opcUaServiceManager ?? throw new ArgumentNullException(nameof(opcUaServiceManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
- _dataCenterService.DataLoadCompleted += OnDataLoadCompleted;
+ _dataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
}
- private void OnDataLoadCompleted(object sender, DataLoadCompletedEventArgs e)
+ private void OnLoadDataCompleted(object sender, DataLoadCompletedEventArgs e)
{
_logger.LogInformation("收到数据加载完成通知,触发OPC UA服务重新加载");
_reloadSemaphore.Release();
@@ -145,7 +145,7 @@ namespace DMS.Infrastructure.Services
{
_logger.LogInformation("正在释放OPC UA后台服务资源...");
- _dataCenterService.DataLoadCompleted -= OnDataLoadCompleted;
+ _dataCenterService.OnLoadDataCompleted -= OnLoadDataCompleted;
_reloadSemaphore?.Dispose();
base.Dispose();
diff --git a/DMS.Infrastructure/Services/OptimizedS7BackgroundService.cs b/DMS.Infrastructure/Services/OptimizedS7BackgroundService.cs
index 330af45..0aac121 100644
--- a/DMS.Infrastructure/Services/OptimizedS7BackgroundService.cs
+++ b/DMS.Infrastructure/Services/OptimizedS7BackgroundService.cs
@@ -64,10 +64,10 @@ public class OptimizedS7BackgroundService : BackgroundService
_s7ServiceManager = s7ServiceManager;
_logger = logger;
- _dataCenterService.DataLoadCompleted += DataLoadCompleted;
+ _dataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
}
- private void DataLoadCompleted(object? sender, DataLoadCompletedEventArgs e)
+ private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
{
_reloadSemaphore.Release();
}
diff --git a/DMS.Infrastructure/Services/S7BackgroundService.cs b/DMS.Infrastructure/Services/S7BackgroundService.cs
index 220c616..d48eac3 100644
--- a/DMS.Infrastructure/Services/S7BackgroundService.cs
+++ b/DMS.Infrastructure/Services/S7BackgroundService.cs
@@ -51,10 +51,10 @@ public class S7BackgroundService : BackgroundService
_messenger = messenger;
_logger = logger;
- _dataCenterService.DataLoadCompleted += DataLoadCompleted;
+ _dataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
}
- private void DataLoadCompleted(object? sender, DataLoadCompletedEventArgs e)
+ private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
{
_reloadSemaphore.Release();
}
diff --git a/DMS.WPF/App.xaml.cs b/DMS.WPF/App.xaml.cs
index fe508c2..ff6a44e 100644
--- a/DMS.WPF/App.xaml.cs
+++ b/DMS.WPF/App.xaml.cs
@@ -156,6 +156,8 @@ public partial class App : System.Windows.Application
services.AddSingleton();
services.AddTransient();
+ services.AddTransient();
+ services.AddTransient();
// 注册App服务
services.AddSingleton();
@@ -167,6 +169,10 @@ public partial class App : System.Windows.Application
services.AddSingleton();
services.AddSingleton();
+ // 注册MQTT服务管理器
+ services.AddSingleton();
+ services.AddHostedService();
+
// 注册WPF中的服务
services.AddSingleton();
services.AddSingleton(provider =>
diff --git a/DMS.WPF/Services/DataServices.cs b/DMS.WPF/Services/DataServices.cs
index b3ba276..299d6d3 100644
--- a/DMS.WPF/Services/DataServices.cs
+++ b/DMS.WPF/Services/DataServices.cs
@@ -101,13 +101,13 @@ public partial class DataServices : ObservableObject, IRecipient, I
// 监听变量值变更事件
_dataCenterService.VariableValueChanged += OnVariableValueChanged;
- _dataCenterService.DataLoadCompleted += OnDataLoadCompleted;
+ _dataCenterService.OnLoadDataCompleted += OnLoadDataCompleted;
// 注册消息接收
// WeakReferenceMessenger.Register(this, (r, m) => r.Receive(m));
}
- private void OnDataLoadCompleted(object? sender, DataLoadCompletedEventArgs e)
+ private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
{
if (e.IsSuccess)
{