From c4a840c8b8622015e3f524d3c5692521a7f307bc Mon Sep 17 00:00:00 2001 From: "David P.G" Date: Thu, 2 Oct 2025 22:03:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=AC=E6=AC=A1=E6=8F=90=E4=BA=A4=E5=AF=B9?= =?UTF-8?q?=20OPC=20UA=20=E6=9C=8D=E5=8A=A1=E7=9A=84=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E4=BA=86=E9=87=8D=E6=9E=84=EF=BC=8C=E4=B8=BB?= =?UTF-8?q?=E8=A6=81=E5=8F=98=E6=9B=B4=E5=A6=82=E4=B8=8B=EF=BC=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 简化接口: OpcUaService 的 SubscribeToNode 和 UnsubscribeFromNode 方法的参数类型从 OpcUaNode 对象更改为 string 类型的节点ID。这使得上层服务在调用时无需构造完整的 OpcUaNode 对象,降低了接口的复杂性。 2. 更新实现: OpcUaService 和 OpcUaServiceManager 的内部实现已更新,以兼容新的基于字符串的接口。 3. 优化变量更新: OpcUaServiceManager 中的 OnVariableChanged 事件处理逻辑被修改,现在能够更细粒度地处理单个变量的激活状态和轮询间隔的变化,避免了不必要的整个设备订阅的重新加载。 --- .../Interfaces/Services/IOpcUaService.cs | 16 +- .../Services/OpcUa/OpcUaService.cs | 222 +++++++++--------- .../Services/OpcUa/OpcUaServiceManager.cs | 175 +++++++------- 3 files changed, 208 insertions(+), 205 deletions(-) diff --git a/DMS.Infrastructure/Interfaces/Services/IOpcUaService.cs b/DMS.Infrastructure/Interfaces/Services/IOpcUaService.cs index 09c7948..e8cb6d6 100644 --- a/DMS.Infrastructure/Interfaces/Services/IOpcUaService.cs +++ b/DMS.Infrastructure/Interfaces/Services/IOpcUaService.cs @@ -37,32 +37,32 @@ namespace DMS.Infrastructure.Interfaces.Services /// /// 订阅单个节点的数据变化 /// - /// 要订阅的节点 + /// 要订阅的节点ID /// 数据变化时的回调方法 /// 发布间隔(毫秒) /// 采样间隔(毫秒) - void SubscribeToNode(OpcUaNode node, Action onDataChange, int publishingInterval = 1000, int samplingInterval = 500); + void SubscribeToNode(string nodeId, Action onDataChange, int publishingInterval = 1000, int samplingInterval = 500); /// /// 订阅多个节点的数据变化 /// - /// 要订阅的节点列表 + /// 要订阅的节点ID列表 /// 数据变化时的回调方法 /// 发布间隔(毫秒) /// 采样间隔(毫秒) - void SubscribeToNode(List nodes, Action onDataChange, int publishingInterval = 1000, int samplingInterval = 500); + void SubscribeToNode(List nodeIds, Action onDataChange, int publishingInterval = 1000, int samplingInterval = 500); /// /// 取消订阅单个节点 /// - /// 要取消订阅的节点 - void UnsubscribeFromNode(OpcUaNode node); + /// 要取消订阅的节点的ID + void UnsubscribeFromNode(string nodeId); /// /// 取消订阅多个节点 /// - /// 要取消订阅的节点列表 - void UnsubscribeFromNode(List nodes); + /// 要取消订阅的节点ID列表 + void UnsubscribeFromNode(List nodeIds); /// /// 获取当前已订阅的所有节点 diff --git a/DMS.Infrastructure/Services/OpcUa/OpcUaService.cs b/DMS.Infrastructure/Services/OpcUa/OpcUaService.cs index b262a36..56d620f 100644 --- a/DMS.Infrastructure/Services/OpcUa/OpcUaService.cs +++ b/DMS.Infrastructure/Services/OpcUa/OpcUaService.cs @@ -274,17 +274,17 @@ namespace DMS.Infrastructure.Services.OpcUa } } - public void SubscribeToNode(OpcUaNode node, Action onDataChange, int publishingInterval = 1000, int samplingInterval = 500) + public void SubscribeToNode(string nodeId, Action onDataChange, int publishingInterval = 1000, int samplingInterval = 500) { - _logger?.LogDebug("正在订阅单个节点: {NodeId} ({DisplayName}),发布间隔: {PublishingInterval}ms,采样间隔: {SamplingInterval}ms", - node.NodeId, node.DisplayName, publishingInterval, samplingInterval); - SubscribeToNode(new List { node }, onDataChange, publishingInterval, samplingInterval); + _logger?.LogDebug("正在订阅单个节点: {NodeId},发布间隔: {PublishingInterval}ms,采样间隔: {SamplingInterval}ms", + nodeId, publishingInterval, samplingInterval); + SubscribeToNode(new List { nodeId }, onDataChange, publishingInterval, samplingInterval); } - public void SubscribeToNode(List nodes, Action onDataChange, int publishingInterval = 1000, int samplingInterval = 500) + public void SubscribeToNode(List nodeIds, Action onDataChange, int publishingInterval = 1000, int samplingInterval = 500) { _logger?.LogDebug("正在订阅 {Count} 个节点,发布间隔: {PublishingInterval}ms,采样间隔: {SamplingInterval}ms", - nodes?.Count ?? 0, publishingInterval, samplingInterval); + nodeIds?.Count ?? 0, publishingInterval, samplingInterval); // 检查会话是否已连接 if (!IsConnected) @@ -294,36 +294,64 @@ namespace DMS.Infrastructure.Services.OpcUa } // 检查节点列表是否有效 - if (nodes == null || !nodes.Any()) + if (nodeIds == null || !nodeIds.Any()) { - _logger?.LogWarning("节点列表为null或为空,无法订阅"); + _logger?.LogWarning("节点ID列表为null或为空,无法订阅"); return; } // 确保订阅对象存在 - EnsureSubscriptionExists(publishingInterval); + // 如果还没有订阅对象,则基于会话的默认设置创建一个新的订阅 + if (_subscription == null) + { + _subscription = new Subscription(_session.DefaultSubscription) + { + // 设置服务器向客户端发送通知的速率(毫秒) + PublishingInterval = publishingInterval + }; + // 在会话中添加订阅 + _session.AddSubscription(_subscription); + // 在服务器上创建订阅 + _subscription.Create(); + } + // 如果客户端请求的发布间隔与现有订阅不同,则修改订阅 + else if (_subscription.PublishingInterval != publishingInterval) + { + _subscription.PublishingInterval = publishingInterval; + } // 创建一个用于存放待添加监视项的列表 var itemsToAdd = new List(); - // 遍历所有请求订阅的节点 - foreach (var node in nodes) + // 遍历所有请求订阅的节点ID + foreach (var nodeIdStr in nodeIds) { - // 如果节点已经存在于我们的跟踪列表中,则跳过,避免重复订阅 - if (_subscribedNodes.ContainsKey(node.NodeId)) + try { - _logger?.LogDebug("节点 {NodeId} ({DisplayName}) 已经被订阅,跳过重复订阅", node.NodeId, node.DisplayName); - continue; + var nodeId = new NodeId(nodeIdStr); + // 如果节点已经存在于我们的跟踪列表中,则跳过,避免重复订阅 + if (_subscribedNodes.ContainsKey(nodeId)) + { + _logger?.LogDebug("节点 {NodeId} 已经被订阅,跳过重复订阅", nodeIdStr); + continue; + } + + // 创建一个临时的OpcUaNode对象用于订阅 + var node = new OpcUaNode { NodeId = nodeId, DisplayName = nodeIdStr }; + + // 为每个节点创建一个监视项 + var monitoredItem = CreateMonitoredItem(node, onDataChange, samplingInterval); + + // 将创建的监视项添加到待添加列表 + itemsToAdd.Add(monitoredItem); + // 将节点添加到我们的跟踪字典中 + _subscribedNodes.TryAdd(node.NodeId, node); + _logger?.LogDebug("节点 {NodeId} 已添加到订阅列表", nodeIdStr); + } + catch (Exception ex) + { + _logger?.LogError(ex, "创建节点 {NodeId} 的监视项时发生错误", nodeIdStr); } - - // 为每个节点创建一个监视项 - var monitoredItem = CreateMonitoredItem(node, onDataChange, samplingInterval); - - // 将创建的监视项添加到待添加列表 - itemsToAdd.Add(monitoredItem); - // 将节点添加到我们的跟踪字典中 - _subscribedNodes.TryAdd(node.NodeId, node); - _logger?.LogDebug("节点 {NodeId} ({DisplayName}) 已添加到订阅列表", node.NodeId, node.DisplayName); } // 如果有新的监视项要添加 @@ -344,31 +372,6 @@ namespace DMS.Infrastructure.Services.OpcUa } } - /// - /// 确保订阅对象存在 - /// - /// 发布间隔 - private void EnsureSubscriptionExists(int publishingInterval) - { - // 如果还没有订阅对象,则基于会话的默认设置创建一个新的订阅 - if (_subscription == null) - { - _subscription = new Subscription(_session.DefaultSubscription) - { - // 设置服务器向客户端发送通知的速率(毫秒) - PublishingInterval = publishingInterval - }; - // 在会话中添加订阅 - _session.AddSubscription(_subscription); - // 在服务器上创建订阅 - _subscription.Create(); - } - // 如果客户端请求的发布间隔与现有订阅不同,则修改订阅 - else if (_subscription.PublishingInterval != publishingInterval) - { - _subscription.PublishingInterval = publishingInterval; - } - } /// /// 创建监视项 @@ -414,67 +417,74 @@ namespace DMS.Infrastructure.Services.OpcUa return monitoredItem; } - public void UnsubscribeFromNode(OpcUaNode node) - { - _logger?.LogDebug("正在取消订阅节点: {NodeId} ({DisplayName})", node.NodeId, node.DisplayName); - UnsubscribeFromNode(new List { node }); - } - - public void UnsubscribeFromNode(List nodes) - { - _logger?.LogDebug("正在取消订阅 {Count} 个节点", nodes?.Count ?? 0); - - // 检查订阅对象和节点列表是否有效 - if (_subscription == null) - { - _logger?.LogWarning("订阅对象为null,无法取消订阅"); - return; - } - - if (nodes == null || !nodes.Any()) - { - _logger?.LogWarning("节点列表为null或为空,无法取消订阅"); - return; - } - - var itemsToRemove = new List(); - // 遍历所有请求取消订阅的节点 - foreach (var node in nodes) - { - // 在当前订阅中查找与节点ID匹配的监视项 - var item = _subscription.MonitoredItems.FirstOrDefault(m => m.StartNodeId.Equals(node.NodeId)); - if (item != null) + public void UnsubscribeFromNode(string nodeId) { - _logger?.LogDebug("找到节点 {NodeId} ({DisplayName}) 的监视项,准备移除", node.NodeId, node.DisplayName); - // 如果找到,则添加到待移除列表 - itemsToRemove.Add(item); - // 从我们的跟踪字典中移除该节点 - _subscribedNodes.Remove(node.NodeId); + _logger?.LogDebug("正在取消订阅节点: {NodeId}", nodeId); + UnsubscribeFromNode(new List { nodeId }); } - else + + public void UnsubscribeFromNode(List nodeIds) { - _logger?.LogDebug("节点 {NodeId} ({DisplayName}) 未在监视项中找到,可能已经取消订阅", node.NodeId, node.DisplayName); + _logger?.LogDebug("正在取消订阅 {Count} 个节点", nodeIds?.Count ?? 0); + + // 检查订阅对象和节点列表是否有效 + if (_subscription == null) + { + _logger?.LogWarning("订阅对象为null,无法取消订阅"); + return; + } + + if (nodeIds == null || !nodeIds.Any()) + { + _logger?.LogWarning("节点ID列表为null或为空,无法取消订阅"); + return; + } + + var itemsToRemove = new List(); + // 遍历所有请求取消订阅的节点ID + foreach (var nodeIdStr in nodeIds) + { + try + { + var nodeId = new NodeId(nodeIdStr); + // 在当前订阅中查找与节点ID匹配的监视项 + var item = _subscription.MonitoredItems.FirstOrDefault(m => m.StartNodeId.Equals(nodeId)); + if (item != null) + { + _logger?.LogDebug("找到节点 {NodeId} 的监视项,准备移除", nodeIdStr); + // 如果找到,则添加到待移除列表 + itemsToRemove.Add(item); + // 从我们的跟踪字典中移除该节点 + _subscribedNodes.Remove(nodeId); + } + else + { + _logger?.LogDebug("节点 {NodeId} 未在监视项中找到,可能已经取消订阅", nodeIdStr); + } + } + catch (Exception ex) + { + _logger?.LogError(ex, "解析节点ID '{NodeIdStr}' 时发生错误,无法取消订阅", nodeIdStr); + } + } + + // 如果有需要移除的监视项 + if (itemsToRemove.Any()) + { + _logger?.LogDebug("批量移除 {Count} 个监视项", itemsToRemove.Count); + + // 从订阅中批量移除监视项 + _subscription.RemoveItems(itemsToRemove); + // 将更改应用到服务器 + _subscription.ApplyChanges(); + + _logger?.LogInformation("已成功取消订阅 {Count} 个节点", itemsToRemove.Count); + } + else + { + _logger?.LogDebug("没有找到需要移除的监视项"); + } } - } - - // 如果有需要移除的监视项 - if (itemsToRemove.Any()) - { - _logger?.LogDebug("批量移除 {Count} 个监视项", itemsToRemove.Count); - - // 从订阅中批量移除监视项 - _subscription.RemoveItems(itemsToRemove); - // 将更改应用到服务器 - _subscription.ApplyChanges(); - - _logger?.LogInformation("已成功取消订阅 {Count} 个节点", itemsToRemove.Count); - } - else - { - _logger?.LogDebug("没有找到需要移除的监视项"); - } - } - public List GetSubscribedNodes() { var subscribedNodes = _subscribedNodes.Values.ToList(); diff --git a/DMS.Infrastructure/Services/OpcUa/OpcUaServiceManager.cs b/DMS.Infrastructure/Services/OpcUa/OpcUaServiceManager.cs index fba08dc..7571f46 100644 --- a/DMS.Infrastructure/Services/OpcUa/OpcUaServiceManager.cs +++ b/DMS.Infrastructure/Services/OpcUa/OpcUaServiceManager.cs @@ -6,6 +6,7 @@ using DMS.Application.Interfaces; using DMS.Application.Models; using DMS.Core.Enums; using DMS.Core.Events; +using DMS.Core.Models; using DMS.Infrastructure.Configuration; using DMS.Infrastructure.Interfaces.Services; using DMS.Infrastructure.Models; @@ -65,8 +66,9 @@ namespace DMS.Infrastructure.Services.OpcUa { await DisconnectDeviceAsync(e.DeviceId, CancellationToken.None); } + break; - + case Core.Enums.DeviceStateType.Connection: // 处理连接状态变化(通常由底层连接过程触发) // 在OPC UA服务中,这可能是内部状态更新 @@ -107,15 +109,15 @@ namespace DMS.Infrastructure.Services.OpcUa // 检查设备协议是否为OPC UA if (device.Protocol != Core.Enums.ProtocolType.OpcUa) { - _logger.LogInformation("设备 {DeviceId} ({DeviceName}) 不是OPC UA协议,跳过加载", - device.Id, device.Name); + _logger.LogInformation("设备 {DeviceId} ({DeviceName}) 不是OPC UA协议,跳过加载", + device.Id, device.Name); return; } try { - _logger.LogInformation("处理设备添加事件: {DeviceId} ({DeviceName})", - device.Id, device.Name); + _logger.LogInformation("处理设备添加事件: {DeviceId} ({DeviceName})", + device.Id, device.Name); // 添加设备到监控列表 AddDevice(device); @@ -132,16 +134,16 @@ namespace DMS.Infrastructure.Services.OpcUa } catch (Exception ex) { - _logger.LogError(ex, "连接新添加的设备 {DeviceId} ({DeviceName}) 时发生错误", - device.Id, device.Name); + _logger.LogError(ex, "连接新添加的设备 {DeviceId} ({DeviceName}) 时发生错误", + device.Id, device.Name); } }); } } catch (Exception ex) { - _logger.LogError(ex, "处理设备添加事件时发生错误: {DeviceId} ({DeviceName})", - device?.Id, device?.Name); + _logger.LogError(ex, "处理设备添加事件时发生错误: {DeviceId} ({DeviceName})", + device?.Id, device?.Name); } } @@ -159,15 +161,15 @@ namespace DMS.Infrastructure.Services.OpcUa // 检查设备协议是否为OPC UA if (device.Protocol != Core.Enums.ProtocolType.OpcUa) { - _logger.LogInformation("设备 {DeviceId} ({DeviceName}) 不是OPC UA协议,跳过更新", - device.Id, device.Name); + _logger.LogInformation("设备 {DeviceId} ({DeviceName}) 不是OPC UA协议,跳过更新", + device.Id, device.Name); return; } try { - _logger.LogInformation("处理设备更新事件: {DeviceId} ({DeviceName})", - device.Id, device.Name); + _logger.LogInformation("处理设备更新事件: {DeviceId} ({DeviceName})", + device.Id, device.Name); // 先移除旧设备配置 if (_deviceContexts.TryGetValue(device.Id, out var oldContext)) @@ -181,8 +183,8 @@ namespace DMS.Infrastructure.Services.OpcUa } catch (Exception ex) { - _logger.LogError(ex, "断开旧设备 {DeviceId} ({DeviceName}) 连接时发生错误", - device.Id, device.Name); + _logger.LogError(ex, "断开旧设备 {DeviceId} ({DeviceName}) 连接时发生错误", + device.Id, device.Name); } }); } @@ -202,16 +204,16 @@ namespace DMS.Infrastructure.Services.OpcUa } catch (Exception ex) { - _logger.LogError(ex, "连接更新后的设备 {DeviceId} ({DeviceName}) 时发生错误", - device.Id, device.Name); + _logger.LogError(ex, "连接更新后的设备 {DeviceId} ({DeviceName}) 时发生错误", + device.Id, device.Name); } }); } } catch (Exception ex) { - _logger.LogError(ex, "处理设备更新事件时发生错误: {DeviceId} ({DeviceName})", - device?.Id, device?.Name); + _logger.LogError(ex, "处理设备更新事件时发生错误: {DeviceId} ({DeviceName})", + device?.Id, device?.Name); } } @@ -233,21 +235,6 @@ namespace DMS.Infrastructure.Services.OpcUa } } - /// - /// 获取需要订阅的变量列表 - /// - /// 设备上下文 - /// 需要订阅的变量列表 - private List GetSubscribableVariables(DeviceContext context) - { - if (context?.Variables == null) - return new List(); - - // 返回所有激活且OPC UA更新类型不是None的变量 - return context.Variables.Values - .Where(v => v.IsActive ) - .ToList(); - } /// /// 处理批量导入变量事件 @@ -265,11 +252,13 @@ namespace DMS.Infrastructure.Services.OpcUa _logger.LogInformation("处理批量导入变量事件,共 {Count} 个变量", e.Count); // 更新相关设备的变量表 - var deviceIds = e.Variables.Select(v => v.VariableTable.DeviceId).Distinct(); + var deviceIds = e.Variables.Select(v => v.VariableTable.DeviceId) + .Distinct(); foreach (var deviceId in deviceIds) { // 获取设备的变量表信息 - var variablesForDevice = e.Variables.Where(v => v.VariableTable.DeviceId == deviceId).ToList(); + var variablesForDevice = e.Variables.Where(v => v.VariableTable.DeviceId == deviceId) + .ToList(); if (variablesForDevice.Any()) { // 更新设备上下文中的变量 @@ -292,7 +281,7 @@ namespace DMS.Infrastructure.Services.OpcUa } } } - + _logger.LogInformation("批量导入变量事件处理完成,更新了 {DeviceCount} 个设备的变量信息", deviceIds.Count()); } catch (Exception ex) @@ -439,9 +428,11 @@ namespace DMS.Infrastructure.Services.OpcUa context.Device.IsRunning = false; _logger.LogWarning("设备 {DeviceName} 连接失败", context.Device.Name); } - + _eventService.RaiseDeviceStateChanged( - this, new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, context.IsConnected, Core.Enums.DeviceStateType.Connection)); + this, + new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, context.IsConnected, + Core.Enums.DeviceStateType.Connection)); } catch (Exception ex) { @@ -450,7 +441,9 @@ namespace DMS.Infrastructure.Services.OpcUa context.IsConnected = false; context.Device.IsRunning = false; _eventService.RaiseDeviceStateChanged( - this, new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, false, Core.Enums.DeviceStateType.Connection)); + this, + new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, false, + Core.Enums.DeviceStateType.Connection)); } finally { @@ -473,7 +466,9 @@ namespace DMS.Infrastructure.Services.OpcUa context.IsConnected = false; context.Device.IsRunning = false; _eventService.RaiseDeviceStateChanged( - this, new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, false, Core.Enums.DeviceStateType.Connection)); + this, + new DeviceStateChangedEventArgs(context.Device.Id, context.Device.Name, false, + Core.Enums.DeviceStateType.Connection)); _logger.LogInformation("设备 {DeviceName} 连接已断开", context.Device.Name); } catch (Exception ex) @@ -493,20 +488,12 @@ namespace DMS.Infrastructure.Services.OpcUa try { - // 获取需要订阅的变量 - var subscribableVariables = GetSubscribableVariables(context); - - if (!subscribableVariables.Any()) - { - _logger.LogInformation("设备 {DeviceName} 没有需要订阅的变量", context.Device.Name); - return; - } _logger.LogInformation("正在为设备 {DeviceName} 设置订阅,需要订阅的变量数: {VariableCount}", - context.Device.Name, subscribableVariables.Count); + context.Device.Name, context.Variables.Count); // 按PollingInterval对变量进行分组 - var variablesByPollingInterval = subscribableVariables + var variablesByPollingInterval = context.Variables.Values .GroupBy(v => v.PollingInterval) .ToDictionary(g => g.Key, g => g.ToList()); @@ -520,11 +507,11 @@ namespace DMS.Infrastructure.Services.OpcUa "为设备 {DeviceName} 设置PollingInterval {PollingInterval} 的订阅,变量数: {VariableCount}", context.Device.Name, pollingInterval, variables.Count); - var opcUaNodes = variables - .Select(v => new OpcUaNode { NodeId = v.OpcUaNodeId }) + var opcUaNodeIds = variables + .Select(v => v.OpcUaNodeId) .ToList(); - context.OpcUaService.SubscribeToNode(opcUaNodes, HandleDataChanged, + context.OpcUaService.SubscribeToNode(opcUaNodeIds, HandleDataChanged, pollingInterval, pollingInterval); } @@ -538,7 +525,6 @@ namespace DMS.Infrastructure.Services.OpcUa } - /// /// 处理数据变化 /// @@ -546,7 +532,8 @@ namespace DMS.Infrastructure.Services.OpcUa { if (opcUaNode?.Value == null) { - _logger.LogDebug("HandleDataChanged: 接收到空节点或空值,节点ID: {NodeId}", opcUaNode?.NodeId?.ToString() ?? "Unknown"); + _logger.LogDebug("HandleDataChanged: 接收到空节点或空值,节点ID: {NodeId}", + opcUaNode?.NodeId?.ToString() ?? "Unknown"); return; } @@ -559,21 +546,24 @@ namespace DMS.Infrastructure.Services.OpcUa { if (context.Variables.TryGetValue(opcUaNode.NodeId.ToString(), out var variable)) { - _logger.LogDebug("HandleDataChanged: 找到变量 {VariableName} (ID: {VariableId}) 与节点 {NodeId} 对应,设备: {DeviceName}", - variable.Name, variable.Id, opcUaNode.NodeId, context.Device.Name); + // _logger.LogDebug( + // "HandleDataChanged: 找到变量 {VariableName} (ID: {VariableId}) 与节点 {NodeId} 对应,设备: {DeviceName}", + // variable.Name, variable.Id, opcUaNode.NodeId, context.Device.Name); // 推送到数据处理队列 - await _dataProcessingService.EnqueueAsync(new VariableContext(variable, opcUaNode.Value?.ToString())); - - _logger.LogDebug("HandleDataChanged: 变量 {VariableName} 的值已推送到数据处理队列", variable.Name); + await _dataProcessingService.EnqueueAsync( + new VariableContext(variable, opcUaNode.Value?.ToString())); + + // _logger.LogDebug("HandleDataChanged: 变量 {VariableName} 的值已推送到数据处理队列", variable.Name); break; } } } catch (Exception ex) { - _logger.LogError(ex, "处理数据变化时发生错误 - 节点ID: {NodeId}, 值: {Value}, 错误信息: {ErrorMessage}", - opcUaNode?.NodeId?.ToString() ?? "Unknown", opcUaNode?.Value?.ToString() ?? "null", ex.Message); + _logger.LogError(ex, "处理数据变化时发生错误 - 节点ID: {NodeId}, 值: {Value}, 错误信息: {ErrorMessage}", + opcUaNode?.NodeId?.ToString() ?? "Unknown", opcUaNode?.Value?.ToString() ?? "null", + ex.Message); } } @@ -633,6 +623,7 @@ namespace DMS.Infrastructure.Services.OpcUa await Task.WhenAll(disconnectTasks); } + /// /// 释放资源 /// @@ -667,12 +658,12 @@ namespace DMS.Infrastructure.Services.OpcUa /// /// 处理变量变更事件 /// - private void OnVariableChanged(object? sender, VariableChangedEventArgs e) + private async void OnVariableChanged(object? sender, VariableChangedEventArgs e) { try { - _logger.LogDebug("处理变量变更事件: 变量ID={VariableId}, 变更类型={ChangeType}, 变更属性={PropertyType}", - e.Variable.Id, e.ChangeType, e.PropertyType); + _logger.LogDebug("处理变量变更事件: 变量ID={VariableId}, 变更类型={ChangeType}, 变更属性={PropertyType}", + e.Variable.Id, e.ChangeType, e.PropertyType); // 根据变更类型和属性类型进行相应处理 switch (e.ChangeType) @@ -684,57 +675,59 @@ namespace DMS.Infrastructure.Services.OpcUa case VariablePropertyType.OpcUaNodeId: case VariablePropertyType.OpcUaUpdateType: case VariablePropertyType.PollingInterval: - // 重新设置设备的订阅 if (_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context)) { - _ = Task.Run(async () => + if (context.Variables.TryGetValue(e.Variable.OpcUaNodeId,out var variableDto)) { - try + if (variableDto.IsActive) { - await SetupSubscriptionsAsync(context, CancellationToken.None); - _logger.LogInformation("已更新设备 {DeviceId} 的订阅,因为变量 {VariableId} 的OPC UA属性发生了变化", - e.Variable.VariableTable.DeviceId, e.Variable.Id); + context.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId); + context.OpcUaService.SubscribeToNode(e.Variable.OpcUaNodeId,HandleDataChanged,e.Variable.PollingInterval,e.Variable.PollingInterval); + _logger.LogInformation($"OpcUa变量节点:{e.Variable.OpcUaNodeId},的轮询时间修改为:{e.Variable.PollingInterval}"); } - catch (Exception ex) - { - _logger.LogError(ex, "更新设备 {DeviceId} 订阅时发生错误", e.Variable.VariableTable.DeviceId); - } - }); + + } + } break; - + case VariablePropertyType.IsActive: - // 变量激活状态变化,更新变量列表 + // 变量激活状态变化 if (_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context2)) { if (e.Variable.IsActive) { - // 添加变量到监控列表 - context2.Variables.AddOrUpdate(e.Variable.OpcUaNodeId, e.Variable, (key, oldValue) => e.Variable); + // 添加变量到监控列表并重新订阅 + if (context2.Variables.TryAdd(e.Variable.OpcUaNodeId, e.Variable)) + { + context2.OpcUaService.SubscribeToNode(e.Variable.OpcUaNodeId,HandleDataChanged,e.Variable.PollingInterval,e.Variable.PollingInterval); + } } else { - // 从监控列表中移除变量 - context2.Variables.Remove(e.Variable.OpcUaNodeId, out _); + // 从监控列表中移除变量并取消订阅 + if (context2.Variables.TryRemove(e.Variable.OpcUaNodeId, out _)) + { + context2.OpcUaService.UnsubscribeFromNode(e.Variable.OpcUaNodeId); + } } } + break; } + break; - + case ActionChangeType.Deleted: - // 变量被删除时,从设备上下文的变量列表中移除 - if (_deviceContexts.TryGetValue(e.Variable.VariableTable.DeviceId, out var context3)) - { - context3.Variables.Remove(e.Variable.OpcUaNodeId, out _); - } + // 变量被删除时,取消订阅并从设备上下文的变量列表中移除 + // await UnsubscribeVariableAsync(e.Variable); break; } } catch (Exception ex) { - _logger.LogError(ex, "处理变量变更事件时发生错误: 变量ID={VariableId}, 变更类型={ChangeType}", - e.Variable.Id, e.ChangeType); + _logger.LogError(ex, "处理变量变更事件时发生错误: 变量ID={VariableId}, 变更类型={ChangeType}", + e.Variable.Id, e.ChangeType); } } }