diff --git a/DMS.Application/Services/EventService.cs b/DMS.Application/Services/EventService.cs index 205fe22..7ddf331 100644 --- a/DMS.Application/Services/EventService.cs +++ b/DMS.Application/Services/EventService.cs @@ -1,4 +1,3 @@ -using System; using DMS.Application.Events; using DMS.Application.Interfaces; using DMS.Core.Events; @@ -74,7 +73,7 @@ public class EventService : IEventService /// 变量值改变事件 /// public event EventHandler OnVariableChanged; - + /// /// 触发变量值改变事件 /// @@ -85,7 +84,7 @@ public class EventService : IEventService OnVariableChanged?.Invoke(sender, e); } - + /// /// 变量启停改变事件 /// @@ -159,7 +158,7 @@ public class EventService : IEventService { OnMqttServerChanged?.Invoke(sender, e); } - + #endregion #region 数据加载事件 diff --git a/DMS.Infrastructure/Services/Mqtt/MqttServiceManager.cs b/DMS.Infrastructure/Services/Mqtt/MqttServiceManager.cs index 3aadd83..ba2d018 100644 --- a/DMS.Infrastructure/Services/Mqtt/MqttServiceManager.cs +++ b/DMS.Infrastructure/Services/Mqtt/MqttServiceManager.cs @@ -44,8 +44,16 @@ namespace DMS.Infrastructure.Services.Mqtt _mapper = mapper ?? throw new ArgumentNullException(nameof(mapper)); _mqttContexts = new ConcurrentDictionary(); _semaphore = new SemaphoreSlim(10, 10); // 默认最大并发连接数为10 + + // 订阅MQTT服务器变更事件 + _eventService.OnMqttServerChanged += OnMqttServerChanged; } + /// + /// 标志是否正在处理事件,用于防止递归调用 + /// + private readonly ConcurrentDictionary _isProcessingUpdate = new(); + /// /// 初始化服务管理器 /// @@ -337,6 +345,190 @@ namespace DMS.Infrastructure.Services.Mqtt } } + /// + /// 处理MQTT服务器变更事件 + /// + private async void OnMqttServerChanged(object? sender, MqttServerChangedEventArgs e) + { + try + { + // 防止同一服务器的递归更新 + if (_isProcessingUpdate.ContainsKey(e.MqttServer.Id) && _isProcessingUpdate[e.MqttServer.Id]) + { + _logger.LogDebug("正在处理服务器 {MqttServerId} 的更新,跳过重复事件", e.MqttServer.Id); + return; + } + + _isProcessingUpdate[e.MqttServer.Id] = true; + + _logger.LogDebug("处理MQTT服务器变更事件: 服务器ID={MqttServerId}, 变更类型={ChangeType}, 变更属性={PropertyType}", + e.MqttServer.Id, e.ChangeType, e.PropertyType); + + switch (e.ChangeType) + { + case ActionChangeType.Added: + HandleMqttServerAdded(e.MqttServer); + break; + case ActionChangeType.Updated: + await HandleMqttServerUpdated(e.MqttServer, e.PropertyType); + break; + case ActionChangeType.Deleted: + await HandleMqttServerRemoved(e.MqttServer.Id); + break; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "处理MQTT服务器变更事件时发生错误: 服务器ID={MqttServerId}, 变更类型={ChangeType}", + e.MqttServer.Id, e.ChangeType); + } + finally + { + _isProcessingUpdate.TryRemove(e.MqttServer.Id, out _); + } + } + + /// + /// 处理MQTT服务器添加事件 + /// + private void HandleMqttServerAdded(MqttServerDto mqttServer) + { + if (mqttServer == null) + { + _logger.LogWarning("HandleMqttServerAdded: 接收到空MQTT服务器对象"); + return; + } + + try + { + _logger.LogInformation("处理MQTT服务器添加事件: {MqttServerId} ({MqttServerName})", + mqttServer.Id, mqttServer.ServerName); + + // 将DTO转换为MqttServer实体 + var mqttServerEntity = _mapper.Map(mqttServer); + + // 添加服务器到监控列表 + AddMqttServer(mqttServerEntity); + + // 如果服务器是激活状态,则尝试连接 + if (mqttServer.IsActive) + { + // 使用Task.Run来避免阻塞事件处理 + _ = Task.Run(async () => + { + try + { + await ConnectMqttServerAsync(mqttServer.Id, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogError(ex, "连接新添加的MQTT服务器 {MqttServerId} ({MqttServerName}) 时发生错误", + mqttServer.Id, mqttServer.ServerName); + } + }); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "处理MQTT服务器添加事件时发生错误: {MqttServerId} ({MqttServerName})", + mqttServer?.Id, mqttServer?.ServerName); + } + } + + /// + /// 处理MQTT服务器更新事件 + /// + private async Task HandleMqttServerUpdated(MqttServerDto mqttServer, MqttServerPropertyType propertyType) + { + if (mqttServer == null) + { + _logger.LogWarning("HandleMqttServerUpdated: 接收到空MQTT服务器对象"); + return; + } + + try + { + _logger.LogInformation("处理MQTT服务器更新事件: {MqttServerId} ({MqttServerName}), 属性类型: {PropertyType}", + mqttServer.Id, mqttServer.ServerName, propertyType); + + // 检查上下文是否存在 + if (_mqttContexts.TryGetValue(mqttServer.Id, out var context)) + { + // 根据属性类型决定是否需要重新连接或特殊处理 + switch (propertyType) + { + case MqttServerPropertyType.ServerUrl: + case MqttServerPropertyType.Port: + case MqttServerPropertyType.Username: + case MqttServerPropertyType.Password: + case MqttServerPropertyType.ClientId: + // 这些属性修改后需要重新连接 + // 更新配置 + context.MqttServerConfig.ServerUrl = mqttServer.ServerUrl; + context.MqttServerConfig.Port = mqttServer.Port; + context.MqttServerConfig.Username = mqttServer.Username; + context.MqttServerConfig.Password = mqttServer.Password; + context.MqttServerConfig.ClientId = mqttServer.ClientId; + + // 重新连接服务器 + await ReconnectMqttServerAsync(mqttServer.Id, CancellationToken.None); + break; + case MqttServerPropertyType.IsActive: + // 检查当前激活状态和新激活状态是否一致 + if (context.MqttServerConfig.IsActive != mqttServer.IsActive) + { + context.MqttServerConfig.IsActive = mqttServer.IsActive; + + if (mqttServer.IsActive) + { + // 激活状态变为true,连接服务器 + await ConnectMqttServerAsync(mqttServer.Id, CancellationToken.None); + } + else + { + // 激活状态变为false,断开服务器连接 + await DisconnectMqttServerAsync(mqttServer.Id, CancellationToken.None); + } + } + break; + case MqttServerPropertyType.SubscribeTopic: + context.MqttServerConfig.SubscribeTopic = mqttServer.SubscribeTopic; + // 更新订阅主题 + if (context.MqttService.IsConnected && !string.IsNullOrEmpty(mqttServer.SubscribeTopic)) + { + await context.MqttService.SubscribeAsync(mqttServer.SubscribeTopic); + } + break; + default: + break; + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "处理MQTT服务器更新事件时发生错误: {MqttServerId} ({MqttServerName})", + mqttServer?.Id, mqttServer?.ServerName); + } + } + + /// + /// 处理MQTT服务器删除事件 + /// + private async Task HandleMqttServerRemoved(int mqttServerId) + { + try + { + _logger.LogInformation("处理MQTT服务器删除事件: {MqttServerId}", mqttServerId); + + // 从监控列表中移除服务器 + await RemoveMqttServerAsync(mqttServerId, CancellationToken.None); + } + catch (Exception ex) + { + _logger.LogError(ex, "处理MQTT服务器删除事件时发生错误: {MqttServerId}", mqttServerId); + } + } + /// /// 释放资源 /// @@ -345,6 +537,10 @@ namespace DMS.Infrastructure.Services.Mqtt if (!_disposed) { _disposed = true; + + // 取消事件订阅 + _eventService.OnMqttServerChanged -= OnMqttServerChanged; + _semaphore?.Dispose(); // 断开所有MQTT连接 @@ -360,6 +556,9 @@ namespace DMS.Infrastructure.Services.Mqtt context.MqttServerConfig?.ServerName ?? "Unknown"); } } + + // 清理处理更新状态的字典 + _isProcessingUpdate.Clear(); _logger.LogInformation("MQTT服务管理器已释放资源"); } diff --git a/DMS.WPF/ViewModels/MqttsViewModel.cs b/DMS.WPF/ViewModels/MqttsViewModel.cs index b91fe11..74008a4 100644 --- a/DMS.WPF/ViewModels/MqttsViewModel.cs +++ b/DMS.WPF/ViewModels/MqttsViewModel.cs @@ -64,6 +64,9 @@ public partial class MqttsViewModel : ViewModelBase _navigationService = navigationService; _notificationService = notificationService; + // Set static services for MqttServerItemViewModel + MqttServerItemViewModel.SetServices(_wpfDataService, _notificationService); + _mqttServeise = _dataStorageService.MqttServers.ToNotifyCollectionChanged(x=>x.Value); }