重构MQTT事件处理和激活状态管理功能

This commit is contained in:
2025-10-05 14:45:41 +08:00
parent b96101dea6
commit 80ea47e627
13 changed files with 70 additions and 198 deletions

View File

@@ -45,7 +45,7 @@ namespace DMS.Infrastructure.Services.Mqtt
{
Start();
}
}
/// <summary>
@@ -137,11 +137,11 @@ namespace DMS.Infrastructure.Services.Mqtt
try
{
while (!stoppingToken.IsCancellationRequested )
while (!stoppingToken.IsCancellationRequested)
{
await _reloadSemaphore.WaitAsync(stoppingToken);
if (stoppingToken.IsCancellationRequested ) break;
if (stoppingToken.IsCancellationRequested) break;
// 加载MQTT配置
if (!LoadMqttConfigurations())
@@ -155,7 +155,7 @@ namespace DMS.Infrastructure.Services.Mqtt
_logger.LogInformation("MQTT后台服务已启动");
// 保持运行状态
while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
{
await Task.Delay(1000, stoppingToken);
}
@@ -186,9 +186,7 @@ namespace DMS.Infrastructure.Services.Mqtt
_mqttServers.Clear();
// 从数据服务中心获取所有激活的MQTT服务器
var mqttServerDtos = _appDataStorageService.MqttServers.Values
.Where(m => m.IsActive)
.ToList();
var mqttServerDtos = _appDataStorageService.MqttServers.Values.ToList();
foreach (var mqttServerDto in mqttServerDtos)
{
@@ -252,12 +250,12 @@ namespace DMS.Infrastructure.Services.Mqtt
public override void Dispose()
{
_logger.LogInformation("正在释放MQTT后台服务资源...");
_eventService.OnLoadDataCompleted -= OnLoadDataCompleted;
_reloadSemaphore?.Dispose();
base.Dispose();
_logger.LogInformation("MQTT后台服务资源已释放");
}
}

View File

@@ -49,11 +49,6 @@ namespace DMS.Infrastructure.Services.Mqtt
_eventService.OnMqttServerChanged += OnMqttServerChanged;
}
/// <summary>
/// 标志是否正在处理事件,用于防止递归调用
/// </summary>
private readonly ConcurrentDictionary<int, bool> _isProcessingUpdate = new();
/// <summary>
/// 初始化服务管理器
/// </summary>
@@ -81,10 +76,6 @@ namespace DMS.Infrastructure.Services.Mqtt
_mqttContexts.AddOrUpdate(mqttServer.Id, context, (key, oldValue) => context);
_logger.LogInformation("已添加MQTT服务器 {MqttServerId} 到监控列表", mqttServer.Id);
// 使用AutoMapper触发MQTT服务器改变事件
var mqttServerDto = _mapper.Map<MqttServerDto>(mqttServer);
_eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(Core.Enums.ActionChangeType.Added, mqttServerDto));
}
/// <summary>
@@ -97,10 +88,6 @@ namespace DMS.Infrastructure.Services.Mqtt
await DisconnectMqttServerAsync(mqttServerId, cancellationToken);
_logger.LogInformation("已移除MQTT服务器 {MqttServerId} 的监控", mqttServerId);
// 使用AutoMapper触发MQTT服务器删除事件
var mqttServerDto = _mapper.Map<MqttServerDto>(context.MqttServerConfig);
_eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(Core.Enums.ActionChangeType.Deleted, mqttServerDto));
}
}
@@ -352,14 +339,6 @@ namespace DMS.Infrastructure.Services.Mqtt
{
try
{
// 防止同一服务器的递归更新
if (_isProcessingUpdate.ContainsKey(e.MqttServer.Id) && _isProcessingUpdate[e.MqttServer.Id])
{
_logger.LogDebug("正在处理服务器 {MqttServerId} 的更新,跳过重复事件", e.MqttServer.Id);
return;
}
_isProcessingUpdate[e.MqttServer.Id] = true;
_logger.LogDebug("处理MQTT服务器变更事件: 服务器ID={MqttServerId}, 变更类型={ChangeType}, 变更属性={PropertyType}",
e.MqttServer.Id, e.ChangeType, e.PropertyType);
@@ -382,10 +361,6 @@ namespace DMS.Infrastructure.Services.Mqtt
_logger.LogError(ex, "处理MQTT服务器变更事件时发生错误: 服务器ID={MqttServerId}, 变更类型={ChangeType}",
e.MqttServer.Id, e.ChangeType);
}
finally
{
_isProcessingUpdate.TryRemove(e.MqttServer.Id, out _);
}
}
/// <summary>
@@ -474,10 +449,6 @@ namespace DMS.Infrastructure.Services.Mqtt
await ReconnectMqttServerAsync(mqttServer.Id, CancellationToken.None);
break;
case MqttServerPropertyType.IsActive:
// 检查当前激活状态和新激活状态是否一致
if (context.MqttServerConfig.IsActive != mqttServer.IsActive)
{
context.MqttServerConfig.IsActive = mqttServer.IsActive;
if (mqttServer.IsActive)
{
@@ -489,7 +460,6 @@ namespace DMS.Infrastructure.Services.Mqtt
// 激活状态变为false断开服务器连接
await DisconnectMqttServerAsync(mqttServer.Id, CancellationToken.None);
}
}
break;
case MqttServerPropertyType.SubscribeTopic:
context.MqttServerConfig.SubscribeTopic = mqttServer.SubscribeTopic;
@@ -556,9 +526,6 @@ namespace DMS.Infrastructure.Services.Mqtt
context.MqttServerConfig?.ServerName ?? "Unknown");
}
}
// 清理处理更新状态的字典
_isProcessingUpdate.Clear();
_logger.LogInformation("MQTT服务管理器已释放资源");
}