feat: 实现 MQTT 服务事件驱动机制,实时响应服务器变化

This commit is contained in:
2025-10-05 12:52:22 +08:00
parent ea18a6ac2c
commit b96101dea6
3 changed files with 205 additions and 4 deletions

View File

@@ -44,8 +44,16 @@ namespace DMS.Infrastructure.Services.Mqtt
_mapper = mapper ?? throw new ArgumentNullException(nameof(mapper));
_mqttContexts = new ConcurrentDictionary<int, MqttDeviceContext>();
_semaphore = new SemaphoreSlim(10, 10); // 默认最大并发连接数为10
// 订阅MQTT服务器变更事件
_eventService.OnMqttServerChanged += OnMqttServerChanged;
}
/// <summary>
/// 标志是否正在处理事件,用于防止递归调用
/// </summary>
private readonly ConcurrentDictionary<int, bool> _isProcessingUpdate = new();
/// <summary>
/// 初始化服务管理器
/// </summary>
@@ -337,6 +345,190 @@ namespace DMS.Infrastructure.Services.Mqtt
}
}
/// <summary>
/// 处理MQTT服务器变更事件
/// </summary>
private async void OnMqttServerChanged(object? sender, MqttServerChangedEventArgs e)
{
try
{
// 防止同一服务器的递归更新
if (_isProcessingUpdate.ContainsKey(e.MqttServer.Id) && _isProcessingUpdate[e.MqttServer.Id])
{
_logger.LogDebug("正在处理服务器 {MqttServerId} 的更新,跳过重复事件", e.MqttServer.Id);
return;
}
_isProcessingUpdate[e.MqttServer.Id] = true;
_logger.LogDebug("处理MQTT服务器变更事件: 服务器ID={MqttServerId}, 变更类型={ChangeType}, 变更属性={PropertyType}",
e.MqttServer.Id, e.ChangeType, e.PropertyType);
switch (e.ChangeType)
{
case ActionChangeType.Added:
HandleMqttServerAdded(e.MqttServer);
break;
case ActionChangeType.Updated:
await HandleMqttServerUpdated(e.MqttServer, e.PropertyType);
break;
case ActionChangeType.Deleted:
await HandleMqttServerRemoved(e.MqttServer.Id);
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理MQTT服务器变更事件时发生错误: 服务器ID={MqttServerId}, 变更类型={ChangeType}",
e.MqttServer.Id, e.ChangeType);
}
finally
{
_isProcessingUpdate.TryRemove(e.MqttServer.Id, out _);
}
}
/// <summary>
/// 处理MQTT服务器添加事件
/// </summary>
private void HandleMqttServerAdded(MqttServerDto mqttServer)
{
if (mqttServer == null)
{
_logger.LogWarning("HandleMqttServerAdded: 接收到空MQTT服务器对象");
return;
}
try
{
_logger.LogInformation("处理MQTT服务器添加事件: {MqttServerId} ({MqttServerName})",
mqttServer.Id, mqttServer.ServerName);
// 将DTO转换为MqttServer实体
var mqttServerEntity = _mapper.Map<MqttServer>(mqttServer);
// 添加服务器到监控列表
AddMqttServer(mqttServerEntity);
// 如果服务器是激活状态,则尝试连接
if (mqttServer.IsActive)
{
// 使用Task.Run来避免阻塞事件处理
_ = Task.Run(async () =>
{
try
{
await ConnectMqttServerAsync(mqttServer.Id, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "连接新添加的MQTT服务器 {MqttServerId} ({MqttServerName}) 时发生错误",
mqttServer.Id, mqttServer.ServerName);
}
});
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理MQTT服务器添加事件时发生错误: {MqttServerId} ({MqttServerName})",
mqttServer?.Id, mqttServer?.ServerName);
}
}
/// <summary>
/// 处理MQTT服务器更新事件
/// </summary>
private async Task HandleMqttServerUpdated(MqttServerDto mqttServer, MqttServerPropertyType propertyType)
{
if (mqttServer == null)
{
_logger.LogWarning("HandleMqttServerUpdated: 接收到空MQTT服务器对象");
return;
}
try
{
_logger.LogInformation("处理MQTT服务器更新事件: {MqttServerId} ({MqttServerName}), 属性类型: {PropertyType}",
mqttServer.Id, mqttServer.ServerName, propertyType);
// 检查上下文是否存在
if (_mqttContexts.TryGetValue(mqttServer.Id, out var context))
{
// 根据属性类型决定是否需要重新连接或特殊处理
switch (propertyType)
{
case MqttServerPropertyType.ServerUrl:
case MqttServerPropertyType.Port:
case MqttServerPropertyType.Username:
case MqttServerPropertyType.Password:
case MqttServerPropertyType.ClientId:
// 这些属性修改后需要重新连接
// 更新配置
context.MqttServerConfig.ServerUrl = mqttServer.ServerUrl;
context.MqttServerConfig.Port = mqttServer.Port;
context.MqttServerConfig.Username = mqttServer.Username;
context.MqttServerConfig.Password = mqttServer.Password;
context.MqttServerConfig.ClientId = mqttServer.ClientId;
// 重新连接服务器
await ReconnectMqttServerAsync(mqttServer.Id, CancellationToken.None);
break;
case MqttServerPropertyType.IsActive:
// 检查当前激活状态和新激活状态是否一致
if (context.MqttServerConfig.IsActive != mqttServer.IsActive)
{
context.MqttServerConfig.IsActive = mqttServer.IsActive;
if (mqttServer.IsActive)
{
// 激活状态变为true连接服务器
await ConnectMqttServerAsync(mqttServer.Id, CancellationToken.None);
}
else
{
// 激活状态变为false断开服务器连接
await DisconnectMqttServerAsync(mqttServer.Id, CancellationToken.None);
}
}
break;
case MqttServerPropertyType.SubscribeTopic:
context.MqttServerConfig.SubscribeTopic = mqttServer.SubscribeTopic;
// 更新订阅主题
if (context.MqttService.IsConnected && !string.IsNullOrEmpty(mqttServer.SubscribeTopic))
{
await context.MqttService.SubscribeAsync(mqttServer.SubscribeTopic);
}
break;
default:
break;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理MQTT服务器更新事件时发生错误: {MqttServerId} ({MqttServerName})",
mqttServer?.Id, mqttServer?.ServerName);
}
}
/// <summary>
/// 处理MQTT服务器删除事件
/// </summary>
private async Task HandleMqttServerRemoved(int mqttServerId)
{
try
{
_logger.LogInformation("处理MQTT服务器删除事件: {MqttServerId}", mqttServerId);
// 从监控列表中移除服务器
await RemoveMqttServerAsync(mqttServerId, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "处理MQTT服务器删除事件时发生错误: {MqttServerId}", mqttServerId);
}
}
/// <summary>
/// 释放资源
/// </summary>
@@ -345,6 +537,10 @@ namespace DMS.Infrastructure.Services.Mqtt
if (!_disposed)
{
_disposed = true;
// 取消事件订阅
_eventService.OnMqttServerChanged -= OnMqttServerChanged;
_semaphore?.Dispose();
// 断开所有MQTT连接
@@ -360,6 +556,9 @@ namespace DMS.Infrastructure.Services.Mqtt
context.MqttServerConfig?.ServerName ?? "Unknown");
}
}
// 清理处理更新状态的字典
_isProcessingUpdate.Clear();
_logger.LogInformation("MQTT服务管理器已释放资源");
}