本次提交对 OPC UA 服务的接口进行了重构,主要变更如下:

1.  简化接口: OpcUaService 的 SubscribeToNode 和 UnsubscribeFromNode 方法的参数类型从 OpcUaNode 对象更改为 string 类型的节点ID。这使得上层服务在调用时无需构造完整的
  OpcUaNode 对象,降低了接口的复杂性。

  2.  更新实现: OpcUaService 和 OpcUaServiceManager 的内部实现已更新,以兼容新的基于字符串的接口。

  3.  优化变量更新: OpcUaServiceManager 中的 OnVariableChanged
  事件处理逻辑被修改,现在能够更细粒度地处理单个变量的激活状态和轮询间隔的变化,避免了不必要的整个设备订阅的重新加载。
This commit is contained in:
2025-10-02 22:03:31 +08:00
parent acd397ea55
commit c4a840c8b8
3 changed files with 208 additions and 205 deletions

View File

@@ -6,6 +6,7 @@ using DMS.Application.Interfaces;
using DMS.Application.Models;
using DMS.Core.Enums;
using DMS.Core.Events;
using DMS.Core.Models;
using DMS.Infrastructure.Configuration;
using DMS.Infrastructure.Interfaces.Services;
using DMS.Infrastructure.Models;
@@ -65,8 +66,9 @@ namespace DMS.Infrastructure.Services.OpcUa
{
await DisconnectDeviceAsync(e.DeviceId, CancellationToken.None);
}
break;
case Core.Enums.DeviceStateType.Connection:
// 处理连接状态变化(通常由底层连接过程触发)
// 在OPC UA服务中这可能是内部状态更新
@@ -107,15 +109,15 @@ namespace DMS.Infrastructure.Services.OpcUa
// 检查设备协议是否为OPC UA
if (device.Protocol != Core.Enums.ProtocolType.OpcUa)
{
_logger.LogInformation("设备 {DeviceId} ({DeviceName}) 不是OPC UA协议跳过加载",
device.Id, device.Name);
_logger.LogInformation("设备 {DeviceId} ({DeviceName}) 不是OPC UA协议跳过加载",
device.Id, device.Name);
return;
}
try
{
_logger.LogInformation("处理设备添加事件: {DeviceId} ({DeviceName})",
device.Id, device.Name);
_logger.LogInformation("处理设备添加事件: {DeviceId} ({DeviceName})",
device.Id, device.Name);
// 添加设备到监控列表
AddDevice(device);
@@ -132,16 +134,16 @@ namespace DMS.Infrastructure.Services.OpcUa
}
catch (Exception ex)
{
_logger.LogError(ex, "连接新添加的设备 {DeviceId} ({DeviceName}) 时发生错误",
device.Id, device.Name);
_logger.LogError(ex, "连接新添加的设备 {DeviceId} ({DeviceName}) 时发生错误",
device.Id, device.Name);
}
});
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理设备添加事件时发生错误: {DeviceId} ({DeviceName})",
device?.Id, device?.Name);
_logger.LogError(ex, "处理设备添加事件时发生错误: {DeviceId} ({DeviceName})",
device?.Id, device?.Name);
}
}
@@ -159,15 +161,15 @@ namespace DMS.Infrastructure.Services.OpcUa
// 检查设备协议是否为OPC UA
if (device.Protocol != Core.Enums.ProtocolType.OpcUa)
{
_logger.LogInformation("设备 {DeviceId} ({DeviceName}) 不是OPC UA协议跳过更新",
device.Id, device.Name);
_logger.LogInformation("设备 {DeviceId} ({DeviceName}) 不是OPC UA协议跳过更新",
device.Id, device.Name);
return;
}
try
{
_logger.LogInformation("处理设备更新事件: {DeviceId} ({DeviceName})",
device.Id, device.Name);
_logger.LogInformation("处理设备更新事件: {DeviceId} ({DeviceName})",
device.Id, device.Name);
// 先移除旧设备配置
if (_deviceContexts.TryGetValue(device.Id, out var oldContext))
@@ -181,8 +183,8 @@ namespace DMS.Infrastructure.Services.OpcUa
}
catch (Exception ex)
{
_logger.LogError(ex, "断开旧设备 {DeviceId} ({DeviceName}) 连接时发生错误",
device.Id, device.Name);
_logger.LogError(ex, "断开旧设备 {DeviceId} ({DeviceName}) 连接时发生错误",
device.Id, device.Name);
}
});
}
@@ -202,16 +204,16 @@ namespace DMS.Infrastructure.Services.OpcUa
}
catch (Exception ex)
{
_logger.LogError(ex, "连接更新后的设备 {DeviceId} ({DeviceName}) 时发生错误",
device.Id, device.Name);
_logger.LogError(ex, "连接更新后的设备 {DeviceId} ({DeviceName}) 时发生错误",
device.Id, device.Name);
}
});
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理设备更新事件时发生错误: {DeviceId} ({DeviceName})",
device?.Id, device?.Name);
_logger.LogError(ex, "处理设备更新事件时发生错误: {DeviceId} ({DeviceName})",
device?.Id, device?.Name);
}
}
@@ -233,21 +235,6 @@ namespace DMS.Infrastructure.Services.OpcUa
}
}
/// <summary>
/// 获取需要订阅的变量列表
/// </summary>
/// <param name="context">设备上下文</param>
/// <returns>需要订阅的变量列表</returns>
private List<VariableDto> GetSubscribableVariables(DeviceContext context)
{
if (context?.Variables == null)
return new List<VariableDto>();
// 返回所有激活且OPC UA更新类型不是None的变量
return context.Variables.Values
.Where(v => v.IsActive )
.ToList();
}
/// <summary>
/// 处理批量导入变量事件
@@ -265,11 +252,13 @@ namespace DMS.Infrastructure.Services.OpcUa
_logger.LogInformation("处理批量导入变量事件,共 {Count} 个变量", e.Count);
// 更新相关设备的变量表
var deviceIds = e.Variables.Select(v => v.VariableTable.DeviceId).Distinct();
var deviceIds = e.Variables.Select(v => v.VariableTable.DeviceId)
.Distinct();
foreach (var deviceId in deviceIds)
{
// 获取设备的变量表信息
var variablesForDevice = e.Variables.Where(v => v.VariableTable.DeviceId == deviceId).ToList();
var variablesForDevice = e.Variables.Where(v => v.VariableTable.DeviceId == deviceId)
.ToList();
if (variablesForDevice.Any())
{
// 更新设备上下文中的变量
@@ -292,7 +281,7 @@ namespace DMS.Infrastructure.Services.OpcUa
}
}
}
_logger.LogInformation("批量导入变量事件处理完成,更新了 {DeviceCount} 个设备的变量信息", deviceIds.Count());
}
catch (Exception ex)
@@ -439,9 +428,11 @@ namespace DMS.Infrastructure.Services.OpcUa
context.Device.IsRunning = false;
_logger.LogWarning("设备 {DeviceName} 连接失败", context.Device.Name);
}
_eventService.RaiseDeviceStateChanged(
this, new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, context.IsConnected, Core.Enums.DeviceStateType.Connection));
this,
new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, context.IsConnected,
Core.Enums.DeviceStateType.Connection));
}
catch (Exception ex)
{
@@ -450,7 +441,9 @@ namespace DMS.Infrastructure.Services.OpcUa
context.IsConnected = false;
context.Device.IsRunning = false;
_eventService.RaiseDeviceStateChanged(
this, new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, false, Core.Enums.DeviceStateType.Connection));
this,
new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, false,
Core.Enums.DeviceStateType.Connection));
}
finally
{
@@ -473,7 +466,9 @@ namespace DMS.Infrastructure.Services.OpcUa
context.IsConnected = false;
context.Device.IsRunning = false;
_eventService.RaiseDeviceStateChanged(
this, new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, false, Core.Enums.DeviceStateType.Connection));
this,
new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, false,
Core.Enums.DeviceStateType.Connection));
_logger.LogInformation("设备 {DeviceName} 连接已断开", context.Device.Name);
}
catch (Exception ex)
@@ -493,20 +488,12 @@ namespace DMS.Infrastructure.Services.OpcUa
try
{
// 获取需要订阅的变量
var subscribableVariables = GetSubscribableVariables(context);
if (!subscribableVariables.Any())
{
_logger.LogInformation("设备 {DeviceName} 没有需要订阅的变量", context.Device.Name);
return;
}
_logger.LogInformation("正在为设备 {DeviceName} 设置订阅,需要订阅的变量数: {VariableCount}",
context.Device.Name, subscribableVariables.Count);
context.Device.Name, context.Variables.Count);
// 按PollingInterval对变量进行分组
var variablesByPollingInterval = subscribableVariables
var variablesByPollingInterval = context.Variables.Values
.GroupBy(v => v.PollingInterval)
.ToDictionary(g => g.Key, g => g.ToList());
@@ -520,11 +507,11 @@ namespace DMS.Infrastructure.Services.OpcUa
"为设备 {DeviceName} 设置PollingInterval {PollingInterval} 的订阅,变量数: {VariableCount}",
context.Device.Name, pollingInterval, variables.Count);
var opcUaNodes = variables
.Select(v => new OpcUaNode { NodeId = v.OpcUaNodeId })
var opcUaNodeIds = variables
.Select(v => v.OpcUaNodeId)
.ToList();
context.OpcUaService.SubscribeToNode(opcUaNodes, HandleDataChanged,
context.OpcUaService.SubscribeToNode(opcUaNodeIds, HandleDataChanged,
pollingInterval, pollingInterval);
}
@@ -538,7 +525,6 @@ namespace DMS.Infrastructure.Services.OpcUa
}
/// <summary>
/// 处理数据变化
/// </summary>
@@ -546,7 +532,8 @@ namespace DMS.Infrastructure.Services.OpcUa
{
if (opcUaNode?.Value == null)
{
_logger.LogDebug("HandleDataChanged: 接收到空节点或空值节点ID: {NodeId}", opcUaNode?.NodeId?.ToString() ?? "Unknown");
_logger.LogDebug("HandleDataChanged: 接收到空节点或空值节点ID: {NodeId}",
opcUaNode?.NodeId?.ToString() ?? "Unknown");
return;
}
@@ -559,21 +546,24 @@ namespace DMS.Infrastructure.Services.OpcUa
{
if (context.Variables.TryGetValue(opcUaNode.NodeId.ToString(), out var variable))
{
_logger.LogDebug("HandleDataChanged: 找到变量 {VariableName} (ID: {VariableId}) 与节点 {NodeId} 对应,设备: {DeviceName}",
variable.Name, variable.Id, opcUaNode.NodeId, context.Device.Name);
// _logger.LogDebug(
// "HandleDataChanged: 找到变量 {VariableName} (ID: {VariableId}) 与节点 {NodeId} 对应,设备: {DeviceName}",
// variable.Name, variable.Id, opcUaNode.NodeId, context.Device.Name);
// 推送到数据处理队列
await _dataProcessingService.EnqueueAsync(new VariableContext(variable, opcUaNode.Value?.ToString()));
_logger.LogDebug("HandleDataChanged: 变量 {VariableName} 的值已推送到数据处理队列", variable.Name);
await _dataProcessingService.EnqueueAsync(
new VariableContext(variable, opcUaNode.Value?.ToString()));
// _logger.LogDebug("HandleDataChanged: 变量 {VariableName} 的值已推送到数据处理队列", variable.Name);
break;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理数据变化时发生错误 - 节点ID: {NodeId}, 值: {Value}, 错误信息: {ErrorMessage}",
opcUaNode?.NodeId?.ToString() ?? "Unknown", opcUaNode?.Value?.ToString() ?? "null", ex.Message);
_logger.LogError(ex, "处理数据变化时发生错误 - 节点ID: {NodeId}, 值: {Value}, 错误信息: {ErrorMessage}",
opcUaNode?.NodeId?.ToString() ?? "Unknown", opcUaNode?.Value?.ToString() ?? "null",
ex.Message);
}
}
@@ -633,6 +623,7 @@ namespace DMS.Infrastructure.Services.OpcUa
await Task.WhenAll(disconnectTasks);
}
/// <summary>
/// 释放资源
/// </summary>
@@ -667,12 +658,12 @@ namespace DMS.Infrastructure.Services.OpcUa
/// <summary>
/// 处理变量变更事件
/// </summary>
private void OnVariableChanged(object? sender, VariableChangedEventArgs e)
private async void OnVariableChanged(object? sender, VariableChangedEventArgs e)
{
try
{
_logger.LogDebug("处理变量变更事件: 变量ID={VariableId}, 变更类型={ChangeType}, 变更属性={PropertyType}",
e.Variable.Id, e.ChangeType, e.PropertyType);
_logger.LogDebug("处理变量变更事件: 变量ID={VariableId}, 变更类型={ChangeType}, 变更属性={PropertyType}",
e.Variable.Id, e.ChangeType, e.PropertyType);
// 根据变更类型和属性类型进行相应处理
switch (e.ChangeType)
@@ -684,57 +675,59 @@ namespace DMS.Infrastructure.Services.OpcUa
case VariablePropertyType.OpcUaNodeId:
case VariablePropertyType.OpcUaUpdateType:
case VariablePropertyType.PollingInterval:
// 重新设置设备的订阅
if (_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context))
{
_ = Task.Run(async () =>
if (context.Variables.TryGetValue(e.Variable.OpcUaNodeId,out var variableDto))
{
try
if (variableDto.IsActive)
{
await SetupSubscriptionsAsync(context, CancellationToken.None);
_logger.LogInformation("已更新设备 {DeviceId} 的订阅,因为变量 {VariableId} 的OPC UA属性发生了变化",
e.Variable.VariableTable.DeviceId, e.Variable.Id);
context.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId);
context.OpcUaService.SubscribeToNode(e.Variable.OpcUaNodeId,HandleDataChanged,e.Variable.PollingInterval,e.Variable.PollingInterval);
_logger.LogInformation($"OpcUa变量节点{e.Variable.OpcUaNodeId},的轮询时间修改为:{e.Variable.PollingInterval}");
}
catch (Exception ex)
{
_logger.LogError(ex, "更新设备 {DeviceId} 订阅时发生错误", e.Variable.VariableTable.DeviceId);
}
});
}
}
break;
case VariablePropertyType.IsActive:
// 变量激活状态变化,更新变量列表
// 变量激活状态变化
if (_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context2))
{
if (e.Variable.IsActive)
{
// 添加变量到监控列表
context2.Variables.AddOrUpdate(e.Variable.OpcUaNodeId, e.Variable, (key, oldValue) => e.Variable);
// 添加变量到监控列表并重新订阅
if (context2.Variables.TryAdd(e.Variable.OpcUaNodeId, e.Variable))
{
context2.OpcUaService.SubscribeToNode(e.Variable.OpcUaNodeId,HandleDataChanged,e.Variable.PollingInterval,e.Variable.PollingInterval);
}
}
else
{
// 从监控列表中移除变量
context2.Variables.Remove(e.Variable.OpcUaNodeId, out _);
// 从监控列表中移除变量并取消订阅
if (context2.Variables.TryRemove(e.Variable.OpcUaNodeId, out _))
{
context2.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId);
}
}
}
break;
}
break;
case ActionChangeType.Deleted:
// 变量被删除时,从设备上下文的变量列表中移除
if (_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context3))
{
context3.Variables.Remove(e.Variable.OpcUaNodeId, out _);
}
// 变量被删除时,取消订阅并从设备上下文的变量列表中移除
// await UnsubscribeVariableAsync(e.Variable);
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理变量变更事件时发生错误: 变量ID={VariableId}, 变更类型={ChangeType}",
e.Variable.Id, e.ChangeType);
_logger.LogError(ex, "处理变量变更事件时发生错误: 变量ID={VariableId}, 变更类型={ChangeType}",
e.Variable.Id, e.ChangeType);
}
}
}