diff --git a/DMS.Application/DTOs/MqttServerDto.cs b/DMS.Application/DTOs/MqttServerDto.cs
index d814289..e09866c 100644
--- a/DMS.Application/DTOs/MqttServerDto.cs
+++ b/DMS.Application/DTOs/MqttServerDto.cs
@@ -12,6 +12,7 @@ public class MqttServerDto
public string ServerName { get; set; }
public string ServerUrl { get; set; }
public int Port { get; set; }
+ public bool IsConnect { get; set; }
public string Username { get; set; }
public string Password { get; set; }
public bool IsActive { get; set; }
diff --git a/DMS.Application/Events/MqttServerChangedEventArgs.cs b/DMS.Application/Events/MqttServerChangedEventArgs.cs
index c39060d..582aa20 100644
--- a/DMS.Application/Events/MqttServerChangedEventArgs.cs
+++ b/DMS.Application/Events/MqttServerChangedEventArgs.cs
@@ -11,28 +11,30 @@ namespace DMS.Application.Events
///
/// 变更类型
///
- public DataChangeType ChangeType { get; }
+ public ActionChangeType ChangeType { get; }
///
/// MQTT服务器DTO
///
public MqttServerDto MqttServer { get; }
-
+
///
- /// 变更时间
+ /// 发生变化的属性类型
///
- public DateTime ChangeTime { get; }
+ public MqttServerPropertyType PropertyType { get; }
+
///
/// 构造函数
///
/// 变更类型
/// MQTT服务器DTO
- public MqttServerChangedEventArgs(DataChangeType changeType, MqttServerDto mqttServer)
+ /// 发生变化的属性类型
+ public MqttServerChangedEventArgs(ActionChangeType changeType, MqttServerDto mqttServer, MqttServerPropertyType propertyType = MqttServerPropertyType.All)
{
ChangeType = changeType;
MqttServer = mqttServer;
- ChangeTime = DateTime.Now;
+ PropertyType = propertyType;
}
}
}
\ No newline at end of file
diff --git a/DMS.Application/Interfaces/Database/IMqttAppService.cs b/DMS.Application/Interfaces/Database/IMqttAppService.cs
index 5e8b2b6..08fd279 100644
--- a/DMS.Application/Interfaces/Database/IMqttAppService.cs
+++ b/DMS.Application/Interfaces/Database/IMqttAppService.cs
@@ -1,4 +1,5 @@
using DMS.Application.DTOs;
+using DMS.Core.Models;
namespace DMS.Application.Interfaces.Database;
@@ -27,8 +28,10 @@ public interface IMqttAppService
///
Task UpdateMqttServerAsync(MqttServerDto mqttServerDto);
+
+
///
- /// 异步删除一个MQTT服务器。
+ /// 异步根据ID删除一个MQTT服务器。
///
- Task DeleteMqttServerAsync(int id);
+ Task DeleteMqttServerAsync(int id);
}
\ No newline at end of file
diff --git a/DMS.Application/Interfaces/IEventService.cs b/DMS.Application/Interfaces/IEventService.cs
index b376d45..0b152da 100644
--- a/DMS.Application/Interfaces/IEventService.cs
+++ b/DMS.Application/Interfaces/IEventService.cs
@@ -67,6 +67,18 @@ public interface IEventService
/// MQTT连接状态改变事件参数
void RaiseMqttConnectionChanged(object sender, MqttConnectionChangedEventArgs e);
+ ///
+ /// MQTT服务器改变事件
+ ///
+ event EventHandler OnMqttServerChanged;
+
+ ///
+ /// 触发MQTT服务器改变事件
+ ///
+ /// 事件发送者
+ /// MQTT服务器改变事件参数
+ void RaiseMqttServerChanged(object sender, MqttServerChangedEventArgs e);
+
#endregion
diff --git a/DMS.Application/Interfaces/Management/IMqttManagementService.cs b/DMS.Application/Interfaces/Management/IMqttManagementService.cs
index c13ef7d..9ab3a53 100644
--- a/DMS.Application/Interfaces/Management/IMqttManagementService.cs
+++ b/DMS.Application/Interfaces/Management/IMqttManagementService.cs
@@ -28,19 +28,4 @@ public interface IMqttManagementService
/// 异步删除一个MQTT服务器。
///
Task DeleteMqttServerAsync(int id);
-
- ///
- /// 在内存中添加MQTT服务器
- ///
- void AddMqttServerToMemory(MqttServerDto mqttServerDto);
-
- ///
- /// 在内存中更新MQTT服务器
- ///
- void UpdateMqttServerInMemory(MqttServerDto mqttServerDto);
-
- ///
- /// 在内存中删除MQTT服务器
- ///
- void RemoveMqttServerFromMemory(int mqttServerId);
}
\ No newline at end of file
diff --git a/DMS.Application/Services/Database/MqttAppService.cs b/DMS.Application/Services/Database/MqttAppService.cs
index f6b2572..edf92c1 100644
--- a/DMS.Application/Services/Database/MqttAppService.cs
+++ b/DMS.Application/Services/Database/MqttAppService.cs
@@ -98,24 +98,23 @@ public class MqttAppService : IMqttAppService
}
}
+
///
- /// 异步删除一个MQTT服务器(事务性操作)。
+ /// 异步根据ID删除一个MQTT服务器(事务性操作)。
///
/// 要删除MQTT服务器的ID。
- /// 表示异步操作的任务。
+ /// 如果删除成功则为 true,否则为 false。
/// 如果删除MQTT服务器时发生错误。
- public async Task DeleteMqttServerAsync(int id)
+ public async Task DeleteMqttServerAsync(int id)
{
try
{
- await _repoManager.BeginTranAsync();
- await _repoManager.MqttServers.DeleteByIdAsync(id);
- await _repoManager.CommitAsync();
+ return await _repoManager.MqttServers.DeleteByIdAsync(id);
}
catch (Exception ex)
{
await _repoManager.RollbackAsync();
- throw new ApplicationException("删除MQTT服务器时发生错误,操作已回滚。", ex);
+ throw new ApplicationException($"删除MQTT服务器时发生错误,操作已回滚,错误信息:{ex.Message}", ex);
}
}
}
\ No newline at end of file
diff --git a/DMS.Application/Services/EventService.cs b/DMS.Application/Services/EventService.cs
index 0f48bde..4f487e2 100644
--- a/DMS.Application/Services/EventService.cs
+++ b/DMS.Application/Services/EventService.cs
@@ -145,5 +145,20 @@ public class EventService : IEventService
MqttConnectionChanged?.Invoke(sender, e);
}
+ ///
+ /// MQTT服务器改变事件
+ ///
+ public event EventHandler OnMqttServerChanged;
+
+ ///
+ /// 触发MQTT服务器改变事件
+ ///
+ /// 事件发送者
+ /// MQTT服务器改变事件参数
+ public void RaiseMqttServerChanged(object sender, MqttServerChangedEventArgs e)
+ {
+ OnMqttServerChanged?.Invoke(sender, e);
+ }
+
#endregion
}
\ No newline at end of file
diff --git a/DMS.Application/Services/Management/MqttManagementService.cs b/DMS.Application/Services/Management/MqttManagementService.cs
index 331eb3f..cc7cd5e 100644
--- a/DMS.Application/Services/Management/MqttManagementService.cs
+++ b/DMS.Application/Services/Management/MqttManagementService.cs
@@ -14,16 +14,13 @@ public class MqttManagementService : IMqttManagementService
{
private readonly IMqttAppService _mqttAppService;
private readonly IAppDataStorageService _appDataStorageService;
+ private readonly IEventService _eventService;
- ///
- /// 当MQTT服务器数据发生变化时触发
- ///
- public event EventHandler MqttServerChanged;
-
- public MqttManagementService(IMqttAppService mqttAppService,IAppDataStorageService appDataStorageService)
+ public MqttManagementService(IMqttAppService mqttAppService, IAppDataStorageService appDataStorageService, IEventService eventService)
{
_mqttAppService = mqttAppService;
_appDataStorageService = appDataStorageService;
+ _eventService = eventService;
}
///
@@ -47,7 +44,19 @@ public class MqttManagementService : IMqttManagementService
///
public async Task CreateMqttServerAsync(MqttServerDto mqttServerDto)
{
- return await _mqttAppService.CreateMqttServerAsync(mqttServerDto);
+ var result = await _mqttAppService.CreateMqttServerAsync(mqttServerDto);
+
+ // 创建成功后,将MQTT服务器添加到内存中
+ if (result > 0)
+ {
+ mqttServerDto.Id = result; // 假设返回的ID是新创建的
+ if (_appDataStorageService.MqttServers.TryAdd(mqttServerDto.Id, mqttServerDto))
+ {
+ _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Added, mqttServerDto));
+ }
+ }
+
+ return result;
}
///
@@ -56,6 +65,10 @@ public class MqttManagementService : IMqttManagementService
public async Task UpdateMqttServerAsync(MqttServerDto mqttServerDto)
{
await _mqttAppService.UpdateMqttServerAsync(mqttServerDto);
+
+ // 更新成功后,更新内存中的MQTT服务器
+ _appDataStorageService.MqttServers.AddOrUpdate(mqttServerDto.Id, mqttServerDto, (key, oldValue) => mqttServerDto);
+ _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Updated, mqttServerDto));
}
///
@@ -63,45 +76,18 @@ public class MqttManagementService : IMqttManagementService
///
public async Task DeleteMqttServerAsync(int id)
{
- await _mqttAppService.DeleteMqttServerAsync(id);
- }
-
- ///
- /// 在内存中添加MQTT服务器
- ///
- public void AddMqttServerToMemory(MqttServerDto mqttServerDto)
- {
- if (_appDataStorageService.MqttServers.TryAdd(mqttServerDto.Id, mqttServerDto))
+ var mqttServer = await _mqttAppService.GetMqttServerByIdAsync(id); // 获取MQTT服务器信息用于内存删除
+ var result = await _mqttAppService.DeleteMqttServerAsync(id);
+
+ // 删除成功后,从内存中移除MQTT服务器
+ if (result>0)
{
- OnMqttServerChanged(new MqttServerChangedEventArgs(DataChangeType.Added, mqttServerDto));
+ if (_appDataStorageService.MqttServers.TryRemove(id, out var mqttServerDto))
+ {
+ _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Deleted, mqttServerDto));
+ }
}
}
- ///
- /// 在内存中更新MQTT服务器
- ///
- public void UpdateMqttServerInMemory(MqttServerDto mqttServerDto)
- {
- _appDataStorageService.MqttServers.AddOrUpdate(mqttServerDto.Id, mqttServerDto, (key, oldValue) => mqttServerDto);
- OnMqttServerChanged(new MqttServerChangedEventArgs(DataChangeType.Updated, mqttServerDto));
- }
- ///
- /// 在内存中删除MQTT服务器
- ///
- public void RemoveMqttServerFromMemory(int mqttServerId)
- {
- if (_appDataStorageService.MqttServers.TryRemove(mqttServerId, out var mqttServerDto))
- {
- OnMqttServerChanged(new MqttServerChangedEventArgs(DataChangeType.Deleted, mqttServerDto));
- }
- }
-
- ///
- /// 触发MQTT服务器变更事件
- ///
- protected virtual void OnMqttServerChanged(MqttServerChangedEventArgs e)
- {
- MqttServerChanged?.Invoke(this, e);
- }
}
\ No newline at end of file
diff --git a/DMS.Core/Enums/MqttServerPropertyType.cs b/DMS.Core/Enums/MqttServerPropertyType.cs
new file mode 100644
index 0000000..2a47326
--- /dev/null
+++ b/DMS.Core/Enums/MqttServerPropertyType.cs
@@ -0,0 +1,83 @@
+namespace DMS.Core.Enums
+{
+ ///
+ /// MQTT服务器属性类型枚举
+ ///
+ public enum MqttServerPropertyType
+ {
+ ///
+ /// 服务器名称
+ ///
+ ServerName,
+
+ ///
+ /// 服务器URL
+ ///
+ ServerUrl,
+
+ ///
+ /// 端口
+ ///
+ Port,
+
+ ///
+ /// 是否连接
+ ///
+ IsConnect,
+
+ ///
+ /// 用户名
+ ///
+ Username,
+
+ ///
+ /// 密码
+ ///
+ Password,
+
+ ///
+ /// 是否激活
+ ///
+ IsActive,
+
+ ///
+ /// 订阅主题
+ ///
+ SubscribeTopic,
+
+ ///
+ /// 发布主题
+ ///
+ PublishTopic,
+
+ ///
+ /// 客户端ID
+ ///
+ ClientId,
+
+ ///
+ /// 消息格式
+ ///
+ MessageFormat,
+
+ ///
+ /// 消息头
+ ///
+ MessageHeader,
+
+ ///
+ /// 消息内容
+ ///
+ MessageContent,
+
+ ///
+ /// 消息尾
+ ///
+ MessageFooter,
+
+ ///
+ /// 所有属性
+ ///
+ All
+ }
+}
\ No newline at end of file
diff --git a/DMS.Core/Models/MqttServer.cs b/DMS.Core/Models/MqttServer.cs
index dff6ce2..f4a31b8 100644
--- a/DMS.Core/Models/MqttServer.cs
+++ b/DMS.Core/Models/MqttServer.cs
@@ -12,6 +12,8 @@ public class MqttServer
public string Username { get; set; } // 用户名
public string Password { get; set; } // 密码
public bool IsActive { get; set; } // 是否启用
+ public bool IsConnect { get; set; } // 是否启用
+
///
/// MQTT订阅主题。
diff --git a/DMS.Infrastructure/Services/Mqtt/MqttBackgroundService.cs b/DMS.Infrastructure/Services/Mqtt/MqttBackgroundService.cs
index 1ca41ab..ef4d9a9 100644
--- a/DMS.Infrastructure/Services/Mqtt/MqttBackgroundService.cs
+++ b/DMS.Infrastructure/Services/Mqtt/MqttBackgroundService.cs
@@ -189,7 +189,7 @@ namespace DMS.Infrastructure.Services.Mqtt
foreach (var mqttServerDto in mqttServerDtos)
{
- // 将 MqttServerDto 转换为 MqttServer
+ // 将 MqttServerDto 转换为 MqttServerConfig
var mqttServer = new MqttServer
{
Id = mqttServerDto.Id,
diff --git a/DMS.Infrastructure/Services/Mqtt/MqttDeviceContext.cs b/DMS.Infrastructure/Services/Mqtt/MqttDeviceContext.cs
index 809d790..b0c2fb1 100644
--- a/DMS.Infrastructure/Services/Mqtt/MqttDeviceContext.cs
+++ b/DMS.Infrastructure/Services/Mqtt/MqttDeviceContext.cs
@@ -12,17 +12,13 @@ namespace DMS.Infrastructure.Services.Mqtt
///
/// MQTT服务器配置
///
- public MqttServer MqttServer { get; set; }
+ public MqttServer MqttServerConfig { get; set; }
///
/// MQTT服务实例
///
public IMqttService MqttService { get; set; }
- ///
- /// 连接状态
- ///
- public bool IsConnected { get; set; }
///
/// 重连尝试次数
diff --git a/DMS.Infrastructure/Services/Mqtt/MqttServiceManager.cs b/DMS.Infrastructure/Services/Mqtt/MqttServiceManager.cs
index 58859a0..3aadd83 100644
--- a/DMS.Infrastructure/Services/Mqtt/MqttServiceManager.cs
+++ b/DMS.Infrastructure/Services/Mqtt/MqttServiceManager.cs
@@ -1,7 +1,11 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text;
+using AutoMapper;
+using DMS.Application.DTOs;
+using DMS.Application.Events;
using DMS.Application.Interfaces;
+using DMS.Core.Enums;
using DMS.Core.Interfaces.Services;
using DMS.Core.Models;
using DMS.Infrastructure.Interfaces.Services;
@@ -18,6 +22,8 @@ namespace DMS.Infrastructure.Services.Mqtt
private readonly IDataProcessingService _dataProcessingService;
private readonly IAppDataCenterService _appDataCenterService;
private readonly IMqttServiceFactory _mqttServiceFactory;
+ private readonly IEventService _eventService;
+ private readonly IMapper _mapper;
private readonly ConcurrentDictionary _mqttContexts;
private readonly SemaphoreSlim _semaphore;
private bool _disposed = false;
@@ -26,12 +32,16 @@ namespace DMS.Infrastructure.Services.Mqtt
ILogger logger,
IDataProcessingService dataProcessingService,
IAppDataCenterService appDataCenterService,
- IMqttServiceFactory mqttServiceFactory)
+ IMqttServiceFactory mqttServiceFactory,
+ IEventService eventService,
+ IMapper mapper)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_dataProcessingService = dataProcessingService ?? throw new ArgumentNullException(nameof(dataProcessingService));
_appDataCenterService = appDataCenterService ?? throw new ArgumentNullException(nameof(appDataCenterService));
_mqttServiceFactory = mqttServiceFactory ?? throw new ArgumentNullException(nameof(mqttServiceFactory));
+ _eventService = eventService ?? throw new ArgumentNullException(nameof(eventService));
+ _mapper = mapper ?? throw new ArgumentNullException(nameof(mapper));
_mqttContexts = new ConcurrentDictionary();
_semaphore = new SemaphoreSlim(10, 10); // 默认最大并发连接数为10
}
@@ -56,13 +66,17 @@ namespace DMS.Infrastructure.Services.Mqtt
var context = new MqttDeviceContext
{
- MqttServer = mqttServer,
+ MqttServerConfig = mqttServer,
MqttService = _mqttServiceFactory.CreateService(),
- IsConnected = false
};
_mqttContexts.AddOrUpdate(mqttServer.Id, context, (key, oldValue) => context);
_logger.LogInformation("已添加MQTT服务器 {MqttServerId} 到监控列表", mqttServer.Id);
+
+ // 使用AutoMapper触发MQTT服务器改变事件
+ var mqttServerDto = _mapper.Map(mqttServer);
+
+ _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(Core.Enums.ActionChangeType.Added, mqttServerDto));
}
///
@@ -74,6 +88,11 @@ namespace DMS.Infrastructure.Services.Mqtt
{
await DisconnectMqttServerAsync(mqttServerId, cancellationToken);
_logger.LogInformation("已移除MQTT服务器 {MqttServerId} 的监控", mqttServerId);
+
+ // 使用AutoMapper触发MQTT服务器删除事件
+ var mqttServerDto = _mapper.Map(context.MqttServerConfig);
+
+ _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(Core.Enums.ActionChangeType.Deleted, mqttServerDto));
}
}
@@ -98,7 +117,7 @@ namespace DMS.Infrastructure.Services.Mqtt
///
public bool IsMqttServerConnected(int mqttServerId)
{
- return _mqttContexts.TryGetValue(mqttServerId, out var context) && context.IsConnected;
+ return _mqttContexts.TryGetValue(mqttServerId, out var context) && context.MqttService.IsConnected;
}
///
@@ -136,7 +155,7 @@ namespace DMS.Infrastructure.Services.Mqtt
try
{
_logger.LogInformation("正在连接MQTT服务器 {ServerName} ({ServerUrl}:{Port})",
- context.MqttServer.ServerName, context.MqttServer.ServerUrl, context.MqttServer.Port);
+ context.MqttServerConfig.ServerName, context.MqttServerConfig.ServerUrl, context.MqttServerConfig.Port);
var stopwatch = Stopwatch.StartNew();
@@ -145,40 +164,44 @@ namespace DMS.Infrastructure.Services.Mqtt
using var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutToken.Token);
await context.MqttService.ConnectAsync(
- context.MqttServer.ServerUrl,
- context.MqttServer.Port,
- context.MqttServer.ClientId,
- context.MqttServer.Username,
- context.MqttServer.Password);
+ context.MqttServerConfig.ServerUrl,
+ context.MqttServerConfig.Port,
+ context.MqttServerConfig.ClientId,
+ context.MqttServerConfig.Username,
+ context.MqttServerConfig.Password);
stopwatch.Stop();
_logger.LogInformation("MQTT服务器 {ServerName} 连接耗时 {ElapsedMs} ms",
- context.MqttServer.ServerName, stopwatch.ElapsedMilliseconds);
-
+ context.MqttServerConfig.ServerName, stopwatch.ElapsedMilliseconds);
if (context.MqttService.IsConnected)
{
- context.IsConnected = true;
context.ReconnectAttempts = 0; // 重置重连次数
-
+ context.MqttServerConfig.IsConnect=true;
// 订阅主题
- if (!string.IsNullOrEmpty(context.MqttServer.SubscribeTopic))
+ if (!string.IsNullOrEmpty(context.MqttServerConfig.SubscribeTopic))
{
- await context.MqttService.SubscribeAsync(context.MqttServer.SubscribeTopic);
+ await context.MqttService.SubscribeAsync(context.MqttServerConfig.SubscribeTopic);
}
-
- _logger.LogInformation("MQTT服务器 {ServerName} 连接成功", context.MqttServer.ServerName);
+
+ _logger.LogInformation("MQTT服务器 {ServerName} 连接成功", context.MqttServerConfig.ServerName);
+
+ //
}
else
{
- _logger.LogWarning("MQTT服务器 {ServerName} 连接失败", context.MqttServer.ServerName);
+ context.MqttServerConfig.IsConnect = false;
+ _logger.LogWarning("MQTT服务器 {ServerName} 连接失败", context.MqttServerConfig.ServerName);
}
+ //触发MQTT连接状态改变事件
+ _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Updated, _mapper.Map(context.MqttServerConfig), MqttServerPropertyType.IsConnect));
}
catch (Exception ex)
{
_logger.LogError(ex, "连接MQTT服务器 {ServerName} 时发生错误: {ErrorMessage}",
- context.MqttServer.ServerName, ex.Message);
- context.IsConnected = false;
+ context.MqttServerConfig.ServerName, ex.Message);
context.ReconnectAttempts++;
+ context.MqttServerConfig.IsConnect = false;
+ _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Updated, _mapper.Map(context.MqttServerConfig), MqttServerPropertyType.IsConnect));
}
finally
{
@@ -196,15 +219,18 @@ namespace DMS.Infrastructure.Services.Mqtt
try
{
- _logger.LogInformation("正在断开MQTT服务器 {ServerName} 的连接", context.MqttServer.ServerName);
+ _logger.LogInformation("正在断开MQTT服务器 {ServerName} 的连接", context.MqttServerConfig.ServerName);
await context.MqttService.DisconnectAsync();
- context.IsConnected = false;
- _logger.LogInformation("MQTT服务器 {ServerName} 连接已断开", context.MqttServer.ServerName);
+ _logger.LogInformation("MQTT服务器 {ServerName} 连接已断开", context.MqttServerConfig.ServerName);
+
+ // 如果连接状态从连接变为断开,触发事件
+ context.MqttServerConfig.IsConnect = false;
+ _eventService.RaiseMqttServerChanged(this, new MqttServerChangedEventArgs(ActionChangeType.Updated, _mapper.Map(context.MqttServerConfig), MqttServerPropertyType.IsConnect));
}
catch (Exception ex)
{
_logger.LogError(ex, "断开MQTT服务器 {ServerName} 连接时发生错误: {ErrorMessage}",
- context.MqttServer.ServerName, ex.Message);
+ context.MqttServerConfig.ServerName, ex.Message);
}
}
@@ -225,26 +251,26 @@ namespace DMS.Infrastructure.Services.Mqtt
return;
}
- if (!context.IsConnected)
+ if (!context.MqttService.IsConnected)
{
- _logger.LogWarning("MQTT服务器 {ServerName} 未连接,跳过发布", context.MqttServer.ServerName);
+ _logger.LogWarning("MQTT服务器 {ServerName} 未连接,跳过发布", context.MqttServerConfig.ServerName);
return;
}
try
{
- var topic = context.MqttServer.PublishTopic;
+ var topic = context.MqttServerConfig.PublishTopic;
var sendMsg = BuildSendMessage(variableMqtt);
await context.MqttService.PublishAsync(topic, sendMsg);
_logger.LogDebug("成功向MQTT服务器 {ServerName} 发布变量 {VariableName} 的数据:{sendMsg}",
- context.MqttServer.ServerName, variableMqtt.Variable.Name,sendMsg);
+ context.MqttServerConfig.ServerName, variableMqtt.Variable.Name, sendMsg);
}
catch (Exception ex)
{
_logger.LogError(ex, "向MQTT服务器 {ServerName} 发布变量 {VariableName} 数据时发生错误: {ErrorMessage}",
- context.MqttServer.ServerName, variableMqtt.Variable.Name, ex.Message);
+ context.MqttServerConfig.ServerName, variableMqtt.Variable.Name, ex.Message);
}
}
@@ -253,8 +279,8 @@ namespace DMS.Infrastructure.Services.Mqtt
StringBuilder sb = new StringBuilder();
var now = DateTime.Now;
var timestamp = ((DateTimeOffset)now).ToUnixTimeMilliseconds();
- sb.Append(variableMqtt.MqttServer.MessageHeader.Replace("{timestamp}",timestamp.ToString()));
- sb.Append(variableMqtt.MqttServer.MessageContent.Replace("{name}",variableMqtt.Alias).Replace("{value}",variableMqtt.Variable.DataValue));
+ sb.Append(variableMqtt.MqttServer.MessageHeader.Replace("{timestamp}", timestamp.ToString()));
+ sb.Append(variableMqtt.MqttServer.MessageContent.Replace("{name}", variableMqtt.Alias).Replace("{value}", variableMqtt.Variable.DataValue));
sb.Append(variableMqtt.MqttServer.MessageFooter);
return sb.ToString();
@@ -283,10 +309,10 @@ namespace DMS.Infrastructure.Services.Mqtt
continue;
}
- if (!context.IsConnected)
+ if (!context.MqttService.IsConnected)
{
- _logger.LogWarning("MQTT服务器 {ServerName} 未连接,跳过 {Count} 条消息",
- context.MqttServer.ServerName, group.Count());
+ _logger.LogWarning("MQTT服务器 {ServerName} 未连接,跳过 {Count} 条消息",
+ context.MqttServerConfig.ServerName, group.Count());
continue;
}
@@ -294,19 +320,19 @@ namespace DMS.Infrastructure.Services.Mqtt
{
foreach (var variableMqtt in group)
{
- var topic = context.MqttServer.PublishTopic;
+ var topic = context.MqttServerConfig.PublishTopic;
var payload = variableMqtt.Variable?.DataValue?.ToString() ?? string.Empty;
await context.MqttService.PublishAsync(topic, payload);
}
-
+
_logger.LogInformation("成功向MQTT服务器 {ServerName} 发布 {Count} 条变量数据",
- context.MqttServer.ServerName, group.Count());
+ context.MqttServerConfig.ServerName, group.Count());
}
catch (Exception ex)
{
_logger.LogError(ex, "向MQTT服务器 {ServerName} 批量发布变量数据时发生错误: {ErrorMessage}",
- context.MqttServer.ServerName, ex.Message);
+ context.MqttServerConfig.ServerName, ex.Message);
}
}
}
@@ -320,7 +346,7 @@ namespace DMS.Infrastructure.Services.Mqtt
{
_disposed = true;
_semaphore?.Dispose();
-
+
// 断开所有MQTT连接
foreach (var context in _mqttContexts.Values)
{
@@ -331,10 +357,10 @@ namespace DMS.Infrastructure.Services.Mqtt
catch (Exception ex)
{
_logger.LogError(ex, "断开MQTT服务器 {ServerName} 连接时发生错误",
- context.MqttServer?.ServerName ?? "Unknown");
+ context.MqttServerConfig?.ServerName ?? "Unknown");
}
}
-
+
_logger.LogInformation("MQTT服务管理器已释放资源");
}
}
diff --git a/DMS.WPF/Configurations/nlog.config b/DMS.WPF/Configurations/nlog.config
index 7f3648b..756459c 100644
--- a/DMS.WPF/Configurations/nlog.config
+++ b/DMS.WPF/Configurations/nlog.config
@@ -20,6 +20,11 @@
+
+
+
+
diff --git a/DMS.WPF/Helper/DataServicesHelper.cs b/DMS.WPF/Helper/DataServicesHelper.cs
deleted file mode 100644
index 732f7dd..0000000
--- a/DMS.WPF/Helper/DataServicesHelper.cs
+++ /dev/null
@@ -1,127 +0,0 @@
-
-using DMS.Core.Enums;
-using DMS.Core.Models;
-using DMS.WPF.ViewModels;
-using DMS.WPF.ViewModels;
-using Microsoft.Extensions.DependencyInjection;
-
-namespace DMS.WPF.Helper;
-
-public class DataServicesHelper
-{
-
- ///
- /// 从设备列表中找到变量表VarTable对象
- ///
- /// VarTable的ID
- /// 如果找到择返回对象,否则返回null
- public static VariableTable FindVarTableForDevice(List devices, int vtableId)
- {
- VariableTable varTable = null;
- foreach (var device in devices)
- {
- varTable = device.VariableTables.FirstOrDefault(v => v.Id == vtableId);
- if (varTable != null)
- return varTable;
- }
-
- return varTable;
- }
-
-
-
-
- public static MenuBean FindMenusForDevice(Device device, IEnumerable menus)
- {
- // if (menus == null)
- // {
- // return null;
- // }
- //
- // foreach (var menu in menus)
- // {
- // // 检查当前菜单项是否匹配
- // if (menu.Type==MenuType.DeviceMenu && menu.DataId ==device.Id)
- // {
- // return menu;
- // }
- //
- // // 递归搜索子菜单
- // var foundInSubMenu = FindMenusForDevice(device, menu.Items);
- // if (foundInSubMenu != null)
- // {
- // return foundInSubMenu;
- // }
- // }
-
- return null;
- }
-
- ///
- /// 给菜单排序
- ///
- ///
- public static void SortMenus(MenuBean menu)
- {
- // if (menu.Items == null || menu.Items.Count() == 0)
- // return;
- // menu.Items.Sort((a, b) =>
- // a.Type.ToString().Length.CompareTo(b.Type.ToString().Length)
- // );
- // foreach (var menuItem in menu.Items)
- // {
- // SortMenus(menuItem);
- // }
- }
-
- public static ViewModelBase GetMainViewModel(string name)
- {
- ViewModelBase navgateVM = App.Current.Services.GetRequiredService();
- switch (name)
- {
- case "主页":
- navgateVM = App.Current.Services.GetRequiredService();
- break;
- case "设备":
- navgateVM = App.Current.Services.GetRequiredService();
- break;
- case "Mqtt服务器":
- navgateVM = App.Current.Services.GetRequiredService();
- break;
- case "数据转换":
- navgateVM = App.Current.Services.GetRequiredService();
- break;
- case "设置":
- navgateVM = App.Current.Services.GetRequiredService();
- break;
- }
-
- return navgateVM;
- }
-
- public static MenuBean FindVarTableMenu(int varTableId, List menus)
- {
- // if (menus == null)
- // {
- // return null;
- // }
- //
- // foreach (var menu in menus)
- // {
- // // 检查当前菜单项是否匹配
- // if (menu.Type==MenuType.VariableTableMenu && menu.DataId ==varTableId)
- // {
- // return menu;
- // }
- //
- // // 递归搜索子菜单
- // var foundInSubMenu = FindVarTableMenu(varTableId, menu.Items);
- // if (foundInSubMenu != null)
- // {
- // return foundInSubMenu;
- // }
- // }
-
- return null;
- }
-}
diff --git a/DMS.WPF/Helper/MessageHelper.cs b/DMS.WPF/Helper/MessageHelper.cs
deleted file mode 100644
index 7655a93..0000000
--- a/DMS.WPF/Helper/MessageHelper.cs
+++ /dev/null
@@ -1,31 +0,0 @@
-using CommunityToolkit.Mvvm.Messaging;
-using DMS.Core.Enums;
-using DMS.Message;
-using DMS.WPF.ViewModels;
-
-namespace DMS.WPF.Helper;
-
-public class MessageHelper
-{
- public static void Send(T message) where T : class
- {
- WeakReferenceMessenger.Default.Send(message);
- }
- ///
- /// 发送加载消息
- ///
- /// 加载的类型,如菜单
- public static void SendLoadMessage(LoadTypes loadType)
- {
- WeakReferenceMessenger.Default.Send(new LoadMessage(loadType));
- }
- ///
- /// 发送导航消息
- ///
- /// 导航View的ViewModel
- /// 带的参数
- public static void SendNavgatorMessage(ViewModelBase vm)
- {
- WeakReferenceMessenger.Default.Send(new NavgatorMessage(vm));
- }
-}
\ No newline at end of file
diff --git a/DMS.WPF/Helper/SiemensHelper.cs b/DMS.WPF/Helper/SiemensHelper.cs
deleted file mode 100644
index 86ee237..0000000
--- a/DMS.WPF/Helper/SiemensHelper.cs
+++ /dev/null
@@ -1,70 +0,0 @@
-namespace DMS.Helper;
-
-///
-/// 西门子帮助类
-///
-public static class SiemensHelper
-{
- ///
- /// 将S7数据类型字符串转换为C#数据类型字符串
- ///
- /// S7数据类型字符串
- /// 对应的C#数据类型字符串
- public static string S7ToCSharpTypeString(string s7Type)
- {
- switch (s7Type.ToUpper())
- {
- case "BOOL":
- return "bool";
- case "BYTE":
- return "byte";
- case "WORD":
- return "ushort";
- case "DWORD":
- return "uint";
- case "INT":
- return "short";
- case "DINT":
- return "int";
- case "REAL":
- return "float";
- case "LREAL":
- return "double";
- case "CHAR":
- return "char";
- case "STRING":
- return "string";
- case "TIMER":
- case "TIME":
- return "TimeSpan";
- case "COUNTER":
- return "ushort";
- case "DATE":
- return "DateTime";
- case "TIME_OF_DAY":
- case "TOD":
- return "DateTime";
- case "DATE_AND_TIME":
- case "DT":
- return "DateTime";
- default:
- return "object";
- }
- }
-
- ///
- /// 将S7读取到的值转换为显示值
- ///
- /// S7读取到的原始值
- /// 变量的数据类型
- /// 转换规则
- /// 显示值
- public static string ConvertS7Value(object value, string dataType, string conversion)
- {
- if (value == null) return string.Empty;
-
- // For now, a simple conversion to string. More complex logic can be added here.
- // Based on dataType and conversion, you might parse, format, or apply formulas.
- return value.ToString();
- }
-}
\ No newline at end of file
diff --git a/DMS.WPF/Interfaces/IDataStorageService.cs b/DMS.WPF/Interfaces/IDataStorageService.cs
index 0e7cece..15a9a34 100644
--- a/DMS.WPF/Interfaces/IDataStorageService.cs
+++ b/DMS.WPF/Interfaces/IDataStorageService.cs
@@ -25,7 +25,7 @@ public interface IDataStorageService
///
/// MQTT服务器列表。
///
- ObservableCollection MqttServers { get; set; }
+ ObservableDictionary MqttServers { get; set; }
///
/// 菜单列表。
diff --git a/DMS.WPF/Services/DataEventService.cs b/DMS.WPF/Services/DataEventService.cs
index b54898f..2325507 100644
--- a/DMS.WPF/Services/DataEventService.cs
+++ b/DMS.WPF/Services/DataEventService.cs
@@ -48,13 +48,74 @@ public class DataEventService : IDataEventService
// 监听变量值变更事件
_eventService.OnVariableValueChanged += OnVariableValueChanged;
+ _eventService.OnMqttServerChanged += OnMqttServerChanged;
_appDataCenterService.DataLoaderService.OnLoadDataCompleted += OnLoadDataCompleted;
// 监听日志变更事件
// _appDataCenterService.OnLogChanged += _logDataService.OnNlogChanged;
_logger?.LogInformation("DataEventService 初始化完成");
}
-
+
+ private void OnMqttServerChanged(object? sender, MqttServerChangedEventArgs e)
+ {
+ _logger?.LogDebug("接收到Mqtt服务器状态发生了改变,服务器名称:{mqttName}属性: {mqttProperty}",
+ e.MqttServer.ServerName, e.PropertyType);
+
+ // 在UI线程上更新变量值
+ App.Current.Dispatcher.BeginInvoke(new Action(() =>
+ {
+ //// 查找并更新对应的变量
+ if (_dataStorageService.MqttServers.TryGetValue(e.MqttServer.Id, out var mqttServerItem))
+ {
+ if (e.ChangeType == ActionChangeType.Updated)
+ {
+ switch (e.PropertyType)
+ {
+ case MqttServerPropertyType.ServerName:
+ break;
+ case MqttServerPropertyType.ServerUrl:
+ break;
+ case MqttServerPropertyType.Port:
+ break;
+ case MqttServerPropertyType.IsConnect:
+ mqttServerItem.IsConnect=e.MqttServer.IsConnect;
+ break;
+ case MqttServerPropertyType.Username:
+ break;
+ case MqttServerPropertyType.Password:
+ break;
+ case MqttServerPropertyType.IsActive:
+ break;
+ case MqttServerPropertyType.SubscribeTopic:
+ break;
+ case MqttServerPropertyType.PublishTopic:
+ break;
+ case MqttServerPropertyType.ClientId:
+ break;
+ case MqttServerPropertyType.MessageFormat:
+ break;
+ case MqttServerPropertyType.MessageHeader:
+ break;
+ case MqttServerPropertyType.MessageContent:
+ break;
+ case MqttServerPropertyType.MessageFooter:
+ break;
+ case MqttServerPropertyType.All:
+ break;
+ default:
+ break;
+ }
+ }
+
+ }
+ else
+ {
+ _logger?.LogWarning("在Mqtt服务器队列中找不到ID为 {MqttServer} 的变量,无法更新值", e.MqttServer.Id);
+ }
+ }));
+
+ }
+
private void OnLoadDataCompleted(object? sender, DataLoadCompletedEventArgs e)
{
_logger?.LogDebug("接收到数据加载完成事件,成功: {IsSuccess}", e.IsSuccess);
diff --git a/DMS.WPF/Services/DataStorageService.cs b/DMS.WPF/Services/DataStorageService.cs
index a536a84..e24270d 100644
--- a/DMS.WPF/Services/DataStorageService.cs
+++ b/DMS.WPF/Services/DataStorageService.cs
@@ -28,7 +28,7 @@ public class DataStorageService : IDataStorageService
///
/// MQTT服务器列表。
///
- public ObservableCollection MqttServers { get; set; }
+ public ObservableDictionary MqttServers { get; set; }
///
/// 菜单列表。
@@ -55,7 +55,7 @@ public class DataStorageService : IDataStorageService
Devices=new ObservableDictionary();
VariableTables = new ObservableDictionary();
Variables=new ObservableDictionary();
- MqttServers=new ObservableCollection();
+ MqttServers=new ObservableDictionary();
Menus=new ObservableCollection();
MenuTrees=new ObservableCollection();
Nlogs=new ObservableCollection();
diff --git a/DMS.WPF/Services/MqttDataService.cs b/DMS.WPF/Services/MqttDataService.cs
index 621bc85..1958503 100644
--- a/DMS.WPF/Services/MqttDataService.cs
+++ b/DMS.WPF/Services/MqttDataService.cs
@@ -41,7 +41,11 @@ public class MqttDataService : IMqttDataService
try
{
// 加载MQTT服务器数据
- _dataStorageService.MqttServers = _mapper.Map>(_appDataStorageService.MqttServers.Values);
+ foreach (var mqttServerDto in _appDataStorageService.MqttServers.Values)
+ {
+ _dataStorageService.MqttServers.TryAdd(mqttServerDto.Id,_mapper.Map(mqttServerDto));
+ }
+
}
catch (Exception ex)
{
@@ -61,7 +65,7 @@ public class MqttDataService : IMqttDataService
dto.Id = id;
var mqttServerItem = _mapper.Map(dto);
- _dataStorageService.MqttServers.Add(mqttServerItem);
+ _dataStorageService.MqttServers.Add(mqttServerItem.Id,mqttServerItem);
return mqttServerItem;
}
@@ -82,7 +86,7 @@ public class MqttDataService : IMqttDataService
public async Task DeleteMqttServer(MqttServerItemViewModel mqttServer)
{
await _mqttAppService.DeleteMqttServerAsync(mqttServer.Id);
- _dataStorageService.MqttServers.Remove(mqttServer);
+ _dataStorageService.MqttServers.Remove(mqttServer.Id);
return true;
}
}
\ No newline at end of file
diff --git a/DMS.WPF/ViewModels/MqttsViewModel.cs b/DMS.WPF/ViewModels/MqttsViewModel.cs
index 38847f3..b91fe11 100644
--- a/DMS.WPF/ViewModels/MqttsViewModel.cs
+++ b/DMS.WPF/ViewModels/MqttsViewModel.cs
@@ -1,4 +1,3 @@
-using System.Collections.ObjectModel;
using AutoMapper;
using CommunityToolkit.Mvvm.ComponentModel;
using CommunityToolkit.Mvvm.Input;
@@ -12,6 +11,8 @@ using DMS.WPF.Services;
using DMS.WPF.ViewModels.Dialogs;
using DMS.WPF.ViewModels.Items;
using Microsoft.Extensions.Logging;
+using ObservableCollections;
+using System.Collections.ObjectModel;
namespace DMS.WPF.ViewModels;
@@ -29,10 +30,11 @@ public partial class MqttsViewModel : ViewModelBase
private readonly INotificationService _notificationService;
///
- /// MQTT服务器列表。
+ /// 设备列表。
///
[ObservableProperty]
- private ObservableCollection _mqtts;
+ private INotifyCollectionChangedSynchronizedViewList _mqttServeise;
+
///
/// 当前选中的MQTT服务器。
@@ -61,8 +63,8 @@ public partial class MqttsViewModel : ViewModelBase
_mapper = mapper;
_navigationService = navigationService;
_notificationService = notificationService;
-
- Mqtts = _dataStorageService.MqttServers;
+
+ _mqttServeise = _dataStorageService.MqttServers.ToNotifyCollectionChanged(x=>x.Value);
}
///
diff --git a/DMS.WPF/ViewModels/VariableHistoryViewModel.cs b/DMS.WPF/ViewModels/VariableHistoryViewModel.cs
index d566e24..4af7bd6 100644
--- a/DMS.WPF/ViewModels/VariableHistoryViewModel.cs
+++ b/DMS.WPF/ViewModels/VariableHistoryViewModel.cs
@@ -118,6 +118,11 @@ partial class VariableHistoryViewModel : ViewModelBase, INavigatable
private void OnVariableValueChanged(object? sender, VariableValueChangedEventArgs e)
{
+ if (CurrentVariable is null)
+ {
+ return;
+ }
+
if (e.Variable.Id != CurrentVariable.Id)
{
return;
diff --git a/DMS.WPF/Views/Dialogs/MqttDialog.xaml b/DMS.WPF/Views/Dialogs/MqttDialog.xaml
index c38acfe..cf311f7 100644
--- a/DMS.WPF/Views/Dialogs/MqttDialog.xaml
+++ b/DMS.WPF/Views/Dialogs/MqttDialog.xaml
@@ -1,28 +1,28 @@
-
+
-
+
-
+
-
+
@@ -41,116 +44,117 @@
-
-
-
+
+
+
-
-
-
-
+
+
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
-
+
+
+
@@ -160,58 +164,54 @@
-
-
-
-
+
+
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
+
-
+
-
+
\ No newline at end of file