diff --git a/App.xaml.cs b/App.xaml.cs index 859c6e2..e8a3b6d 100644 --- a/App.xaml.cs +++ b/App.xaml.cs @@ -79,10 +79,10 @@ public partial class App : Application { Host.Services.GetRequiredService().StartService(); } - if (connectionSettings.EnableOpcUaService) - { - Host.Services.GetRequiredService().StartService(); - } + // if (connectionSettings.EnableOpcUaService) + // { + // Host.Services.GetRequiredService().StartService(); + // } } protected override async void OnExit(ExitEventArgs e) @@ -101,6 +101,7 @@ public partial class App : Application services.AddSingleton(); services.AddSingleton(); services.AddHostedService(); + services.AddHostedService(); services.AddSingleton(); services.AddSingleton(); diff --git a/Helper/ServiceHelper.cs b/Helper/ServiceHelper.cs index 585aa9b..f5acbe5 100644 --- a/Helper/ServiceHelper.cs +++ b/Helper/ServiceHelper.cs @@ -78,28 +78,14 @@ public static class ServiceHelper (int)PollLevelType.ThirtyMinutes) } }; - /// /// 创建并配置 OPC UA 会话。 /// /// OPC UA 服务器的终结点 URL。 + /// /// 创建的 Session 对象,如果失败则返回 null。 - public static Session CreateOpcUaSession(string endpointUrl) + public static async Task CreateOpcUaSessionAsync(string endpointUrl, CancellationToken stoppingToken = default) { - return CreateOpcUaSessionAsync(endpointUrl) - .GetAwaiter() - .GetResult(); - } - - /// - /// 创建并配置 OPC UA 会话。 - /// - /// OPC UA 服务器的终结点 URL。 - /// 创建的 Session 对象,如果失败则返回 null。 - public static async Task CreateOpcUaSessionAsync(string endpointUrl) - { - try - { // 1. 创建应用程序配置 var application = new ApplicationInstance { @@ -157,10 +143,10 @@ public static class ServiceHelper // 验证并检查证书 await config.Validate(ApplicationType.Client); - await application.CheckApplicationInstanceCertificate(false, 0); + // 2. 查找并选择端点 (将 useSecurity 设置为 false 以进行诊断) - var selectedEndpoint = CoreClientUtils.SelectEndpoint(endpointUrl, false); + var selectedEndpoint = CoreClientUtils.SelectEndpoint(config, endpointUrl, false); var session = await Session.Create( config, @@ -169,16 +155,7 @@ public static class ServiceHelper "PMSWPF OPC UA Session", 60000, new UserIdentity(new AnonymousIdentityToken()), - null); - - - NotificationHelper.ShowSuccess($"已连接到 OPC UA 服务器: {endpointUrl}"); + null,stoppingToken); return session; - } - catch (Exception ex) - { - NotificationHelper.ShowError($"连接 OPC UA 服务器失败: {endpointUrl} - {ex.Message}", ex); - return null; - } } } \ No newline at end of file diff --git a/Services/OpcUaBackgroundService.cs b/Services/OpcUaBackgroundService.cs index 005fc55..20210af 100644 --- a/Services/OpcUaBackgroundService.cs +++ b/Services/OpcUaBackgroundService.cs @@ -1,172 +1,193 @@ -using System; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; +using System.Collections.Concurrent; using Microsoft.Extensions.Hosting; using Opc.Ua; using Opc.Ua.Client; -using Opc.Ua.Configuration; using PMSWPF.Enums; using PMSWPF.Helper; using PMSWPF.Models; namespace PMSWPF.Services { - public class OpcUaBackgroundService + public class OpcUaBackgroundService : BackgroundService { private readonly DataServices _dataServices; + private readonly IDataProcessingService _dataProcessingService; // 存储 OPC UA 设备,键为设备Id,值为会话对象。 - private readonly Dictionary _deviceDic; + private readonly ConcurrentDictionary _deviceDic; // 存储 OPC UA 会话,键为终结点 URL,值为会话对象。 - private readonly Dictionary _sessionsDic; + private readonly ConcurrentDictionary _sessionsDic; // 存储 OPC UA 订阅,键为终结点 URL,值为订阅对象。 - private readonly Dictionary _subscriptionsDic; + private readonly ConcurrentDictionary _subscriptionsDic; // 存储活动的 OPC UA 变量,键为变量的OpcNodeId - private readonly Dictionary _opcUaNodeIdVariableDic; // Key: VariableData.Id + private readonly ConcurrentDictionary _opcUaNodeIdVariableDic; // 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量 - private readonly Dictionary> _pollVariableDic; // Key: VariableData.Id + private readonly ConcurrentDictionary> _pollVariableDic; // 储存所有要订阅更新的变量,键是Device.Id,值是这个设备所有要轮询的变量 - private readonly Dictionary> _subVariableDic; + private readonly ConcurrentDictionary> _subVariableDic; + private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0); - - // 后台服务的主线程,负责连接服务器,加载变量,订阅变量 - private Thread _serviceMainThread; + // OPC UA 轮询间隔(毫秒) + private readonly int OpcUaPollIntervalMs = 100; - // 轮询线程 - private Thread _pollThread; + // OPC UA 订阅发布间隔(毫秒) + private readonly int OpcUaSubscriptionPublishingIntervalMs = 1000; - // 重新加载事件 - private readonly ManualResetEvent _reloadEvent = new ManualResetEvent(false); + // OPC UA 订阅采样间隔(毫秒) + private readonly int OpcUaSubscriptionSamplingIntervalMs = 1000; - // 停止事件,触发后会停止整个Opc后台服务 - private readonly ManualResetEvent _stopdEvent = new ManualResetEvent(false); - - - /// - /// OpcUaBackgroundService 的构造函数。 - /// - /// 数据服务,用于访问数据库中的变量信息。 - public OpcUaBackgroundService(DataServices dataServices) + public OpcUaBackgroundService(DataServices dataServices, IDataProcessingService dataProcessingService) { _dataServices = dataServices; - _sessionsDic = new Dictionary(); - _subscriptionsDic = new Dictionary(); - _opcUaNodeIdVariableDic = new(); - _pollVariableDic = new(); - _subVariableDic = new(); - _deviceDic = new(); - } + _dataProcessingService = dataProcessingService; + _deviceDic = new ConcurrentDictionary(); + _sessionsDic = new ConcurrentDictionary(); + _subscriptionsDic = new ConcurrentDictionary(); + _opcUaNodeIdVariableDic = new ConcurrentDictionary(); + _pollVariableDic = new ConcurrentDictionary>(); + _subVariableDic = new ConcurrentDictionary>(); - /// - /// 后台服务的主执行方法。 - /// - /// 用于通知服务停止的取消令牌。 - /// 表示异步操作的任务。 - public void StartService() - { - NlogHelper.Info("OPC UA 服务正在启动..."); - _reloadEvent.Set(); - _serviceMainThread = new Thread(Execute); - _serviceMainThread.IsBackground = true; - _serviceMainThread.Name = "OpcUaServiceThread"; - _serviceMainThread.Start(); - } - - public void StopService() - { - NlogHelper.Info("OPC UA 服务正在停止..."); - _stopdEvent.Set(); - DisconnectAllOpcUaSessions(); - - _reloadEvent.Close(); - _stopdEvent.Close(); - NlogHelper.Info("OPC UA 服务已经停止。"); - } - - private void Execute() - { - // 订阅变量数据变化事件,以便在变量配置发生变化时重新加载。 _dataServices.OnDeviceListChanged += HandleDeviceListChanged; + _dataServices.OnDeviceIsActiveChanged += HandleDeviceIsActiveChanged; + } - while (!_stopdEvent.WaitOne(0)) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + NlogHelper.Info("OPC UA 后台服务正在启动。"); + _reloadSemaphore.Release(); // Initial trigger to load variables and connect + + try { - _reloadEvent.WaitOne(); - - if (_dataServices.Devices == null || _dataServices.Devices.Count == 0) + while (!stoppingToken.IsCancellationRequested) { - _reloadEvent.Reset(); - continue; + await _reloadSemaphore.WaitAsync(stoppingToken); // Wait for a reload signal + + if (stoppingToken.IsCancellationRequested) + { + break; + } + + if (_dataServices.Devices == null || _dataServices.Devices.Count == 0) + { + NlogHelper.Info("没有可用的OPC UA设备,等待设备列表更新..."); + continue; + } + + var isLoaded = LoadVariables(); + if (!isLoaded) + { + NlogHelper.Info("加载变量过程中发生了错误,停止后面的操作。"); + continue; + } + + await ConnectOpcUaServiceAsync(stoppingToken); + await SetupOpcUaSubscriptionAsync(stoppingToken); + NlogHelper.Info("OPC UA 后台服务已启动。"); + + // 持续轮询,直到取消请求或需要重新加载 + while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0) + { + await PollOpcUaVariableOnceAsync(stoppingToken); + await Task.Delay(OpcUaPollIntervalMs, stoppingToken); + } } - - NlogHelper.Info("OpcUa后台服务开始加载变量..."); - // 初始化时加载所有活动的 OPC UA 变量。 - LoadOpcUaVariables(); - - //连接服务器 - ConnectOpcUaService(); - // // 添加订阅变量 - SetupOpcUaSubscription(); - - if (_pollThread == null) - { - _pollThread = new Thread(PollOpcUaVariable); - _pollThread.IsBackground = true; - _pollThread.Name = "OpcUaPollThread"; - _pollThread.Start(); - } - - NlogHelper.Info("OpcUa后台服务已启动。"); - _reloadEvent.Reset(); } - - - // 循环运行,直到接收到停止信号。 - // while (!stoppingToken.IsCancellationRequested) - // { - // // 可以在这里添加周期性任务,例如检查和重新连接断开的会话。 - // await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); - // } - NlogHelper.Info("OpcUa后台服务正在停止。"); - // 服务停止时,取消订阅事件并断开所有 OPC UA 连接。 - _dataServices.OnDeviceListChanged -= HandleDeviceListChanged; + catch (OperationCanceledException) + { + NlogHelper.Info("OPC UA 后台服务已停止。"); + } + catch (Exception e) + { + NlogHelper.Error($"OPC UA 后台服务运行中发生了错误:{e.Message}", e); + } + finally + { + await DisconnectAllOpcUaSessionsAsync(); + _dataServices.OnDeviceListChanged -= HandleDeviceListChanged; + _dataServices.OnDeviceIsActiveChanged -= HandleDeviceIsActiveChanged; + } } private void HandleDeviceListChanged(List devices) { - NlogHelper.Info("变量数据已更改。正在重新加载 OPC UA 变量。"); - _reloadEvent.Set(); + NlogHelper.Info("设备列表已更改。OPC UA 客户端可能需要重新初始化。"); + _reloadSemaphore.Release(); // 触发ExecuteAsync中的全面重新加载 + } + + private async void HandleDeviceIsActiveChanged(Device device, bool isActive) + { + if (device.ProtocolType != ProtocolType.OpcUA) + return; + + NlogHelper.Info($"设备 {device.Name} (ID: {device.Id}) 的IsActive状态改变为 {isActive}。"); + + if (!isActive) + { + // 设备变为非活动状态,断开连接 + if (_sessionsDic.TryRemove(device.OpcUaEndpointUrl, out var session)) + { + try + { + if (_subscriptionsDic.TryRemove(device.OpcUaEndpointUrl, out var subscription)) + { + // 删除订阅。 + await subscription.DeleteAsync(true); + NlogHelper.Info($"已删除设备 {device.Name} ({device.OpcUaEndpointUrl}) 的订阅。"); + } + + if (session.Connected) + { + await session.CloseAsync(); + NotificationHelper.ShowSuccess($"已断开设备 {device.Name} ({device.OpcUaEndpointUrl}) 的连接。"); + } + } + catch (Exception ex) + { + NlogHelper.Error($"断开设备 {device.Name} ({device.OpcUaEndpointUrl}) 连接时发生错误:{ex.Message}", ex); + } + } + } + + // 触发重新加载,让LoadVariables和ConnectOpcUaServiceAsync处理设备列表的更新 + _reloadSemaphore.Release(); } /// /// 从数据库加载所有活动的 OPC UA 变量,并进行相应的连接和订阅管理。 /// - private void LoadOpcUaVariables() + private bool LoadVariables() { try { - var _opcUaDevices = _dataServices - .Devices.Where(d => d.ProtocolType == ProtocolType.OpcUA && d.IsActive == true) - .ToList(); - - if (_opcUaDevices.Count == 0) - return; _deviceDic.Clear(); _pollVariableDic.Clear(); _subVariableDic.Clear(); _opcUaNodeIdVariableDic.Clear(); - foreach (var opcUaDevice in _opcUaDevices) + + NlogHelper.Info("开始加载OPC UA变量...."); + var opcUaDevices = _dataServices + .Devices.Where(d => d.ProtocolType == ProtocolType.OpcUA && d.IsActive == true) + .ToList(); + + if (opcUaDevices.Count == 0) { - // 将设备保存到字典中,方便之后查找 - _deviceDic.Add(opcUaDevice.Id, opcUaDevice); + NlogHelper.Info("没有找到活动的OPC UA设备。"); + return true; // No active devices, but not an error + } + + int totalPollVariableCount = 0; + int totalSubVariableCount = 0; + + foreach (var opcUaDevice in opcUaDevices) + { + _deviceDic.AddOrUpdate(opcUaDevice.Id, opcUaDevice, (key, oldValue) => opcUaDevice); + //查找设备中所有要轮询的变量 var dPollList = opcUaDevice.VariableTables?.SelectMany(vt => vt.DataVariables) .Where(vd => vd.IsActive == true && @@ -176,258 +197,392 @@ namespace PMSWPF.Services // 将变量保存到字典中,方便Read后还原 foreach (var variableData in dPollList) { - _opcUaNodeIdVariableDic.Add(variableData.OpcUaNodeId, variableData); + _opcUaNodeIdVariableDic.AddOrUpdate(variableData.OpcUaNodeId, variableData, + (key, oldValue) => variableData); } - NlogHelper.Info($"加载OpcUa轮询变量:{dPollList.Count}个"); - _pollVariableDic.Add(opcUaDevice.Id, dPollList); + totalPollVariableCount += dPollList.Count; + _pollVariableDic.AddOrUpdate(opcUaDevice.Id, dPollList, (key, oldValue) => dPollList); + //查找设备中所有要订阅的变量 var dSubList = opcUaDevice.VariableTables?.SelectMany(vt => vt.DataVariables) .Where(vd => vd.IsActive == true && vd.ProtocolType == ProtocolType.OpcUA && vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription) .ToList(); - _subVariableDic.Add(opcUaDevice.Id, dSubList); - NlogHelper.Info($"加载OpcUa订阅变量:{dSubList.Count}个"); + totalSubVariableCount += dSubList.Count; + _subVariableDic.AddOrUpdate(opcUaDevice.Id, dSubList, (key, oldValue) => dSubList); } + + NlogHelper.Info( + $"OPC UA 变量加载成功,共加载OPC UA设备:{opcUaDevices.Count}个,轮询变量数:{totalPollVariableCount},订阅变量数:{totalSubVariableCount}"); + return true; } catch (Exception e) { - NotificationHelper.ShowError($"加载OpcUa变量的过程中发生了错误:{e.Message}"); + NotificationHelper.ShowError($"加载OPC UA变量的过程中发生了错误:{e.Message}", e); + return false; } } /// /// 连接到 OPC UA 服务器并订阅或轮询指定的变量。 /// - private void ConnectOpcUaService() + private async Task ConnectOpcUaServiceAsync(CancellationToken stoppingToken) { - foreach (Device device in _deviceDic.Values) + if (stoppingToken.IsCancellationRequested) { - Session session = null; - // 检查是否已存在到该终结点的活动会话。 - if (_sessionsDic.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected) + return; + } + + var connectTasks = new List(); + + // 遍历_deviceDic中的所有设备,尝试连接 + foreach (var device in _deviceDic.Values.ToList()) + { + connectTasks.Add(ConnectSingleOpcUaDeviceAsync(device, stoppingToken)); + } + + await Task.WhenAll(connectTasks); + } + + /// + /// 连接单个OPC UA设备。 + /// + /// 要连接的设备。 + /// 取消令牌。 + private async Task ConnectSingleOpcUaDeviceAsync(Device device, CancellationToken stoppingToken = default) + { + if (stoppingToken.IsCancellationRequested) + { + return; + } + + // Check if already connected + if (_sessionsDic.TryGetValue(device.OpcUaEndpointUrl, out var existingSession)) + { + if (existingSession.Connected) { NlogHelper.Info($"已连接到 OPC UA 服务器: {device.OpcUaEndpointUrl}"); - continue; + return; + } + else + { + // Remove disconnected session from dictionary to attempt reconnection + _sessionsDic.TryRemove(device.OpcUaEndpointUrl, out _); + } + } + + NlogHelper.Info($"开始连接OPC UA服务器: {device.Name} ({device.OpcUaEndpointUrl})"); + try + { + var session = await ServiceHelper.CreateOpcUaSessionAsync(device.OpcUaEndpointUrl, stoppingToken); + if (session == null) + { + NlogHelper.Warn($"创建OPC UA会话失败: {device.OpcUaEndpointUrl}"); + return; // 连接失败,直接返回 } - session = ServiceHelper.CreateOpcUaSession(device.OpcUaEndpointUrl); - if (session == null) - return; // 连接失败,直接返回 - - _sessionsDic[device.OpcUaEndpointUrl] = session; + _sessionsDic.AddOrUpdate(device.OpcUaEndpointUrl, session, (key, oldValue) => session); + NotificationHelper.ShowSuccess($"已连接到OPC UA服务器: {device.Name} ({device.OpcUaEndpointUrl})"); + } + catch (Exception e) + { + NotificationHelper.ShowError( + $"OPC UA服务连接 {device.Name} ({device.OpcUaEndpointUrl}) 过程中发生错误:{e.Message}", e); } } - private void PollOpcUaVariable() + private async Task PollOpcUaVariableOnceAsync(CancellationToken stoppingToken) { - NlogHelper.Info("OpcUa轮询变量线程已启动,开始轮询变量...."); - while (!_stopdEvent.WaitOne(0)) + try { - try - { - foreach (var deviceId in _pollVariableDic.Keys.ToList()) - { - Thread.Sleep(100); - if (!_deviceDic.TryGetValue(deviceId, out var device) || device.OpcUaEndpointUrl == null) - { - NlogHelper.Warn( - $"OpcUa轮询变量时,在deviceDic中未找到ID为 {deviceId} 的设备,或其服务器地址为空,请检查!"); - continue; - } + // 获取当前需要轮询的设备ID列表的快照 + var deviceIdsToPoll = _pollVariableDic.Keys.ToList(); - _sessionsDic.TryGetValue(device.OpcUaEndpointUrl, out Session session); - if (session == null || !session.Connected) - { - if (!_stopdEvent.WaitOne(0)) - { - NlogHelper.Warn( - $"用于 {device.OpcUaEndpointUrl} 的 OPC UA 会话未连接。正在尝试重新连接..."); - // 尝试重新连接会话 - ConnectOpcUaService(); - continue; - } - } + // 为每个设备创建并发轮询任务 + var pollingTasks = deviceIdsToPoll.Select(async deviceId => + { + if (stoppingToken.IsCancellationRequested) + { + return; // 任务被取消,退出循环 + } - var nodesToRead = new ReadValueIdCollection(); - if (!_pollVariableDic.TryGetValue(deviceId, out var variableList)) - { - continue; - } + if (!_deviceDic.TryGetValue(deviceId, out var device) || + device.OpcUaEndpointUrl == null) + { + NlogHelper.Warn( + $"OpcUa轮询变量时,在deviceDic中未找到ID为 {deviceId} 的设备,或其服务器地址为空,请检查!"); + return; // 跳过此设备 + } - foreach (var variable in variableList) - { - // 获取变量的轮询间隔。 - if (!ServiceHelper.PollingIntervals.TryGetValue(variable.PollLevelType, out var interval)) - { - NlogHelper.Info($"未知的轮询级别 {variable.PollLevelType},跳过变量 {variable.Name}。"); - continue; - } + if (!device.IsActive) + { + return; + } - // 检查是否达到轮询时间。 - if ((DateTime.Now - variable.UpdateTime) < interval) - continue; // 未到轮询时间,跳过。 + if (!_sessionsDic.TryGetValue( + device.OpcUaEndpointUrl, out Session session) || + !session.Connected) + { + + if (device.IsActive) + { + // 尝试重新连接会话 + NlogHelper.Warn( + $"用于 {device.OpcUaEndpointUrl} 的 OPC UA 会话未连接。正在尝试重新连接..."); + await ConnectSingleOpcUaDeviceAsync( + device, stoppingToken); + } - nodesToRead.Add(new ReadValueId - { - NodeId = new NodeId(variable.OpcUaNodeId), - AttributeId = Attributes.Value - }); - } + return; // 跳过本次轮询 + } - // 如果没有要读取的变量则跳过 - if (nodesToRead.Count == 0) - continue; + var nodesToRead = new ReadValueIdCollection(); + if (!_pollVariableDic.TryGetValue(deviceId, out var variableList)) + { + return; // 跳过此设备 + } + foreach (var variable in variableList) + { + if (stoppingToken.IsCancellationRequested) + { + return; // 任务被取消,退出循环 + } - session.Read( - null, - 0, - TimestampsToReturn.Both, - nodesToRead, - out DataValueCollection results, - out DiagnosticInfoCollection diagnosticInfos); + // 获取变量的轮询间隔。 + if (!ServiceHelper.PollingIntervals.TryGetValue( + variable.PollLevelType, out var interval)) + { + NlogHelper.Info( + $"未知的轮询级别 {variable.PollLevelType},跳过变量 {variable.Name}。"); + continue; + } - if (results == null || results.Count == 0) - continue; - for (int i = 0; i < results.Count; i++) - { - var value = results[i]; - var nodeId = nodesToRead[i] - .NodeId.ToString(); - if (!_opcUaNodeIdVariableDic.TryGetValue(nodeId, out var variable)) - { - NlogHelper.Warn( - $"在字典中未找到OpcUaNodeId为 {nodeId} 的变量对象!"); - continue; - } + // 检查是否达到轮询时间。 + if ((DateTime.Now - variable.UpdateTime) < interval) + continue; // 未到轮询时间,跳过。 - if (!StatusCode.IsGood(value.StatusCode)) - { - NlogHelper.Warn( - $"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {value.StatusCode}"); - continue; - } + nodesToRead.Add(new ReadValueId + { + NodeId = new NodeId(variable.OpcUaNodeId), + AttributeId = Attributes.Value + }); + } + // 如果没有要读取的变量则跳过 + if (nodesToRead.Count == 0) + return; // 跳过此设备 - // 更新变量数据 - variable.DataValue = value.Value.ToString(); - variable.DisplayValue = value.Value.ToString(); // 或者根据需要进行格式化 - variable.UpdateTime = DateTime.Now; - NlogHelper.Info($"轮询变量:{variable.Name},值:{variable.DataValue}"); - // Console.WriteLine($"结果变量跟更新时间:{variable.UpdateTime}"); - // await _dataServices.UpdateVariableDataAsync(variable); - } - } - } - catch (Exception ex) - { - NotificationHelper.ShowError($"OPC UA 轮询期间发生错误: {ex.Message}", ex); - } + var readResponse = await session.ReadAsync( + null, + 0, + TimestampsToReturn.Both, + nodesToRead, + stoppingToken); + + var results = readResponse.Results; + var diagnosticInfos = readResponse.DiagnosticInfos; + + if (results == null || results.Count == 0) + return; // 没有读取到结果 + + for (int i = 0; i < results.Count; i++) + { + var value = results[i]; + var nodeId = nodesToRead[i] + .NodeId.ToString(); + if (!_opcUaNodeIdVariableDic.TryGetValue( + nodeId, out var variable)) + { + NlogHelper.Warn( + $"在字典中未找到OpcUaNodeId为 {nodeId} 的变量对象!"); + continue; + } + + if (!StatusCode.IsGood(value.StatusCode)) + { + NlogHelper.Warn( + $"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {value.StatusCode}"); + continue; + } + + // 更新变量数据并入队 + await UpdateAndEnqueueVariableData(variable, value.Value); + } + }) + .ToList(); + + // 等待所有设备的轮询任务完成 + await Task.WhenAll(pollingTasks); + } + catch (OperationCanceledException) + { + NlogHelper.Info("OPC UA 后台服务轮询变量被取消。"); + } + catch (Exception ex) + { + NotificationHelper.ShowError($"OPC UA 后台服务在轮询变量过程中发生错误:{ex.Message}", ex); + } + } + + /// + /// 更新变量数据,并将其推送到数据处理队列。 + /// + /// 要更新的变量。 + /// 读取到的数据值。 + private async Task UpdateAndEnqueueVariableData(VariableData variable, object value) + { + try + { + // 更新变量的原始数据值和显示值。 + variable.DataValue = value.ToString(); + variable.DisplayValue = value.ToString(); // 或者根据需要进行格式化 + variable.UpdateTime = DateTime.Now; + NlogHelper.Info($"轮询变量:{variable.Name},值:{variable.DataValue}"); + // 将更新后的数据推入处理队列。 + await _dataProcessingService.EnqueueAsync(variable); + } + catch (Exception ex) + { + NlogHelper.Error($"更新变量 {variable.Name} 并入队失败:{ex.Message}", ex); + } + } + + /// + /// 设置 OPC UA 订阅并添加监控项。 + /// + /// 取消令牌。 + private async Task SetupOpcUaSubscriptionAsync(CancellationToken stoppingToken) + { + if (stoppingToken.IsCancellationRequested) + { + return; } - NlogHelper.Info("OpcUa轮询变量线程已停止。"); + var setupSubscriptionTasks = new List(); + + foreach (var deviceId in _subVariableDic.Keys.ToList()) + { + setupSubscriptionTasks.Add(Task.Run(() => + { + if (stoppingToken.IsCancellationRequested) + { + return; // 任务被取消,退出循环 + } + + var device = _dataServices.Devices.FirstOrDefault(d => d.Id == deviceId); + if (device == null) + { + NlogHelper.Warn($"未找到ID为 {deviceId} 的设备,无法设置订阅。"); + return; + } + + Subscription subscription = null; + // 得到session + if (!_sessionsDic.TryGetValue(device.OpcUaEndpointUrl, out var session)) + { + NlogHelper.Info($"从OpcUa会话字典中获取会话失败: {device.OpcUaEndpointUrl} "); + return; + } + + // 判断设备是否已经添加了订阅 + if (_subscriptionsDic.TryGetValue(device.OpcUaEndpointUrl, out subscription)) + { + NlogHelper.Info($"OPC UA 终结点 {device.OpcUaEndpointUrl} 已存在订阅。"); + } + else + { + subscription = new Subscription(session.DefaultSubscription); + subscription.PublishingInterval = OpcUaSubscriptionPublishingIntervalMs; // 发布间隔(毫秒) + session.AddSubscription(subscription); + subscription.Create(); + _subscriptionsDic.AddOrUpdate(device.OpcUaEndpointUrl, subscription, + (key, oldValue) => subscription); + } + + // 将变量添加到订阅 + if (_subVariableDic.TryGetValue(deviceId, out var variablesToSubscribe)) + { + foreach (VariableData variable in variablesToSubscribe) + { + // 7. 创建监控项并添加到订阅中。 + MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem); + monitoredItem.DisplayName = variable.Name; + monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID + monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性 + monitoredItem.SamplingInterval = OpcUaSubscriptionSamplingIntervalMs; // 采样间隔(毫秒) + monitoredItem.QueueSize = 1; // 队列大小 + monitoredItem.DiscardOldest = true; // 丢弃最旧的数据 + // 注册数据变化通知事件。 + monitoredItem.Notification += (sender, e) => OnSubNotification(variable,sender, e); + + subscription.AddItem(monitoredItem); + } + + subscription.ApplyChanges(); // 应用更改 + NlogHelper.Info($"设备: {device.Name}, 添加了 {variablesToSubscribe.Count} 个订阅变量。"); + } + })); + } + + await Task.WhenAll(setupSubscriptionTasks); } /// /// 订阅变量变化的通知 /// + /// 发生变化的变量。 /// /// - private void OnSubNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e) + private async void OnSubNotification(VariableData variable, MonitoredItem monitoredItem, + MonitoredItemNotificationEventArgs e) { + foreach (var value in monitoredItem.DequeueValues()) { - NlogHelper.Info( - $"[OPC UA 通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}"); Console.WriteLine( - $"[通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}"); + $"[OPC UA 通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}"); + if (StatusCode.IsGood(value.StatusCode)) + { + await UpdateAndEnqueueVariableData(variable, value.Value); + } } } - /// /// 断开所有 OPC UA 会话。 /// - private void DisconnectAllOpcUaSessions() + private async Task DisconnectAllOpcUaSessionsAsync() { + if (_sessionsDic.IsEmpty) + return; + NlogHelper.Info("正在断开所有 OPC UA 会话..."); + var closeTasks = new List(); + foreach (var endpointUrl in _sessionsDic.Keys.ToList()) { - NlogHelper.Info($"正在断开 OPC UA 会话: {endpointUrl}"); - if (_sessionsDic.TryGetValue(endpointUrl, out var session)) + closeTasks.Add(Task.Run(async () => { - if (_subscriptionsDic.TryGetValue(endpointUrl, out var subscription)) + NlogHelper.Info($"正在断开 OPC UA 会话: {endpointUrl}"); + if (_sessionsDic.TryRemove(endpointUrl, out var session)) { - // 删除订阅。 - subscription.Delete(true); - _subscriptionsDic.Remove(endpointUrl); + if (_subscriptionsDic.TryRemove(endpointUrl, out var subscription)) + { + // 删除订阅。 + await subscription.DeleteAsync(true); + } + + // 关闭会话。 + await session.CloseAsync(); + NotificationHelper.ShowInfo($"已从 OPC UA 服务器断开连接: {endpointUrl}"); } - - - // 关闭会话。 - session.Close(); - _sessionsDic.Remove(endpointUrl); - NotificationHelper.ShowInfo($"已从 OPC UA 服务器断开连接: {endpointUrl}"); - } + })); } - } - - /// - /// 设置 OPC UA 订阅并添加监控项。 - /// - /// OPC UA 会话。 - /// 要订阅的变量信息。 - /// OPC UA 服务器的终结点 URL。 - private void SetupOpcUaSubscription() - { - foreach (var deviceId in _subVariableDic.Keys) - { - var device = _dataServices.Devices.FirstOrDefault(d => d.Id == deviceId); - Subscription subscription = null; - // 得到session - if (!_sessionsDic.TryGetValue(device.OpcUaEndpointUrl, out var session)) - { - NlogHelper.Info($"从OpcUa会话字典中获取会话失败: {device.OpcUaEndpointUrl} "); - continue; - } - - // 判断设备是否已经添加了订阅 - if (_subscriptionsDic.TryGetValue(device.OpcUaEndpointUrl, out subscription)) - { - NlogHelper.Info($"OPC UA 终结点 {device.OpcUaEndpointUrl} 已存在订阅。"); - } - else - { - subscription = new Subscription(session.DefaultSubscription); - subscription.PublishingInterval = 1000; // 发布间隔(毫秒) - session.AddSubscription(subscription); - subscription.Create(); - _subscriptionsDic[device.OpcUaEndpointUrl] = subscription; - } - - // 将变量添加到订阅 - foreach (VariableData variable in _subVariableDic[deviceId]) - { - // 7. 创建监控项并添加到订阅中。 - MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem); - monitoredItem.DisplayName = variable.Name; - monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID - monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性 - monitoredItem.SamplingInterval = 1000; // 采样间隔(毫秒) - monitoredItem.QueueSize = 1; // 队列大小 - monitoredItem.DiscardOldest = true; // 丢弃最旧的数据 - // 注册数据变化通知事件。 - monitoredItem.Notification += OnSubNotification; - - subscription.AddItem(monitoredItem); - subscription.ApplyChanges(); // 应用更改 - } - - NlogHelper.Info($"设备: {device.Name}, 添加了 {(_subVariableDic[deviceId]?.Count ?? 0)} 个订阅变量。"); - } + await Task.WhenAll(closeTasks); } } } \ No newline at end of file diff --git a/Services/S7BackgroundService.cs b/Services/S7BackgroundService.cs index 3c7d189..007d603 100644 --- a/Services/S7BackgroundService.cs +++ b/Services/S7BackgroundService.cs @@ -138,8 +138,11 @@ namespace PMSWPF.Services /// 设备新的IsActive状态。 private async void HandleDeviceIsActiveChanged(Device device, bool isActive) { + if (device.ProtocolType != ProtocolType.S7) + return; + + NlogHelper.Info($"设备 {device.Name} (ID: {device.Id}) 的IsActive状态改变为 {isActive}。"); - if (!isActive) { // 设备变为非活动状态,断开连接 @@ -150,7 +153,7 @@ namespace PMSWPF.Services if (plcClient.IsConnected) { plcClient.Close(); - NlogHelper.Info($"已断开设备 {device.Name} ({device.Ip}) 的连接。"); + NotificationHelper.ShowSuccess($"已断开设备 {device.Name} ({device.Ip}) 的连接。"); } } catch (Exception ex) diff --git a/ViewModels/Dialogs/OpcUaImportDialogViewModel.cs b/ViewModels/Dialogs/OpcUaImportDialogViewModel.cs index 1ea1eea..d7cd110 100644 --- a/ViewModels/Dialogs/OpcUaImportDialogViewModel.cs +++ b/ViewModels/Dialogs/OpcUaImportDialogViewModel.cs @@ -38,7 +38,8 @@ public partial class OpcUaImportDialogViewModel : ObservableObject OpcUaNodes = new ObservableCollection(); SelectedNodeVariables = new ObservableCollection(); // Automatically connect when the ViewModel is created - _ = Connect().ConfigureAwait(false); + ConnectCommand.Execute(null); + } [RelayCommand] @@ -49,7 +50,7 @@ public partial class OpcUaImportDialogViewModel : ObservableObject // 断开现有连接 if (_session != null && _session.Connected) { - _session.Close(); + await _session.CloseAsync(); _session.Dispose(); _session = null; } @@ -69,7 +70,6 @@ public partial class OpcUaImportDialogViewModel : ObservableObject catch (Exception ex) { IsConnected = false; - NlogHelper.Error($"连接 OPC UA 服务器失败: {EndpointUrl} - {ex.Message}", ex); NotificationHelper.ShowError($"连接 OPC UA 服务器失败: {EndpointUrl} - {ex.Message}", ex); } } diff --git a/ViewModels/SettingViewModel.cs b/ViewModels/SettingViewModel.cs index 971d28b..09aa293 100644 --- a/ViewModels/SettingViewModel.cs +++ b/ViewModels/SettingViewModel.cs @@ -185,14 +185,14 @@ public partial class SettingViewModel : ViewModelBase _connectionSettings.EnableOpcUaService = value; OnPropertyChanged(); _connectionSettings.Save(); - if (value) - { - _opcUaBackgroundService.StartService(); - } - else - { - _opcUaBackgroundService.StopService(); - } + // if (value) + // { + // _opcUaBackgroundService.StartService(); + // } + // else + // { + // _opcUaBackgroundService.StopService(); + // } } } }