From a43b9780979e4a523f486426dc29e65e0ce883e5 Mon Sep 17 00:00:00 2001 From: "David P.G" Date: Wed, 10 Sep 2025 18:15:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9F=BA=E6=9C=AC=E5=AE=8C=E6=88=90MQTT?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=9A=84=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DMS.Application/DTOs/MqttServerDto.cs | 3 ++ .../Processors/MqttPublishProcessor.cs | 14 ++---- .../Services/IMqttServiceManager.cs | 2 +- DMS.Core/Models/MqttServer.cs | 15 +++++++ DMS.Infrastructure/Entities/DbMqttServer.cs | 18 ++++++++ .../Services/IMqttBackgroundService.cs | 2 +- .../Services/MqttBackgroundService.cs | 2 +- .../Services/MqttServiceManager.cs | 27 +++++++++--- DMS.WPF/Profiles/MappingProfile.cs | 1 + .../ViewModels/Dialogs/MqttDialogViewModel.cs | 5 ++- .../Items/MqttServerItemViewModel.cs | 9 ++++ DMS.WPF/Views/Dialogs/MqttDialog.xaml | 43 ++++++++++++++++++- DMS.WPF/Views/MqttServerDetailView.xaml | 9 ++++ 13 files changed, 127 insertions(+), 23 deletions(-) diff --git a/DMS.Application/DTOs/MqttServerDto.cs b/DMS.Application/DTOs/MqttServerDto.cs index 09f8036..d814289 100644 --- a/DMS.Application/DTOs/MqttServerDto.cs +++ b/DMS.Application/DTOs/MqttServerDto.cs @@ -22,5 +22,8 @@ public class MqttServerDto public DateTime? ConnectedAt { get; set; } public long ConnectionDuration { get; set; } public string MessageFormat { get; set; } + public string MessageHeader { get; set; } + public string MessageContent { get; set; } + public string MessageFooter { get; set; } public List VariableAliases { get; set; } = new(); } \ No newline at end of file diff --git a/DMS.Application/Services/Processors/MqttPublishProcessor.cs b/DMS.Application/Services/Processors/MqttPublishProcessor.cs index b7a36f5..f4d25ea 100644 --- a/DMS.Application/Services/Processors/MqttPublishProcessor.cs +++ b/DMS.Application/Services/Processors/MqttPublishProcessor.cs @@ -34,18 +34,12 @@ public class MqttPublishProcessor : IVariableProcessor } // 遍历所有关联的MQTT配置,并将其推入发送队列 - foreach (var variableMqttAlias in variable.MqttAliases) + foreach (var variableMqttAliasDto in variable.MqttAliases) { - // 创建VariableMqtt对象 - var variableMqtt = new VariableMqtt - { - Variable = _mapper.Map(variable), - Mqtt = variableMqttAlias.MqttServer, - MqttId = variableMqttAlias.MqttServerId - }; - // 发布变量数据到MQTT服务器 - await _mqttServiceManager.PublishVariableDataAsync(variableMqtt); + var variableMqttAlias = _mapper.Map(variableMqttAliasDto); + variableMqttAlias.Variable.DataValue=variable.DataValue; + await _mqttServiceManager.PublishVariableDataAsync(variableMqttAlias); } } } \ No newline at end of file diff --git a/DMS.Core/Interfaces/Services/IMqttServiceManager.cs b/DMS.Core/Interfaces/Services/IMqttServiceManager.cs index 68b8efc..3e6accc 100644 --- a/DMS.Core/Interfaces/Services/IMqttServiceManager.cs +++ b/DMS.Core/Interfaces/Services/IMqttServiceManager.cs @@ -55,7 +55,7 @@ namespace DMS.Core.Interfaces.Services /// /// 发布变量数据到MQTT服务器 /// - Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default); + Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default); /// /// 发布批量变量数据到MQTT服务器 diff --git a/DMS.Core/Models/MqttServer.cs b/DMS.Core/Models/MqttServer.cs index 2f15a09..dff6ce2 100644 --- a/DMS.Core/Models/MqttServer.cs +++ b/DMS.Core/Models/MqttServer.cs @@ -48,6 +48,21 @@ public class MqttServer /// public string MessageFormat { get; set; } + /// + /// 消息头格式。 + /// + public string MessageHeader { get; set; } + + /// + /// 消息内容格式。 + /// + public string MessageContent { get; set; } + + /// + /// 消息尾格式。 + /// + public string MessageFooter { get; set; } + /// /// 与此服务器关联的所有变量别名。通过此集合可以反向查找关联的变量。 /// diff --git a/DMS.Infrastructure/Entities/DbMqttServer.cs b/DMS.Infrastructure/Entities/DbMqttServer.cs index ec1c426..27411cc 100644 --- a/DMS.Infrastructure/Entities/DbMqttServer.cs +++ b/DMS.Infrastructure/Entities/DbMqttServer.cs @@ -84,4 +84,22 @@ public class DbMqttServer /// [SugarColumn(IsNullable = true)] public string MessageFormat { get; set; } + + /// + /// 消息头格式 + /// + [SugarColumn(IsNullable = true)] + public string MessageHeader { get; set; } + + /// + /// 消息内容格式 + /// + [SugarColumn(IsNullable = true)] + public string MessageContent { get; set; } + + /// + /// 消息尾格式 + /// + [SugarColumn(IsNullable = true)] + public string MessageFooter { get; set; } } \ No newline at end of file diff --git a/DMS.Infrastructure/Interfaces/Services/IMqttBackgroundService.cs b/DMS.Infrastructure/Interfaces/Services/IMqttBackgroundService.cs index b7c9f0b..f1dc587 100644 --- a/DMS.Infrastructure/Interfaces/Services/IMqttBackgroundService.cs +++ b/DMS.Infrastructure/Interfaces/Services/IMqttBackgroundService.cs @@ -44,7 +44,7 @@ namespace DMS.Infrastructure.Interfaces.Services /// /// 发布变量数据到MQTT服务器 /// - Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default); + Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default); /// /// 发布批量变量数据到MQTT服务器 diff --git a/DMS.Infrastructure/Services/MqttBackgroundService.cs b/DMS.Infrastructure/Services/MqttBackgroundService.cs index 6e15e2d..5172a0c 100644 --- a/DMS.Infrastructure/Services/MqttBackgroundService.cs +++ b/DMS.Infrastructure/Services/MqttBackgroundService.cs @@ -117,7 +117,7 @@ namespace DMS.Infrastructure.Services /// /// 发布变量数据到MQTT服务器 /// - public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default) + public async Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default) { await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken); } diff --git a/DMS.Infrastructure/Services/MqttServiceManager.cs b/DMS.Infrastructure/Services/MqttServiceManager.cs index 8cdea04..93f3f65 100644 --- a/DMS.Infrastructure/Services/MqttServiceManager.cs +++ b/DMS.Infrastructure/Services/MqttServiceManager.cs @@ -5,6 +5,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using DMS.Core.Models; @@ -215,17 +216,17 @@ namespace DMS.Infrastructure.Services /// /// 发布变量数据到MQTT服务器 /// - public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default) + public async Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default) { - if (variableMqtt?.Mqtt == null || variableMqtt.Variable == null) + if (variableMqtt?.MqttServer == null || variableMqtt.Variable == null) { _logger.LogWarning("无效的VariableMqtt对象,跳过发布"); return; } - if (!_mqttContexts.TryGetValue(variableMqtt.Mqtt.Id, out var context)) + if (!_mqttContexts.TryGetValue(variableMqtt.MqttServer.Id, out var context)) { - _logger.LogWarning("未找到MQTT服务器 {MqttServerId}", variableMqtt.Mqtt.Id); + _logger.LogWarning("未找到MQTT服务器 {MqttServerId}", variableMqtt.MqttServer.Id); return; } @@ -239,11 +240,11 @@ namespace DMS.Infrastructure.Services { var topic = context.MqttServer.PublishTopic; - var sendMsg = $"{variableMqtt.Variable.Name}:{variableMqtt.Variable.DataValue}"; + var sendMsg = BuildSendMessage(variableMqtt); await context.MqttService.PublishAsync(topic, sendMsg); - _logger.LogDebug("成功向MQTT服务器 {ServerName} 发布变量 {VariableName} 的数据", - context.MqttServer.ServerName, variableMqtt.Variable.Name); + _logger.LogDebug("成功向MQTT服务器 {ServerName} 发布变量 {VariableName} 的数据:{sendMsg}", + context.MqttServer.ServerName, variableMqtt.Variable.Name,sendMsg); } catch (Exception ex) { @@ -252,6 +253,18 @@ namespace DMS.Infrastructure.Services } } + private string BuildSendMessage(VariableMqttAlias variableMqtt) + { + StringBuilder sb = new StringBuilder(); + var now = DateTime.Now; + var timestamp = ((DateTimeOffset)now).ToUnixTimeMilliseconds(); + sb.Append(variableMqtt.MqttServer.MessageHeader.Replace("{timestamp}",timestamp.ToString())); + sb.Append(variableMqtt.MqttServer.MessageContent.Replace("{name}",variableMqtt.Alias).Replace("{value}",variableMqtt.Variable.DataValue)); + sb.Append(variableMqtt.MqttServer.MessageFooter); + + return sb.ToString(); + } + /// /// 发布批量变量数据到MQTT服务器 /// diff --git a/DMS.WPF/Profiles/MappingProfile.cs b/DMS.WPF/Profiles/MappingProfile.cs index 274b24c..083a794 100644 --- a/DMS.WPF/Profiles/MappingProfile.cs +++ b/DMS.WPF/Profiles/MappingProfile.cs @@ -18,6 +18,7 @@ namespace DMS.WPF.Profiles .ReverseMap(); CreateMap(); CreateMap().ReverseMap(); + CreateMap().ReverseMap(); CreateMap() diff --git a/DMS.WPF/ViewModels/Dialogs/MqttDialogViewModel.cs b/DMS.WPF/ViewModels/Dialogs/MqttDialogViewModel.cs index d01a9c7..30458bd 100644 --- a/DMS.WPF/ViewModels/Dialogs/MqttDialogViewModel.cs +++ b/DMS.WPF/ViewModels/Dialogs/MqttDialogViewModel.cs @@ -37,7 +37,10 @@ public partial class MqttDialogViewModel : DialogViewModelBase _variableAliases = new(); diff --git a/DMS.WPF/Views/Dialogs/MqttDialog.xaml b/DMS.WPF/Views/Dialogs/MqttDialog.xaml index c77c230..c38acfe 100644 --- a/DMS.WPF/Views/Dialogs/MqttDialog.xaml +++ b/DMS.WPF/Views/Dialogs/MqttDialog.xaml @@ -44,6 +44,9 @@ + + + @@ -102,10 +105,46 @@ Margin="0,15,0,0" hc:InfoElement.Title="订阅主题:" Text="{Binding MqttServer.SubscribeTopic, UpdateSourceTrigger=PropertyChanged}" /> - + + + + + + + + + + + + + + + + + + +