1 实现 MQTT 变量数据发布功能

2
   3 - 迁移 IMqttServiceManager 接口到 DMS.Core
   4 - 在 DataCenterService 中添加 MQTT 服务器和变量别名的加载逻辑
   5 - 实现 MqttPublishProcessor 的核心处理逻辑
   6 - 为 DTO 和 ViewModel 的 MqttAliases 属性提供默认空列表初始化
   7 - 更新 AutoMapper 映射配置以支持 VariableMqttAliasDto
This commit is contained in:
2025-09-07 08:51:18 +08:00
parent be16e1a5d7
commit 1f9c0a1111
11 changed files with 122 additions and 44 deletions

View File

@@ -90,7 +90,6 @@ public class DataCenterService : IDataCenterService
/// </summary>
public event EventHandler<MqttServerChangedEventArgs> MqttServerChanged;
/// <summary>
/// 当变量值发生变化时触发
@@ -202,10 +201,9 @@ public class DataCenterService : IDataCenterService
/// </summary>
public void RemoveDeviceFromMemory(int deviceId)
{
if (Devices.TryGetValue(deviceId, out var deviceDto))
{
foreach (var variableTable in deviceDto.VariableTables)
foreach (var variableTable in deviceDto.VariableTables)
{
foreach (var variable in variableTable.Variables)
{
@@ -216,7 +214,7 @@ public class DataCenterService : IDataCenterService
}
Devices.TryRemove(deviceId, out _);
OnDeviceChanged(new DeviceChangedEventArgs(DataChangeType.Deleted, deviceDto));
}
}
@@ -317,7 +315,6 @@ public class DataCenterService : IDataCenterService
{
deviceDto = device;
device.VariableTables.Remove(variableTableDto);
}
OnVariableTableChanged(new VariableTableChangedEventArgs(
@@ -663,9 +660,11 @@ public class DataCenterService : IDataCenterService
// 加载所有菜单
var menus = await _repositoryManager.Menus.GetAllAsync();
var menuDtos = _mapper.Map<List<MenuBeanDto>>(menus);
var mqttServers = await LoadAllMqttServersAsync();
var variableMqttAliases = await _repositoryManager.VariableMqttAliases.GetAllAsync();
// 建立设备与变量表的关联
foreach (var deviceDto in deviceDtos)
{
@@ -691,6 +690,12 @@ public class DataCenterService : IDataCenterService
// 将变量表添加到安全字典
VariableTables.TryAdd(variableTableDto.Id, variableTableDto);
}
// 加载MQTT服务器数据到内存
foreach (var mqttServer in mqttServers)
{
MqttServers.TryAdd(mqttServer.Id, mqttServer);
}
// 将变量添加到安全字典
foreach (var variableDto in variableDtos)
@@ -699,6 +704,22 @@ public class DataCenterService : IDataCenterService
{
variableDto.VariableTable = variableTable;
}
// var alises= variableMqttAliases.FirstOrDefault(vm => vm.VariableId == variableDto.Id);
// if (alises != null)
// {
//
// var variableMqttAliasDto = _mapper.Map<VariableMqttAliasDto>(alises);
// variableMqttAliasDto.Variable = _mapper.Map<Variable>(variableDto) ;
// if (MqttServers.TryGetValue(variableMqttAliasDto.MqttServerId, out var mqttServerDto))
// {
// variableMqttAliasDto.MqttServer = _mapper.Map<MqttServer>(mqttServerDto) ;
// variableMqttAliasDto.MqttServerName = variableMqttAliasDto.MqttServer.ServerName;
// }
//
// variableDto.MqttAliases.Add(variableMqttAliasDto);
// }
Variables.TryAdd(variableDto.Id, variableDto);
}
@@ -707,12 +728,8 @@ public class DataCenterService : IDataCenterService
{
Menus.TryAdd(menuDto.Id, menuDto);
}
// 加载MQTT服务器数据到内存
foreach (var mqttServer in mqttServers)
{
MqttServers.TryAdd(mqttServer.Id, mqttServer);
}
// 构建菜单树
BuildMenuTree();

View File

@@ -1,6 +1,8 @@
using System.Threading.Tasks;
using AutoMapper;
using DMS.Application.Interfaces;
using DMS.Application.Models;
using DMS.Core.Interfaces.Services;
using DMS.Core.Models;
namespace DMS.Application.Services.Processors;
@@ -10,12 +12,14 @@ namespace DMS.Application.Services.Processors;
/// </summary>
public class MqttPublishProcessor : IVariableProcessor
{
// private readonly IMqttServiceManager _mqttServiceManager;
private readonly IMapper _mapper;
private readonly IMqttServiceManager _mqttServiceManager;
// public MqttPublishProcessor(IMqttServiceManager mqttServiceManager)
// {
// // _mqttServiceManager = mqttServiceManager;
// }
public MqttPublishProcessor(IMapper mapper, IMqttServiceManager mqttServiceManager)
{
_mapper = mapper;
_mqttServiceManager = mqttServiceManager;
}
/// <summary>
/// 处理单个变量上下文如果有关联的MQTT配置则将其推送到发送队列。
@@ -23,25 +27,25 @@ public class MqttPublishProcessor : IVariableProcessor
/// <param name="context">包含变量及其元数据的上下文对象。</param>
public async Task ProcessAsync(VariableContext context)
{
// var variable = context.Data;
// if (variable?.MqttAliases == null || variable.MqttAliases.Count == 0)
// {
// return; // 没有关联的MQTT配置直接返回
// }
//
// // 遍历所有关联的MQTT配置并将其推入发送队列
// foreach (var variableMqttAlias in variable.MqttAliases)
// {
// // 创建VariableMqtt对象
// var variableMqtt = new VariableMqtt
// {
// Variable = variable,
// Mqtt = variableMqttAlias.MqttServer,
// MqttId = variableMqttAlias.MqttServerId
// };
//
// // 发布变量数据到MQTT服务器
// await _mqttServiceManager.PublishVariableDataAsync(variableMqtt);
// }
var variable = context.Data;
if (variable?.MqttAliases == null || variable.MqttAliases.Count == 0)
{
return; // 没有关联的MQTT配置直接返回
}
// 遍历所有关联的MQTT配置并将其推入发送队列
foreach (var variableMqttAlias in variable.MqttAliases)
{
// 创建VariableMqtt对象
var variableMqtt = new VariableMqtt
{
Variable = _mapper.Map<Variable>(variable),
Mqtt = variableMqttAlias.MqttServer,
MqttId = variableMqttAlias.MqttServerId
};
// 发布变量数据到MQTT服务器
await _mqttServiceManager.PublishVariableDataAsync(variableMqtt);
}
}
}