From ea18a6ac2c25ca098b41e9cc508e2c991d35e0e4 Mon Sep 17 00:00:00 2001 From: "David P.G" Date: Sun, 5 Oct 2025 12:11:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=BC=BAMQTT=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD=E5=B9=B6=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E8=AE=BE=E5=A4=87=E8=A7=86=E5=9B=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Interfaces/Database/IMqttAppService.cs | 10 +- .../Management/IMqttManagementService.cs | 16 +- .../Services/Database/MqttAppService.cs | 56 +++ .../Management/MqttManagementService.cs | 171 ++++++- DMS.WPF/Services/DataEventService.cs | 4 + DMS.WPF/Views/DevicesView.xaml | 447 +++++++++--------- 6 files changed, 468 insertions(+), 236 deletions(-) diff --git a/DMS.Application/Interfaces/Database/IMqttAppService.cs b/DMS.Application/Interfaces/Database/IMqttAppService.cs index 08fd279..c07c7ef 100644 --- a/DMS.Application/Interfaces/Database/IMqttAppService.cs +++ b/DMS.Application/Interfaces/Database/IMqttAppService.cs @@ -28,10 +28,18 @@ public interface IMqttAppService /// Task UpdateMqttServerAsync(MqttServerDto mqttServerDto); - + /// + /// 异步批量更新MQTT服务器。 + /// + Task UpdateMqttServersAsync(List mqttServerDtos); /// /// 异步根据ID删除一个MQTT服务器。 /// Task DeleteMqttServerAsync(int id); + + /// + /// 异步批量删除MQTT服务器。 + /// + Task DeleteMqttServersAsync(List ids); } \ No newline at end of file diff --git a/DMS.Application/Interfaces/Management/IMqttManagementService.cs b/DMS.Application/Interfaces/Management/IMqttManagementService.cs index 9ab3a53..54142c5 100644 --- a/DMS.Application/Interfaces/Management/IMqttManagementService.cs +++ b/DMS.Application/Interfaces/Management/IMqttManagementService.cs @@ -17,15 +17,25 @@ public interface IMqttManagementService /// /// 异步创建一个新的MQTT服务器。 /// - Task CreateMqttServerAsync(MqttServerDto mqttServerDto); + Task CreateMqttServerAsync(MqttServerDto mqttServerDto); /// /// 异步更新一个已存在的MQTT服务器。 /// - Task UpdateMqttServerAsync(MqttServerDto mqttServerDto); + Task UpdateMqttServerAsync(MqttServerDto mqttServerDto); + + /// + /// 异步批量更新MQTT服务器。 + /// + Task UpdateMqttServersAsync(List mqttServerDtos); /// /// 异步删除一个MQTT服务器。 /// - Task DeleteMqttServerAsync(int id); + Task DeleteMqttServerAsync(int id); + + /// + /// 异步批量删除MQTT服务器。 + /// + Task DeleteMqttServersAsync(List ids); } \ No newline at end of file diff --git a/DMS.Application/Services/Database/MqttAppService.cs b/DMS.Application/Services/Database/MqttAppService.cs index edf92c1..1a4dfa4 100644 --- a/DMS.Application/Services/Database/MqttAppService.cs +++ b/DMS.Application/Services/Database/MqttAppService.cs @@ -117,4 +117,60 @@ public class MqttAppService : IMqttAppService throw new ApplicationException($"删除MQTT服务器时发生错误,操作已回滚,错误信息:{ex.Message}", ex); } } + + /// + /// 异步批量更新MQTT服务器(事务性操作)。 + /// + /// 要更新的MQTT服务器数据传输对象列表。 + /// 成功更新的MQTT服务器数量。 + /// 如果批量更新MQTT服务器时发生错误。 + public async Task UpdateMqttServersAsync(List mqttServerDtos) + { + try + { + await _repoManager.BeginTranAsync(); + var count = 0; + foreach (var mqttServerDto in mqttServerDtos) + { + var mqttServer = await _repoManager.MqttServers.GetByIdAsync(mqttServerDto.Id); + if (mqttServer != null) + { + _mapper.Map(mqttServerDto, mqttServer); + await _repoManager.MqttServers.UpdateAsync(mqttServer); + count++; + } + } + await _repoManager.CommitAsync(); + return count; + } + catch (Exception ex) + { + await _repoManager.RollbackAsync(); + throw new ApplicationException("批量更新MQTT服务器时发生错误,操作已回滚。", ex); + } + } + + /// + /// 异步批量删除MQTT服务器(事务性操作)。 + /// + /// 要删除的MQTT服务器ID列表。 + /// 如果删除成功则为 true,否则为 false。 + /// 如果批量删除MQTT服务器时发生错误。 + public async Task DeleteMqttServersAsync(List ids) + { + try + { + if (ids == null || !ids.Any()) return true; + + await _repoManager.BeginTranAsync(); + var result = await _repoManager.MqttServers.DeleteByIdsAsync(ids); + await _repoManager.CommitAsync(); + return result > 0; + } + catch (Exception ex) + { + await _repoManager.RollbackAsync(); + throw new ApplicationException($"批量删除MQTT服务器时发生错误,操作已回滚,错误信息:{ex.Message}", ex); + } + } } \ No newline at end of file diff --git a/DMS.Application/Services/Management/MqttManagementService.cs b/DMS.Application/Services/Management/MqttManagementService.cs index cc7cd5e..7e73661 100644 --- a/DMS.Application/Services/Management/MqttManagementService.cs +++ b/DMS.Application/Services/Management/MqttManagementService.cs @@ -1,3 +1,5 @@ +using System.Collections.Concurrent; +using AutoMapper; using DMS.Application.DTOs; using DMS.Application.Events; using DMS.Application.Interfaces; @@ -15,12 +17,20 @@ public class MqttManagementService : IMqttManagementService private readonly IMqttAppService _mqttAppService; private readonly IAppDataStorageService _appDataStorageService; private readonly IEventService _eventService; + private readonly IMapper _mapper; + private readonly IDataProcessingService _dataProcessingService; - public MqttManagementService(IMqttAppService mqttAppService, IAppDataStorageService appDataStorageService, IEventService eventService) + public MqttManagementService(IMqttAppService mqttAppService, + IAppDataStorageService appDataStorageService, + IEventService eventService, + IMapper mapper, + IDataProcessingService dataProcessingService) { _mqttAppService = mqttAppService; _appDataStorageService = appDataStorageService; _eventService = eventService; + _mapper = mapper; + _dataProcessingService = dataProcessingService; } /// @@ -42,7 +52,7 @@ public class MqttManagementService : IMqttManagementService /// /// 异步创建一个新的MQTT服务器。 /// - public async Task CreateMqttServerAsync(MqttServerDto mqttServerDto) + public async Task CreateMqttServerAsync(MqttServerDto mqttServerDto) { var result = await _mqttAppService.CreateMqttServerAsync(mqttServerDto); @@ -52,42 +62,165 @@ public class MqttManagementService : IMqttManagementService mqttServerDto.Id = result; // 假设返回的ID是新创建的 if (_appDataStorageService.MqttServers.TryAdd(mqttServerDto.Id, mqttServerDto)) { - _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Added, mqttServerDto)); + _eventService.RaiseMqttServerChanged( + this, new MqttServerChangedEventArgs(ActionChangeType.Added, mqttServerDto)); + } + } + + return mqttServerDto; + } + + /// + /// 异步更新一个已存在的MQTT服务器。 + /// + public async Task UpdateMqttServerAsync(MqttServerDto mqttServerDto) + { + return await UpdateMqttServersAsync(new List() { mqttServerDto }); + } + + /// + /// 异步批量更新MQTT服务器。 + /// + public async Task UpdateMqttServersAsync(List mqttServerDtos) + { + var result = await _mqttAppService.UpdateMqttServersAsync(mqttServerDtos); + + // 批量更新成功后,更新内存中的MQTT服务器 + if (result > 0 && mqttServerDtos != null) + { + foreach (var mqttServerDto in mqttServerDtos) + { + if (_appDataStorageService.MqttServers.TryGetValue(mqttServerDto.Id, out var mMqttServerDto)) + { + // 比较旧值和新值,确定哪个属性发生了变化 + var changedProperties = GetChangedProperties(mMqttServerDto, mqttServerDto); + + // 更新内存中的MQTT服务器 + _mapper.Map(mqttServerDto, mMqttServerDto); + + // 为每个发生变化的属性触发事件 + foreach (var property in changedProperties) + { + _eventService.RaiseMqttServerChanged( + this, new MqttServerChangedEventArgs(ActionChangeType.Updated, mMqttServerDto, property)); + } + + // 如果没有任何属性发生变化,至少触发一次更新事件 + if (changedProperties.Count == 0) + { + _eventService.RaiseMqttServerChanged( + this, new MqttServerChangedEventArgs(ActionChangeType.Updated, mMqttServerDto, MqttServerPropertyType.All)); + } + } + else + { + // 如果内存中不存在该MQTT服务器,则直接添加 + _appDataStorageService.MqttServers.TryAdd(mqttServerDto.Id, mqttServerDto); + _eventService.RaiseMqttServerChanged( + this, new MqttServerChangedEventArgs(ActionChangeType.Added, mqttServerDto, MqttServerPropertyType.All)); + } } } return result; } - /// - /// 异步更新一个已存在的MQTT服务器。 - /// - public async Task UpdateMqttServerAsync(MqttServerDto mqttServerDto) - { - await _mqttAppService.UpdateMqttServerAsync(mqttServerDto); - - // 更新成功后,更新内存中的MQTT服务器 - _appDataStorageService.MqttServers.AddOrUpdate(mqttServerDto.Id, mqttServerDto, (key, oldValue) => mqttServerDto); - _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Updated, mqttServerDto)); - } - /// /// 异步删除一个MQTT服务器。 /// - public async Task DeleteMqttServerAsync(int id) + public async Task DeleteMqttServerAsync(int id) { var mqttServer = await _mqttAppService.GetMqttServerByIdAsync(id); // 获取MQTT服务器信息用于内存删除 - var result = await _mqttAppService.DeleteMqttServerAsync(id); + var result = await _mqttAppService.DeleteMqttServerAsync(id) > 0; // 删除成功后,从内存中移除MQTT服务器 - if (result>0) + if (result && mqttServer != null) { if (_appDataStorageService.MqttServers.TryRemove(id, out var mqttServerDto)) { - _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Deleted, mqttServerDto)); + _eventService.RaiseMqttServerChanged( + this, new MqttServerChangedEventArgs(ActionChangeType.Deleted, mqttServerDto)); } } + + return result; } + /// + /// 异步批量删除MQTT服务器。 + /// + public async Task DeleteMqttServersAsync(List ids) + { + var result = await _mqttAppService.DeleteMqttServersAsync(ids); + + // 批量删除成功后,从内存中移除MQTT服务器 + if (result && ids != null) + { + foreach (var id in ids) + { + if (_appDataStorageService.MqttServers.TryRemove(id, out var mqttServerDto)) + { + _eventService.RaiseMqttServerChanged( + this, new MqttServerChangedEventArgs(ActionChangeType.Deleted, mqttServerDto)); + } + } + } + + return result; + } + /// + /// 获取发生变化的属性列表 + /// + /// 旧MQTT服务器值 + /// 新MQTT服务器值 + /// 发生变化的属性列表 + private List GetChangedProperties(MqttServerDto oldMqttServer, MqttServerDto newMqttServer) + { + var changedProperties = new List(); + + if (oldMqttServer.ServerName != newMqttServer.ServerName) + changedProperties.Add(MqttServerPropertyType.ServerName); + + if (oldMqttServer.ServerUrl != newMqttServer.ServerUrl) + changedProperties.Add(MqttServerPropertyType.ServerUrl); + + if (oldMqttServer.Port != newMqttServer.Port) + changedProperties.Add(MqttServerPropertyType.Port); + + if (oldMqttServer.IsConnect != newMqttServer.IsConnect) + changedProperties.Add(MqttServerPropertyType.IsConnect); + + if (oldMqttServer.Username != newMqttServer.Username) + changedProperties.Add(MqttServerPropertyType.Username); + + if (oldMqttServer.Password != newMqttServer.Password) + changedProperties.Add(MqttServerPropertyType.Password); + + if (oldMqttServer.IsActive != newMqttServer.IsActive) + changedProperties.Add(MqttServerPropertyType.IsActive); + + if (oldMqttServer.SubscribeTopic != newMqttServer.SubscribeTopic) + changedProperties.Add(MqttServerPropertyType.SubscribeTopic); + + if (oldMqttServer.PublishTopic != newMqttServer.PublishTopic) + changedProperties.Add(MqttServerPropertyType.PublishTopic); + + if (oldMqttServer.ClientId != newMqttServer.ClientId) + changedProperties.Add(MqttServerPropertyType.ClientId); + + if (oldMqttServer.MessageFormat != newMqttServer.MessageFormat) + changedProperties.Add(MqttServerPropertyType.MessageFormat); + + if (oldMqttServer.MessageHeader != newMqttServer.MessageHeader) + changedProperties.Add(MqttServerPropertyType.MessageHeader); + + if (oldMqttServer.MessageContent != newMqttServer.MessageContent) + changedProperties.Add(MqttServerPropertyType.MessageContent); + + if (oldMqttServer.MessageFooter != newMqttServer.MessageFooter) + changedProperties.Add(MqttServerPropertyType.MessageFooter); + + return changedProperties; + } } \ No newline at end of file diff --git a/DMS.WPF/Services/DataEventService.cs b/DMS.WPF/Services/DataEventService.cs index b789a7f..8eecac9 100644 --- a/DMS.WPF/Services/DataEventService.cs +++ b/DMS.WPF/Services/DataEventService.cs @@ -23,6 +23,7 @@ public class DataEventService : IDataEventService private readonly IMapper _mapper; private readonly IDataStorageService _dataStorageService; private readonly IEventService _eventService; + private readonly INotificationService _notificationService; private readonly IAppDataCenterService _appDataCenterService; private readonly IWPFDataService _wpfDataService; private readonly ILogger _logger; @@ -33,6 +34,7 @@ public class DataEventService : IDataEventService public DataEventService(IMapper mapper, IDataStorageService dataStorageService, IEventService eventService, + INotificationService notificationService, IAppDataCenterService appDataCenterService, IWPFDataService wpfDataService, ILogger logger) @@ -40,6 +42,7 @@ public class DataEventService : IDataEventService _mapper = mapper; _dataStorageService = dataStorageService; _eventService = eventService; + _notificationService = notificationService; _appDataCenterService = appDataCenterService; _wpfDataService = wpfDataService; _logger = logger; @@ -79,6 +82,7 @@ public class DataEventService : IDataEventService break; case MqttServerPropertyType.IsConnect: mqttServerItem.IsConnect=e.MqttServer.IsConnect; + _notificationService.ShowSuccess($"MQTT服务器:{mqttServerItem.ServerName},连接发生了变化,状态:{e.MqttServer.IsConnect}"); break; case MqttServerPropertyType.Username: break; diff --git a/DMS.WPF/Views/DevicesView.xaml b/DMS.WPF/Views/DevicesView.xaml index 422f688..22b9100 100644 --- a/DMS.WPF/Views/DevicesView.xaml +++ b/DMS.WPF/Views/DevicesView.xaml @@ -1,16 +1,17 @@ - + @@ -22,17 +23,19 @@ - + - + @@ -43,8 +46,7 @@ - + @@ -53,71 +55,80 @@ - - - + + + - + - - + + - + - - + + - - + + - + @@ -129,55 +140,58 @@ - - + + - + - + - - - - -