基本完成MQTT消息的发送

This commit is contained in:
2025-09-10 18:15:31 +08:00
parent a9ca89b44a
commit a43b978097
13 changed files with 127 additions and 23 deletions

View File

@@ -22,5 +22,8 @@ public class MqttServerDto
public DateTime? ConnectedAt { get; set; }
public long ConnectionDuration { get; set; }
public string MessageFormat { get; set; }
public string MessageHeader { get; set; }
public string MessageContent { get; set; }
public string MessageFooter { get; set; }
public List<VariableMqttAliasDto> VariableAliases { get; set; } = new();
}

View File

@@ -34,18 +34,12 @@ public class MqttPublishProcessor : IVariableProcessor
}
// 遍历所有关联的MQTT配置并将其推入发送队列
foreach (var variableMqttAlias in variable.MqttAliases)
foreach (var variableMqttAliasDto in variable.MqttAliases)
{
// 创建VariableMqtt对象
var variableMqtt = new VariableMqtt
{
Variable = _mapper.Map<Variable>(variable),
Mqtt = variableMqttAlias.MqttServer,
MqttId = variableMqttAlias.MqttServerId
};
// 发布变量数据到MQTT服务器
await _mqttServiceManager.PublishVariableDataAsync(variableMqtt);
var variableMqttAlias = _mapper.Map<VariableMqttAlias>(variableMqttAliasDto);
variableMqttAlias.Variable.DataValue=variable.DataValue;
await _mqttServiceManager.PublishVariableDataAsync(variableMqttAlias);
}
}
}

View File

@@ -55,7 +55,7 @@ namespace DMS.Core.Interfaces.Services
/// <summary>
/// 发布变量数据到MQTT服务器
/// </summary>
Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default);
Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default);
/// <summary>
/// 发布批量变量数据到MQTT服务器

View File

@@ -48,6 +48,21 @@ public class MqttServer
/// </summary>
public string MessageFormat { get; set; }
/// <summary>
/// 消息头格式。
/// </summary>
public string MessageHeader { get; set; }
/// <summary>
/// 消息内容格式。
/// </summary>
public string MessageContent { get; set; }
/// <summary>
/// 消息尾格式。
/// </summary>
public string MessageFooter { get; set; }
/// <summary>
/// 与此服务器关联的所有变量别名。通过此集合可以反向查找关联的变量。
/// </summary>

View File

@@ -84,4 +84,22 @@ public class DbMqttServer
/// </summary>
[SugarColumn(IsNullable = true)]
public string MessageFormat { get; set; }
/// <summary>
/// 消息头格式
/// </summary>
[SugarColumn(IsNullable = true)]
public string MessageHeader { get; set; }
/// <summary>
/// 消息内容格式
/// </summary>
[SugarColumn(IsNullable = true)]
public string MessageContent { get; set; }
/// <summary>
/// 消息尾格式
/// </summary>
[SugarColumn(IsNullable = true)]
public string MessageFooter { get; set; }
}

View File

@@ -44,7 +44,7 @@ namespace DMS.Infrastructure.Interfaces.Services
/// <summary>
/// 发布变量数据到MQTT服务器
/// </summary>
Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default);
Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default);
/// <summary>
/// 发布批量变量数据到MQTT服务器

View File

@@ -117,7 +117,7 @@ namespace DMS.Infrastructure.Services
/// <summary>
/// 发布变量数据到MQTT服务器
/// </summary>
public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default)
public async Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default)
{
await _mqttServiceManager.PublishVariableDataAsync(variableMqtt, cancellationToken);
}

View File

@@ -5,6 +5,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DMS.Core.Models;
@@ -215,17 +216,17 @@ namespace DMS.Infrastructure.Services
/// <summary>
/// 发布变量数据到MQTT服务器
/// </summary>
public async Task PublishVariableDataAsync(VariableMqtt variableMqtt, CancellationToken cancellationToken = default)
public async Task PublishVariableDataAsync(VariableMqttAlias variableMqtt, CancellationToken cancellationToken = default)
{
if (variableMqtt?.Mqtt == null || variableMqtt.Variable == null)
if (variableMqtt?.MqttServer == null || variableMqtt.Variable == null)
{
_logger.LogWarning("无效的VariableMqtt对象跳过发布");
return;
}
if (!_mqttContexts.TryGetValue(variableMqtt.Mqtt.Id, out var context))
if (!_mqttContexts.TryGetValue(variableMqtt.MqttServer.Id, out var context))
{
_logger.LogWarning("未找到MQTT服务器 {MqttServerId}", variableMqtt.Mqtt.Id);
_logger.LogWarning("未找到MQTT服务器 {MqttServerId}", variableMqtt.MqttServer.Id);
return;
}
@@ -239,11 +240,11 @@ namespace DMS.Infrastructure.Services
{
var topic = context.MqttServer.PublishTopic;
var sendMsg = $"{variableMqtt.Variable.Name}:{variableMqtt.Variable.DataValue}";
var sendMsg = BuildSendMessage(variableMqtt);
await context.MqttService.PublishAsync(topic, sendMsg);
_logger.LogDebug("成功向MQTT服务器 {ServerName} 发布变量 {VariableName} 的数据",
context.MqttServer.ServerName, variableMqtt.Variable.Name);
_logger.LogDebug("成功向MQTT服务器 {ServerName} 发布变量 {VariableName} 的数据{sendMsg}",
context.MqttServer.ServerName, variableMqtt.Variable.Name,sendMsg);
}
catch (Exception ex)
{
@@ -252,6 +253,18 @@ namespace DMS.Infrastructure.Services
}
}
private string BuildSendMessage(VariableMqttAlias variableMqtt)
{
StringBuilder sb = new StringBuilder();
var now = DateTime.Now;
var timestamp = ((DateTimeOffset)now).ToUnixTimeMilliseconds();
sb.Append(variableMqtt.MqttServer.MessageHeader.Replace("{timestamp}",timestamp.ToString()));
sb.Append(variableMqtt.MqttServer.MessageContent.Replace("{name}",variableMqtt.Alias).Replace("{value}",variableMqtt.Variable.DataValue));
sb.Append(variableMqtt.MqttServer.MessageFooter);
return sb.ToString();
}
/// <summary>
/// 发布批量变量数据到MQTT服务器
/// </summary>

View File

@@ -18,6 +18,7 @@ namespace DMS.WPF.Profiles
.ReverseMap();
CreateMap<VariableItemViewModel, VariableItemViewModel>();
CreateMap<VariableMqttAliasDto, VariableMqttAliasItemViewModel>().ReverseMap();
CreateMap<VariableMqttAlias, VariableMqttAliasItemViewModel>().ReverseMap();
CreateMap<MenuBeanDto, MenuItemViewModel>()

View File

@@ -37,7 +37,10 @@ public partial class MqttDialogViewModel : DialogViewModelBase<MqttServerItemVie
CreatedAt = mqttServer.CreatedAt,
ConnectedAt = mqttServer.ConnectedAt,
ConnectionDuration = mqttServer.ConnectionDuration,
MessageFormat = mqttServer.MessageFormat
MessageFormat = mqttServer.MessageFormat,
MessageHeader = mqttServer.MessageHeader,
MessageContent = mqttServer.MessageContent,
MessageFooter = mqttServer.MessageFooter
};
}

View File

@@ -52,6 +52,15 @@ public partial class MqttServerItemViewModel : ObservableObject
[ObservableProperty]
private string _messageFormat;
[ObservableProperty]
private string _messageHeader;
[ObservableProperty]
private string _messageContent;
[ObservableProperty]
private string _messageFooter;
[ObservableProperty]
private ObservableCollection<VariableMqttAliasItemViewModel> _variableAliases = new();

View File

@@ -44,6 +44,9 @@
<RowDefinition Height="Auto" />
<RowDefinition Height="Auto" />
<RowDefinition Height="Auto" />
<RowDefinition Height="Auto" />
<RowDefinition Height="Auto" />
<RowDefinition Height="Auto" />
</Grid.RowDefinitions>
<!-- Row 0 -->
@@ -103,9 +106,45 @@
hc:InfoElement.Title="订阅主题:"
Text="{Binding MqttServer.SubscribeTopic, UpdateSourceTrigger=PropertyChanged}" />
<!-- Row 5 -->
<hc:TextBox
Grid.Row="4"
Grid.Column="0"
Grid.ColumnSpan="3"
Margin="0,15,0,0"
hc:InfoElement.Title="消息头:"
Text="{Binding MqttServer.MessageHeader, UpdateSourceTrigger=PropertyChanged}"
TextWrapping="Wrap"
AcceptsReturn="True"
VerticalContentAlignment="Top" />
<hc:TextBox
Grid.Row="5"
Grid.Column="0"
Grid.ColumnSpan="3"
Margin="0,15,0,0"
hc:InfoElement.Title="消息内容:"
Text="{Binding MqttServer.MessageContent, UpdateSourceTrigger=PropertyChanged}"
TextWrapping="Wrap"
AcceptsReturn="True"
VerticalContentAlignment="Top" />
<!-- Row 6 -->
<hc:TextBox
Grid.Row="6"
Grid.Column="0"
Grid.ColumnSpan="3"
Margin="0,15,0,0"
hc:InfoElement.Title="消息尾:"
Text="{Binding MqttServer.MessageFooter, UpdateSourceTrigger=PropertyChanged}"
TextWrapping="Wrap"
AcceptsReturn="True"
VerticalContentAlignment="Top" />
<!-- Row 4 -->
<CheckBox
Grid.Row="4"
Grid.Row="7"
Grid.Column="0"
Margin="0,20,0,0"
Content="是否启用"

View File

@@ -69,6 +69,15 @@
<!-- ItemsSource="{Binding Source={extensions:EnumBindingSourceExtension {x:Type enums:MqttPlatform}}}" -->
<!-- DisplayMemberPath="Description" -->
<!-- SelectedValuePath="Value"/> -->
<TextBlock Grid.Row="5" Grid.Column="0" Text="消息头:" Margin="0,0,5,0" VerticalAlignment="Center"/>
<TextBox Grid.Row="5" Grid.Column="1" Text="{Binding CurrentMqtt.MessageHeader, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}" Margin="0,0,10,0"/>
<TextBlock Grid.Row="5" Grid.Column="2" Text="消息内容:" Margin="0,0,5,0" VerticalAlignment="Center"/>
<TextBox Grid.Row="5" Grid.Column="3" Text="{Binding CurrentMqtt.MessageContent, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}"/>
<TextBlock Grid.Row="6" Grid.Column="0" Text="消息尾:" Margin="0,0,5,0" VerticalAlignment="Center"/>
<TextBox Grid.Row="6" Grid.Column="1" Text="{Binding CurrentMqtt.MessageFooter, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}" Margin="0,0,10,0"/>
</Grid>
<Button Content="保存更改" Command="{Binding SaveChangesCommand}" HorizontalAlignment="Right" Margin="0,10,0,0"/>
</StackPanel>