本次提交对系统进行了两项主要的重构和优化:
1. 重构数据处理链:
* 移除了 CheckValueChangedProcessor 和 LoggingProcessor,简化了数据处理流程。
* 调整了 App.xaml.cs 中的处理器注册顺序,使处理链更加清晰。
2. 优化OPC UA变量变更处理:
* 完全重构了 OpcUaServiceManager 中的 OnVariableChanged 事件处理方法。
* 新逻辑能够根据变量属性(IsActive, PollingInterval 等)的变化,进行精确的、针对单个节点的订阅和取消订阅操作,取代了之前重置整个设备订阅的低效方式。
* 实现了对已删除变量的订阅取消逻辑,确保了资源的正确释放。
这些变更显著提升了 OPC UA 订阅管理的效率和精确性。
This commit is contained in:
@@ -1,30 +0,0 @@
|
|||||||
using DMS.Core.Models;
|
|
||||||
using DMS.Application.Interfaces;
|
|
||||||
using DMS.Application.Models;
|
|
||||||
|
|
||||||
namespace DMS.Application.Services.Processors;
|
|
||||||
|
|
||||||
public class CheckValueChangedProcessor : IVariableProcessor
|
|
||||||
{
|
|
||||||
|
|
||||||
|
|
||||||
public Task ProcessAsync(VariableContext context)
|
|
||||||
{
|
|
||||||
// Variable newVariable = context.Data;
|
|
||||||
// if (!_dataServices.AllVariables.TryGetValue(newVariable.Id, out Variable oldVariable))
|
|
||||||
// {
|
|
||||||
// NlogHelper.Warn($"检查变量值是否改变时在_dataServices.AllVariables中找不到Id:{newVariable.Id},Name:{newVariable.Name}的变量。");
|
|
||||||
// context.IsHandled = true;
|
|
||||||
// return Task.CompletedTask;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (newVariable.DataValue == oldVariable.DataValue)
|
|
||||||
// {
|
|
||||||
// // 值没有变化,直接完成
|
|
||||||
// context.IsHandled = true;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// 在这里处理 context.Data
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,22 +0,0 @@
|
|||||||
using DMS.Application.Interfaces;
|
|
||||||
using DMS.Application.Models;
|
|
||||||
|
|
||||||
|
|
||||||
namespace DMS.Application.Services.Processors;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 一个简单的数据处理器实现,用于演示。
|
|
||||||
/// 其主要功能是记录接收到的变量数据的名称和值。
|
|
||||||
/// </summary>
|
|
||||||
public class LoggingProcessor : IVariableProcessor
|
|
||||||
{
|
|
||||||
public LoggingProcessor()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task ProcessAsync(VariableContext context)
|
|
||||||
{
|
|
||||||
// NlogHelper.Info($"处理数据: {context.Data.Name}, 值: {context.Data.DataValue}");
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -37,14 +37,6 @@ public class UpdateDbVariableProcessor : IVariableProcessor, IDisposable
|
|||||||
|
|
||||||
public async Task ProcessAsync(VariableContext context)
|
public async Task ProcessAsync(VariableContext context)
|
||||||
{
|
{
|
||||||
// 检查新值是否有效,以及是否与旧值不同
|
|
||||||
if (context.NewValue == null || Equals(context.Data.DataValue, context.NewValue?.ToString()))
|
|
||||||
{
|
|
||||||
return; // 值未变或新值无效,跳过
|
|
||||||
}
|
|
||||||
|
|
||||||
// 用新值更新上下文中的数据,确保处理链的后续环节能看到最新值
|
|
||||||
context.Data.DataValue = context.NewValue?.ToString();
|
|
||||||
|
|
||||||
_queue.Enqueue(context.Data);
|
_queue.Enqueue(context.Data);
|
||||||
|
|
||||||
|
|||||||
@@ -488,7 +488,6 @@ namespace DMS.Infrastructure.Services.OpcUa
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
|
||||||
_logger.LogInformation("正在为设备 {DeviceName} 设置订阅,需要订阅的变量数: {VariableCount}",
|
_logger.LogInformation("正在为设备 {DeviceName} 设置订阅,需要订阅的变量数: {VariableCount}",
|
||||||
context.Device.Name, context.Variables.Count);
|
context.Device.Name, context.Variables.Count);
|
||||||
|
|
||||||
@@ -508,8 +507,8 @@ namespace DMS.Infrastructure.Services.OpcUa
|
|||||||
context.Device.Name, pollingInterval, variables.Count);
|
context.Device.Name, pollingInterval, variables.Count);
|
||||||
|
|
||||||
var opcUaNodeIds = variables
|
var opcUaNodeIds = variables
|
||||||
.Select(v => v.OpcUaNodeId)
|
.Select(v => v.OpcUaNodeId)
|
||||||
.ToList();
|
.ToList();
|
||||||
|
|
||||||
context.OpcUaService.SubscribeToNode(opcUaNodeIds, HandleDataChanged,
|
context.OpcUaService.SubscribeToNode(opcUaNodeIds, HandleDataChanged,
|
||||||
pollingInterval, pollingInterval);
|
pollingInterval, pollingInterval);
|
||||||
@@ -665,6 +664,11 @@ namespace DMS.Infrastructure.Services.OpcUa
|
|||||||
_logger.LogDebug("处理变量变更事件: 变量ID={VariableId}, 变更类型={ChangeType}, 变更属性={PropertyType}",
|
_logger.LogDebug("处理变量变更事件: 变量ID={VariableId}, 变更类型={ChangeType}, 变更属性={PropertyType}",
|
||||||
e.Variable.Id, e.ChangeType, e.PropertyType);
|
e.Variable.Id, e.ChangeType, e.PropertyType);
|
||||||
|
|
||||||
|
if (!_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// 根据变更类型和属性类型进行相应处理
|
// 根据变更类型和属性类型进行相应处理
|
||||||
switch (e.ChangeType)
|
switch (e.ChangeType)
|
||||||
{
|
{
|
||||||
@@ -675,41 +679,39 @@ namespace DMS.Infrastructure.Services.OpcUa
|
|||||||
case VariablePropertyType.OpcUaNodeId:
|
case VariablePropertyType.OpcUaNodeId:
|
||||||
case VariablePropertyType.OpcUaUpdateType:
|
case VariablePropertyType.OpcUaUpdateType:
|
||||||
case VariablePropertyType.PollingInterval:
|
case VariablePropertyType.PollingInterval:
|
||||||
if (_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context))
|
if (context.Variables.TryGetValue(e.Variable.OpcUaNodeId, out var variableDto))
|
||||||
{
|
{
|
||||||
if (context.Variables.TryGetValue(e.Variable.OpcUaNodeId,out var variableDto))
|
if (variableDto.IsActive)
|
||||||
{
|
{
|
||||||
if (variableDto.IsActive)
|
context.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId);
|
||||||
{
|
context.OpcUaService.SubscribeToNode(
|
||||||
context.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId);
|
e.Variable.OpcUaNodeId, HandleDataChanged, e.Variable.PollingInterval,
|
||||||
context.OpcUaService.SubscribeToNode(e.Variable.OpcUaNodeId,HandleDataChanged,e.Variable.PollingInterval,e.Variable.PollingInterval);
|
e.Variable.PollingInterval);
|
||||||
_logger.LogInformation($"OpcUa变量节点:{e.Variable.OpcUaNodeId},的轮询时间修改为:{e.Variable.PollingInterval}");
|
_logger.LogInformation(
|
||||||
}
|
$"OpcUa变量节点:{e.Variable.OpcUaNodeId},的轮询时间修改为:{e.Variable.PollingInterval}");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case VariablePropertyType.IsActive:
|
case VariablePropertyType.IsActive:
|
||||||
// 变量激活状态变化
|
// 变量激活状态变化
|
||||||
if (_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context2))
|
if (e.Variable.IsActive)
|
||||||
{
|
{
|
||||||
if (e.Variable.IsActive)
|
// 添加变量到监控列表并重新订阅
|
||||||
|
if (context.Variables.TryAdd(e.Variable.OpcUaNodeId, e.Variable))
|
||||||
{
|
{
|
||||||
// 添加变量到监控列表并重新订阅
|
context.OpcUaService.SubscribeToNode(
|
||||||
if (context2.Variables.TryAdd(e.Variable.OpcUaNodeId, e.Variable))
|
e.Variable.OpcUaNodeId, HandleDataChanged, e.Variable.PollingInterval,
|
||||||
{
|
e.Variable.PollingInterval);
|
||||||
context2.OpcUaService.SubscribeToNode(e.Variable.OpcUaNodeId,HandleDataChanged,e.Variable.PollingInterval,e.Variable.PollingInterval);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// 从监控列表中移除变量并取消订阅
|
||||||
|
if (context.Variables.TryRemove(e.Variable.OpcUaNodeId, out _))
|
||||||
{
|
{
|
||||||
// 从监控列表中移除变量并取消订阅
|
context.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId);
|
||||||
if (context2.Variables.TryRemove(e.Variable.OpcUaNodeId, out _))
|
|
||||||
{
|
|
||||||
context2.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -720,7 +722,12 @@ namespace DMS.Infrastructure.Services.OpcUa
|
|||||||
|
|
||||||
case ActionChangeType.Deleted:
|
case ActionChangeType.Deleted:
|
||||||
// 变量被删除时,取消订阅并从设备上下文的变量列表中移除
|
// 变量被删除时,取消订阅并从设备上下文的变量列表中移除
|
||||||
// await UnsubscribeVariableAsync(e.Variable);
|
// 从监控列表中移除变量并取消订阅
|
||||||
|
if (context.Variables.TryRemove(e.Variable.OpcUaNodeId, out _))
|
||||||
|
{
|
||||||
|
context.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId);
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,11 +80,9 @@ public partial class App : System.Windows.Application
|
|||||||
var dataProcessingService = Host.Services.GetRequiredService<IDataProcessingService>();
|
var dataProcessingService = Host.Services.GetRequiredService<IDataProcessingService>();
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<ValueConvertProcessor>());
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<ValueConvertProcessor>());
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<UpdateViewProcessor>());
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<UpdateViewProcessor>());
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<CheckValueChangedProcessor>());
|
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<LoggingProcessor>());
|
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<HistoryProcessor>());
|
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<MqttPublishProcessor>());
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<MqttPublishProcessor>());
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<UpdateDbVariableProcessor>());
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<UpdateDbVariableProcessor>());
|
||||||
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<HistoryProcessor>());
|
||||||
// 添加报警处理器
|
// 添加报警处理器
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<DMS.Application.Services.Processors.AlarmProcessor>());
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<DMS.Application.Services.Processors.AlarmProcessor>());
|
||||||
// 添加触发器处理器
|
// 添加触发器处理器
|
||||||
@@ -204,8 +202,6 @@ public partial class App : System.Windows.Application
|
|||||||
(DataProcessingService)provider.GetRequiredService<IDataProcessingService>());
|
(DataProcessingService)provider.GetRequiredService<IDataProcessingService>());
|
||||||
services.AddSingleton<ValueConvertProcessor>();
|
services.AddSingleton<ValueConvertProcessor>();
|
||||||
services.AddSingleton<UpdateViewProcessor>();
|
services.AddSingleton<UpdateViewProcessor>();
|
||||||
services.AddSingleton<CheckValueChangedProcessor>();
|
|
||||||
services.AddSingleton<LoggingProcessor>();
|
|
||||||
services.AddSingleton<UpdateDbVariableProcessor>();
|
services.AddSingleton<UpdateDbVariableProcessor>();
|
||||||
services.AddSingleton<HistoryProcessor>();
|
services.AddSingleton<HistoryProcessor>();
|
||||||
services.AddSingleton<MqttPublishProcessor>();
|
services.AddSingleton<MqttPublishProcessor>();
|
||||||
|
|||||||
Reference in New Issue
Block a user