From 310a7f86748b8b109f81cc557ace88987d1c2a6f Mon Sep 17 00:00:00 2001 From: "David P.G" Date: Thu, 10 Jul 2025 17:16:15 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=99OPCUC=E5=90=8E=E5=8F=B0=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E6=B7=BB=E5=8A=A0=E4=BA=86=E6=9B=B4=E6=96=B0=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Data/Entities/DbVariableData.cs | 6 + Enums/OpcUaUpdateType.cs | 11 + Enums/PollLevelType.cs | 4 +- Models/VariableData.cs | 6 + Services/DataServices.cs | 36 +- Services/MqttBackgroundService.cs | 58 +-- Services/OpcUaBackgroundService.cs | 588 +++++++++++++++++++---------- ViewModels/DevicesViewModel.cs | 5 +- 8 files changed, 478 insertions(+), 236 deletions(-) create mode 100644 Enums/OpcUaUpdateType.cs diff --git a/Data/Entities/DbVariableData.cs b/Data/Entities/DbVariableData.cs index f39987b..3b07ac1 100644 --- a/Data/Entities/DbVariableData.cs +++ b/Data/Entities/DbVariableData.cs @@ -100,6 +100,12 @@ public class DbVariableData [SugarColumn(ColumnDataType = "varchar(20)",IsNullable =true, SqlParameterDbType = typeof(EnumToStringConvert))] public PollLevelType PollLevelType { get; set; } + /// + /// OPC UA更新类型,例如轮询或订阅。 + /// + [SugarColumn(ColumnDataType = "varchar(20)", IsNullable = true, SqlParameterDbType = typeof(EnumToStringConvert))] + public OpcUaUpdateType OpcUaUpdateType { get; set; } + /// /// 指示变量是否已被逻辑删除。 /// diff --git a/Enums/OpcUaUpdateType.cs b/Enums/OpcUaUpdateType.cs new file mode 100644 index 0000000..2adab26 --- /dev/null +++ b/Enums/OpcUaUpdateType.cs @@ -0,0 +1,11 @@ +using System.ComponentModel; + +namespace PMSWPF.Enums; + +public enum OpcUaUpdateType +{ + [Description("OpcUa轮询")] + OpcUaPoll, + [Description("OpcUa订阅")] + OpcUaSubscription +} \ No newline at end of file diff --git a/Enums/PollLevelType.cs b/Enums/PollLevelType.cs index 369e693..2334b22 100644 --- a/Enums/PollLevelType.cs +++ b/Enums/PollLevelType.cs @@ -29,6 +29,8 @@ namespace PMSWPF.Enums [Description("10分钟")] TenMinutes = 600000, [Description("30分钟")] - ThirtyMinutes = 1800000 + ThirtyMinutes = 1800000, + [Description("1小时")] + OneHour = 3600000 } } \ No newline at end of file diff --git a/Models/VariableData.cs b/Models/VariableData.cs index 5259a9b..0cc224c 100644 --- a/Models/VariableData.cs +++ b/Models/VariableData.cs @@ -100,6 +100,12 @@ public partial class VariableData : ObservableObject [ObservableProperty] private PollLevelType pollLevelType = PollLevelType.ThirtySeconds; + /// + /// OPC UA更新类型,例如轮询或订阅。 + /// + [ObservableProperty] + private OpcUaUpdateType opcUaUpdateType = OpcUaUpdateType.OpcUaPoll; + /// /// 最后一次轮询时间。 /// diff --git a/Services/DataServices.cs b/Services/DataServices.cs index 8244896..293279c 100644 --- a/Services/DataServices.cs +++ b/Services/DataServices.cs @@ -60,7 +60,7 @@ public partial class DataServices : ObservableRecipient, IRecipient public event EventHandler> OnMqttListChanged; // 变量数据变更事件,当变量数据更新时触发。 - public event EventHandler> OnVariableDataChanged; + public event Action> OnVariableDataChanged; /// /// 当_devices属性值改变时触发的局部方法,用于调用OnDeviceListChanged事件。 @@ -69,6 +69,20 @@ public partial class DataServices : ObservableRecipient, IRecipient partial void OnDevicesChanged(List devices) { OnDeviceListChanged?.Invoke(this, devices); + + + VariableDatas.Clear(); + foreach (Device device in devices) + { + foreach (VariableTable variableTable in device.VariableTables) + { + foreach (VariableData variableData in variableTable.DataVariables) + { + VariableDatas.Add(variableData); + } + } + } + OnVariableDataChanged?.Invoke(VariableDatas); } /// @@ -106,6 +120,7 @@ public partial class DataServices : ObservableRecipient, IRecipient _menuRepository = new MenuRepository(); _mqttRepository = new MqttRepository(); _varDataRepository = new VarDataRepository(); + _variableDatas = new List(); } /// @@ -123,7 +138,6 @@ public partial class DataServices : ObservableRecipient, IRecipient await LoadDevices(); await LoadMenus(); await LoadMqtts(); - await LoadVariableDatas(); break; case LoadTypes.Devices: // 仅加载设备数据 await LoadDevices(); @@ -184,14 +198,6 @@ public partial class DataServices : ObservableRecipient, IRecipient Mqtts = await _mqttRepository.GetAll(); } - /// - /// 异步获取所有变量数据。 - /// - /// 包含所有变量数据的列表。 - public async Task> GetAllVariableDataAsync() - { - return await _varDataRepository.GetAllAsync(); - } /// /// 异步根据ID获取设备数据。 @@ -211,4 +217,14 @@ public partial class DataServices : ObservableRecipient, IRecipient { VariableDatas = await _varDataRepository.GetAllAsync(); } + + /// + /// 异步更新变量数据。 + /// + /// 要更新的变量数据。 + /// 表示异步操作的任务。 + public async Task UpdateVariableDataAsync(VariableData variableData) + { + await _varDataRepository.UpdateAsync(variableData); + } } \ No newline at end of file diff --git a/Services/MqttBackgroundService.cs b/Services/MqttBackgroundService.cs index d4660b1..19a58d3 100644 --- a/Services/MqttBackgroundService.cs +++ b/Services/MqttBackgroundService.cs @@ -21,12 +21,16 @@ namespace PMSWPF.Services { // 数据服务实例,用于访问和操作应用程序数据,如MQTT配置和变量数据。 private readonly DataServices _dataServices; + // 存储MQTT客户端实例的字典,键为MQTT配置ID,值为IMqttClient对象。 private readonly Dictionary _mqttClients; + // 存储MQTT配置的字典,键为MQTT配置ID,值为Mqtt模型对象。 private readonly Dictionary _mqttConfigurations; + // 存储与MQTT配置关联的变量数据的字典,键为MQTT配置ID,值为VariableData列表。 private readonly Dictionary> _mqttVariableData; + // 定时器,用于周期性地执行数据发布任务。 private Timer _timer; @@ -86,12 +90,11 @@ namespace PMSWPF.Services await client.DisconnectAsync(); } } + // 清空所有字典。 _mqttClients.Clear(); _mqttConfigurations.Clear(); _mqttVariableData.Clear(); - - } /// @@ -111,7 +114,7 @@ namespace PMSWPF.Services foreach (var variable in variables) { // 如果变量已被修改(IsModified标志为true)。 - if (variable.IsModified) + if (variable.IsModified) { // 获取发布主题。 var topic = _mqttConfigurations[mqttConfigId].PublishTopic; @@ -119,13 +122,15 @@ namespace PMSWPF.Services { // 构建MQTT消息。 var message = new MqttApplicationMessageBuilder() - .WithTopic($"{topic}/{variable.Name}") // 主题格式:PublishTopic/VariableName - .WithPayload(variable.DataValue) // 消息载荷为变量的值 - .Build(); + .WithTopic($"{topic}/{variable.Name}") // 主题格式:PublishTopic/VariableName + .WithPayload(variable.DataValue) // 消息载荷为变量的值 + .Build(); // 发布MQTT消息。 await client.PublishAsync(message); - NlogHelper.Info($"Published {variable.Name} = {variable.DataValue} to {topic}/{variable.Name}",throttle:true); // 记录发布信息 + NlogHelper.Info( + $"Published {variable.Name} = {variable.DataValue} to {topic}/{variable.Name}", + throttle: true); // 记录发布信息 variable.IsModified = false; // 发布后重置修改标志。 } } @@ -142,11 +147,14 @@ namespace PMSWPF.Services { // 从数据服务获取所有MQTT配置。 var allMqtts = await _dataServices.GetMqttsAsync(); - var activeMqtts = allMqtts.Where(m => m.IsActive).ToList(); - var activeMqttIds = activeMqtts.Select(m => m.Id).ToHashSet(); + var activeMqtts = allMqtts.Where(m => m.IsActive) + .ToList(); + var activeMqttIds = activeMqtts.Select(m => m.Id) + .ToHashSet(); // 断开并移除不再活跃或已删除的MQTT客户端。 - var clientsToDisconnect = _mqttClients.Keys.Except(activeMqttIds).ToList(); + var clientsToDisconnect = _mqttClients.Keys.Except(activeMqttIds) + .ToList(); foreach (var id in clientsToDisconnect) { if (_mqttClients.TryGetValue(id, out var client)) @@ -160,9 +168,12 @@ namespace PMSWPF.Services mqttConfig.IsConnected = false; } } + _mqttClients.Remove(id); - NlogHelper.Info($"Disconnected and removed MQTT client for ID: {id} (no longer active or removed)."); + NlogHelper.Info( + $"Disconnected and removed MQTT client for ID: {id} (no longer active or removed)."); } + _mqttConfigurations.Remove(id); _mqttVariableData.Remove(id); } @@ -174,6 +185,7 @@ namespace PMSWPF.Services { await ConnectMqttClient(mqtt); } + // 始终更新或添加MQTT配置到字典。 _mqttConfigurations[mqtt.Id] = mqtt; } @@ -193,11 +205,11 @@ namespace PMSWPF.Services var client = factory.CreateMqttClient(); // 构建MQTT客户端连接选项。 var options = new MqttClientOptionsBuilder() - .WithClientId(mqtt.ClientID) - .WithTcpServer(mqtt.Host, mqtt.Port) - .WithCredentials(mqtt.UserName, mqtt.PassWord) - .WithCleanSession() // 清理会话,每次连接都是新会话 - .Build(); + .WithClientId(mqtt.ClientID) + .WithTcpServer(mqtt.Host, mqtt.Port) + .WithCredentials(mqtt.UserName, mqtt.PassWord) + .WithCleanSession() // 清理会话,每次连接都是新会话 + .Build(); // 设置连接成功事件处理程序。 client.UseConnectedHandler(e => @@ -221,7 +233,7 @@ namespace PMSWPF.Services } catch (Exception ex) { - NlogHelper.Error($"Failed to reconnect to MQTT broker: {mqtt.Name}",ex ); + NlogHelper.Error($"Failed to reconnect to MQTT broker: {mqtt.Name}", ex); } }); @@ -243,7 +255,9 @@ namespace PMSWPF.Services private async Task LoadVariableData() { // 从数据服务获取所有变量数据。 - var allVariables = await _dataServices.GetAllVariableDataAsync(); + var allVariables = _dataServices.VariableDatas; + if (!allVariables.Any()) + return; _mqttVariableData.Clear(); // 清空现有数据 // 遍历所有变量,并根据其关联的MQTT配置进行分组。 @@ -258,8 +272,10 @@ namespace PMSWPF.Services { _mqttVariableData[mqtt.Id] = new List(); } + // 将变量添加到对应MQTT配置的列表中。 - _mqttVariableData[mqtt.Id].Add(variable); + _mqttVariableData[mqtt.Id] + .Add(variable); } } } @@ -283,11 +299,11 @@ namespace PMSWPF.Services /// /// 事件发送者。 /// 更新后的变量数据列表。 - private async void HandleVariableDataChanged(object sender, List variableDatas) + private async void HandleVariableDataChanged( List variableDatas) { NlogHelper.Info("Variable data changed. Reloading variable associations."); // 记录变量数据变化信息 // 重新加载变量数据。 await LoadVariableData(); } } -} +} \ No newline at end of file diff --git a/Services/OpcUaBackgroundService.cs b/Services/OpcUaBackgroundService.cs index 80be192..55f8c77 100644 --- a/Services/OpcUaBackgroundService.cs +++ b/Services/OpcUaBackgroundService.cs @@ -66,6 +66,7 @@ namespace PMSWPF.Services private readonly Dictionary _opcUaSubscriptions; // 存储活动的 OPC UA 变量,键为变量的唯一 ID。 private readonly Dictionary _opcUaVariables; // Key: VariableData.Id + private readonly Dictionary _opcUaPollingTasks; // Key: VariableData.Id, Value: CancellationTokenSource for polling task /// /// OpcUaBackgroundService 的构造函数。 @@ -77,6 +78,7 @@ namespace PMSWPF.Services _opcUaSessions = new Dictionary(); _opcUaSubscriptions = new Dictionary(); _opcUaVariables = new Dictionary(); + _opcUaPollingTasks = new Dictionary(); } /// @@ -102,6 +104,10 @@ namespace PMSWPF.Services try { _cancellationTokenSource.Cancel(); + NlogHelper.Info("OpcUaBackgroundService stopping."); + // 服务停止时,取消订阅事件并断开所有 OPC UA 连接。 + _dataServices.OnVariableDataChanged -= HandleVariableDataChanged; + await DisconnectAllOpcUaSessions(); } finally { @@ -120,16 +126,13 @@ namespace PMSWPF.Services await LoadOpcUaVariables(); // 循环运行,直到接收到停止信号。 - while (!stoppingToken.IsCancellationRequested) - { - // 可以在这里添加周期性任务,例如检查和重新连接断开的会话。 - await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); - } + // while (!stoppingToken.IsCancellationRequested) + // { + // // 可以在这里添加周期性任务,例如检查和重新连接断开的会话。 + // await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken); + // } - NlogHelper.Info("OpcUaBackgroundService stopping."); - // 服务停止时,取消订阅事件并断开所有 OPC UA 连接。 - _dataServices.OnVariableDataChanged -= HandleVariableDataChanged; - await DisconnectAllOpcUaSessions(); + } /// @@ -138,11 +141,356 @@ namespace PMSWPF.Services private async Task LoadOpcUaVariables() { NlogHelper.Info("正在加载 OPC UA 变量..."); - var allVariables = await _dataServices.GetAllVariableDataAsync(); - var opcUaVariables = allVariables.Where(v => v.ProtocolType == ProtocolType.OpcUA && v.IsActive).ToList(); + // var allVariables = await _dataServices.GetAllVariableDataAsync(); + var opcUaVariables = _dataServices.VariableDatas.Where(v => v.ProtocolType == ProtocolType.OpcUA && v.IsActive).ToList(); + + if (opcUaVariables==null || opcUaVariables.Count==0) + return; // 清理不再活跃或已删除的变量。 - var currentOpcUaVariableIds = opcUaVariables.Select(v => v.Id).ToHashSet(); + await RemoveInactiveOpcUaVariables(opcUaVariables); + + // 处理新增或更新的变量。 + await ProcessActiveOpcUaVariables(opcUaVariables); + } + + /// + /// 连接到 OPC UA 服务器并订阅或轮询指定的变量。 + /// + /// 要订阅或轮询的变量信息。 + /// 变量所属的设备信息。 + private async Task ConnectAndSubscribeOpcUa(VariableData variable, Device device) + { + NlogHelper.Info($"正在为变量 '{variable.Name}' 连接和处理 OPC UA 服务器... (更新类型: {variable.OpcUaUpdateType})"); + if (string.IsNullOrEmpty(device.OpcUaEndpointUrl) || string.IsNullOrEmpty(variable.OpcUaNodeId)) + { + NlogHelper.Warn($"OPC UA variable {variable.Name} has invalid EndpointUrl or NodeId."); + return; + } + + Session session = null; + // 检查是否已存在到该终结点的活动会话。 + if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected) + { + NlogHelper.Info($"Already connected to OPC UA endpoint: {device.OpcUaEndpointUrl}"); + } + else + { + session = await CreateOpcUaSessionAsync(device.OpcUaEndpointUrl); + if (session == null) + { + return; // 连接失败,直接返回 + } + } + + if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription) + { + SetupOpcUaSubscription(session, variable, device.OpcUaEndpointUrl); + } + else if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll) + { + StartOpcUaPolling(session, variable, device); + } + } + + private async Task PollOpcUaVariable(Session session, VariableData variable, Device device, CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + if (session != null && session.Connected) + { + var nodeToRead = new ReadValueId + { + NodeId = new NodeId(variable.OpcUaNodeId), + AttributeId = Attributes.Value + }; + + var nodesToRead = new ReadValueIdCollection { nodeToRead }; + session.Read( + null, + 0, + TimestampsToReturn.Both, + nodesToRead, + out DataValueCollection results, + out DiagnosticInfoCollection diagnosticInfos); + + if (results != null && results.Count > 0) + { + var value = results[0]; + if (StatusCode.IsGood(value.StatusCode)) + { + NlogHelper.Info($"[OPC UA 轮询] {variable.Name}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}"); + // 更新变量数据 + variable.DataValue = value.Value.ToString(); + variable.DisplayValue = value.Value.ToString(); // 或者根据需要进行格式化 + // await _dataServices.UpdateVariableDataAsync(variable); + } + else + { + NlogHelper.Warn($"Failed to read OPC UA variable {variable.Name} ({variable.OpcUaNodeId}): {value.StatusCode}"); + } + } + } + else + { + NlogHelper.Warn($"OPC UA session for {device.OpcUaEndpointUrl} is not connected. Attempting to reconnect..."); + // 尝试重新连接会话 + await ConnectAndSubscribeOpcUa(variable, device); + } + } + catch (Exception ex) + { + NlogHelper.Error($"Error during OPC UA polling for {variable.Name}: {ex.Message}", ex); + } + + // 根据 PollLevelType 设置轮询间隔 + var pollInterval = GetPollInterval(variable.PollLevelType); + await Task.Delay(pollInterval, cancellationToken); + } + NlogHelper.Info($"Polling for OPC UA variable {variable.Name} stopped."); + } + + private void OnNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e) + { + foreach (var value in monitoredItem.DequeueValues()) + { + NlogHelper.Info($"[OPC UA 通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}"); + Console.WriteLine($"[通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}"); + } + } + + /// + /// 根据 PollLevelType 获取轮询间隔。 + /// + /// 轮询级别类型。 + /// 时间间隔。 + private TimeSpan GetPollInterval(PollLevelType pollLevelType) + { + return pollLevelType switch + { + PollLevelType.OneSecond => TimeSpan.FromSeconds(1), + PollLevelType.FiveSeconds => TimeSpan.FromSeconds(5), + PollLevelType.TenSeconds => TimeSpan.FromSeconds(10), + PollLevelType.ThirtySeconds => TimeSpan.FromSeconds(30), + PollLevelType.OneMinute => TimeSpan.FromMinutes(1), + PollLevelType.FiveMinutes => TimeSpan.FromMinutes(5), + PollLevelType.TenMinutes => TimeSpan.FromMinutes(10), + PollLevelType.ThirtyMinutes => TimeSpan.FromMinutes(30), + PollLevelType.OneHour => TimeSpan.FromHours(1), + _ => TimeSpan.FromSeconds(1), // 默认1秒 + }; + } + + /// + /// 断开与指定 OPC UA 服务器的连接。 + /// + /// OPC UA 服务器的终结点 URL。 + private async Task DisconnectOpcUaSession(string endpointUrl) + { + NlogHelper.Info($"正在断开 OPC UA 会话: {endpointUrl}"); + if (_opcUaSessions.TryGetValue(endpointUrl, out var session)) + { + if (_opcUaSubscriptions.TryGetValue(endpointUrl, out var subscription)) + { + // 删除订阅。 + subscription.Delete(true); + _opcUaSubscriptions.Remove(endpointUrl); + } + + // 取消与此会话相关的轮询任务 + var variablesToCancelPolling = _opcUaVariables.Where(kv => kv.Value.VariableTable != null && kv.Value.VariableTable.DeviceId.HasValue && _dataServices.GetDeviceByIdAsync(kv.Value.VariableTable.DeviceId.Value).Result?.OpcUaEndpointUrl == endpointUrl && kv.Value.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll).ToList(); + foreach (var entry in variablesToCancelPolling) + { + if (_opcUaPollingTasks.ContainsKey(entry.Key)) + { + var cts = _opcUaPollingTasks[entry.Key]; + _opcUaPollingTasks.Remove(entry.Key); + cts.Cancel(); + cts.Dispose(); + NlogHelper.Info($"Cancelled polling for variable: {entry.Value.Name} (ID: {entry.Key})"); + } + } + + // 关闭会话。 + session.Close(); + _opcUaSessions.Remove(endpointUrl); + NlogHelper.Info($"Disconnected from OPC UA server: {endpointUrl}"); + NotificationHelper.ShowInfo($"已从 OPC UA 服务器断开连接: {endpointUrl}"); + } + } + + /// + /// 断开所有 OPC UA 会话。 + /// + private async Task DisconnectAllOpcUaSessions() + { + NlogHelper.Info("正在断开所有 OPC UA 会话..."); + foreach (var endpointUrl in _opcUaSessions.Keys.ToList()) + { + await DisconnectOpcUaSession(endpointUrl); + } + } + + /// + /// 处理变量数据变化的事件。 + /// 当数据库中的变量信息发生变化时,此方法被调用以重新加载和配置 OPC UA 变量。 + /// + /// 事件发送者。 + /// 变化的变量数据列表。 + private async void HandleVariableDataChanged( List variableDatas) + { + NlogHelper.Info("Variable data changed. Reloading OPC UA variables."); + await LoadOpcUaVariables(); + } + + /// + /// 创建并配置 OPC UA 会话。 + /// + /// OPC UA 服务器的终结点 URL。 + /// 创建的 Session 对象,如果失败则返回 null。 + private async Task CreateOpcUaSessionAsync(string endpointUrl) + { + try + { + // 1. 创建应用程序配置 + var application = new ApplicationInstance + { + ApplicationName = "OpcUADemoClient", + ApplicationType = ApplicationType.Client, + ConfigSectionName = "Opc.Ua.Client" + }; + + var config = new ApplicationConfiguration() + { + ApplicationName = application.ApplicationName, + ApplicationUri = $"urn:{System.Net.Dns.GetHostName()}:OpcUADemoClient", + ApplicationType = application.ApplicationType, + SecurityConfiguration = new SecurityConfiguration + { + ApplicationCertificate = new CertificateIdentifier + { + StoreType = "Directory", + StorePath = "%CommonApplicationData%/OPC Foundation/CertificateStores/MachineDefault", + SubjectName = application.ApplicationName + }, + TrustedIssuerCertificates = new CertificateTrustList + { + StoreType = "Directory", + StorePath = "%CommonApplicationData%/OPC Foundation/CertificateStores/UA Certificate Authorities" + }, + TrustedPeerCertificates = new CertificateTrustList + { + StoreType = "Directory", + StorePath = "%CommonApplicationData%/OPC Foundation/CertificateStores/UA Applications" + }, + RejectedCertificateStore = new CertificateTrustList + { + StoreType = "Directory", + StorePath = "%CommonApplicationData%/OPC Foundation/CertificateStores/RejectedCertificates" + }, + AutoAcceptUntrustedCertificates = true // 自动接受不受信任的证书 (仅用于测试) + }, + TransportQuotas = new TransportQuotas { OperationTimeout = 15000 }, + ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 }, + TraceConfiguration = new TraceConfiguration + { + OutputFilePath = "./Logs/OpcUaClient.log", DeleteOnLoad = true, + TraceMasks = Utils.TraceMasks.Error | Utils.TraceMasks.Security + } + }; + application.ApplicationConfiguration = config; + + // 验证并检查证书 + await config.Validate(ApplicationType.Client); + await application.CheckApplicationInstanceCertificate(false, 0); + + // 2. 查找并选择端点 (将 useSecurity 设置为 false 以进行诊断) + var selectedEndpoint = CoreClientUtils.SelectEndpoint(endpointUrl, false); + + var session = await Session.Create( + config, + new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(config)), + false, + "PMSWPF OPC UA Session", + 60000, + new UserIdentity(new AnonymousIdentityToken()), + null); + + _opcUaSessions[endpointUrl] = session; + NotificationHelper.ShowSuccess($"已连接到 OPC UA 服务器: {endpointUrl}"); + return session; + } + catch (Exception ex) + { + NotificationHelper.ShowError($"连接 OPC UA 服务器失败: {endpointUrl} - {ex.Message}", ex); + return null; + } + } + + /// + /// 设置 OPC UA 订阅并添加监控项。 + /// + /// OPC UA 会话。 + /// 要订阅的变量信息。 + /// OPC UA 服务器的终结点 URL。 + private void SetupOpcUaSubscription(Session session, VariableData variable, string endpointUrl) + { + Subscription subscription = null; + if (_opcUaSubscriptions.TryGetValue(endpointUrl, out subscription)) + { + NlogHelper.Info($"Already has subscription for OPC UA endpoint: {endpointUrl}"); + } + else + { + subscription = new Subscription(session.DefaultSubscription); + subscription.PublishingInterval = 1000; // 发布间隔(毫秒) + session.AddSubscription(subscription); + subscription.Create(); + _opcUaSubscriptions[endpointUrl] = subscription; + } + + // 7. 创建监控项并添加到订阅中。 + MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem); + monitoredItem.DisplayName = variable.Name; + monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID + monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性 + monitoredItem.SamplingInterval = 1000; // 采样间隔(毫秒) + monitoredItem.QueueSize = 1; // 队列大小 + monitoredItem.DiscardOldest = true; // 丢弃最旧的数据 + // 注册数据变化通知事件。 + monitoredItem.Notification += OnNotification; + + subscription.AddItem(monitoredItem); + subscription.ApplyChanges(); // 应用更改 + NlogHelper.Info($"Subscribed to OPC UA variable: {variable.Name} ({variable.OpcUaNodeId})"); + } + + /// + /// 启动 OPC UA 变量的轮询任务。 + /// + /// OPC UA 会话。 + /// 要轮询的变量信息。 + /// 变量所属的设备信息。 + private void StartOpcUaPolling(Session session, VariableData variable, Device device) + { + if (!_opcUaPollingTasks.ContainsKey(variable.Id)) + { + var cts = new CancellationTokenSource(); + _opcUaPollingTasks.Add(variable.Id, cts); + _ = Task.Run(() => PollOpcUaVariable(session, variable, device, cts.Token), cts.Token); + NlogHelper.Info($"Started polling for OPC UA variable: {variable.Name} ({variable.OpcUaNodeId})"); + } + } + + /// + /// 移除不再活跃或已删除的 OPC UA 变量。 + /// + /// 当前所有活跃的 OPC UA 变量列表。 + private async Task RemoveInactiveOpcUaVariables(List activeOpcUaVariables) + { + var currentOpcUaVariableIds = activeOpcUaVariables.Select(v => v.Id).ToHashSet(); var variablesToRemove = _opcUaVariables.Keys.Except(currentOpcUaVariableIds).ToList(); NlogHelper.Info($"发现 {variablesToRemove.Count} 个要移除的 OPC UA 变量。"); @@ -151,18 +499,37 @@ namespace PMSWPF.Services { if (_opcUaVariables.TryGetValue(id, out var variable)) { - // 获取关联的设备信息 - var device = await _dataServices.GetDeviceByIdAsync(variable.VariableTable.DeviceId??0); - if (device != null) + if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription) { - // 断开与该变量相关的 OPC UA 会话。 - await DisconnectOpcUaSession(device.OpcUaEndpointUrl); + // 获取关联的设备信息 + var device = await _dataServices.GetDeviceByIdAsync(variable.VariableTable.DeviceId??0); + if (device != null) + { + // 断开与该变量相关的 OPC UA 会话。 + await DisconnectOpcUaSession(device.OpcUaEndpointUrl); + } + } + else if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll) + { + if (_opcUaPollingTasks.ContainsKey(variable.Id)) + { + var cts = _opcUaPollingTasks[variable.Id]; + _opcUaPollingTasks.Remove(variable.Id); + cts.Cancel(); + cts.Dispose(); + } } _opcUaVariables.Remove(id); } } + } - // 处理新增或更新的变量。 + /// + /// 处理新增或更新的活跃 OPC UA 变量。 + /// + /// 当前所有活跃的 OPC UA 变量列表。 + private async Task ProcessActiveOpcUaVariables(List opcUaVariables) + { foreach (var variable in opcUaVariables) { // 获取关联的设备信息 @@ -191,190 +558,5 @@ namespace PMSWPF.Services } } } - - /// - /// 连接到 OPC UA 服务器并订阅指定的变量。 - /// - /// 要订阅的变量信息。 - /// 变量所属的设备信息。 - private async Task ConnectAndSubscribeOpcUa(VariableData variable, Device device) - { - NlogHelper.Info($"正在为变量 '{variable.Name}' 连接和订阅 OPC UA 服务器..."); - if (string.IsNullOrEmpty(device.OpcUaEndpointUrl) || string.IsNullOrEmpty(variable.OpcUaNodeId)) - { - NlogHelper.Warn($"OPC UA variable {variable.Name} has invalid EndpointUrl or NodeId."); - return; - } - - Session session = null; - // 检查是否已存在到该终结点的活动会话。 - if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected) - { - NlogHelper.Info($"Already connected to OPC UA endpoint: {device.OpcUaEndpointUrl}"); - } - else - { - try - { - // 1. 创建应用程序配置 - var application = new ApplicationInstance - { - ApplicationName = "OpcUADemoClient", - ApplicationType = ApplicationType.Client, - ConfigSectionName = "Opc.Ua.Client" - }; - - var config = new ApplicationConfiguration() - { - ApplicationName = application.ApplicationName, - ApplicationUri = $"urn:{System.Net.Dns.GetHostName()}:OpcUADemoClient", - ApplicationType = application.ApplicationType, - SecurityConfiguration = new SecurityConfiguration - { - ApplicationCertificate = new CertificateIdentifier - { - StoreType = "Directory", - StorePath - = "%CommonApplicationData%/OPC Foundation/CertificateStores/MachineDefault", - SubjectName = application.ApplicationName - }, - TrustedIssuerCertificates - = new CertificateTrustList - { - StoreType = "Directory", - StorePath - = "%CommonApplicationData%/OPC Foundation/CertificateStores/UA Certificate Authorities" - }, - TrustedPeerCertificates - = new CertificateTrustList - { - StoreType = "Directory", - StorePath - = "%CommonApplicationData%/OPC Foundation/CertificateStores/UA Applications" - }, - RejectedCertificateStore - = new CertificateTrustList - { - StoreType = "Directory", - StorePath - = "%CommonApplicationData%/OPC Foundation/CertificateStores/RejectedCertificates" - }, - AutoAcceptUntrustedCertificates = true // 自动接受不受信任的证书 (仅用于测试) - }, - TransportQuotas = new TransportQuotas { OperationTimeout = 15000 }, - ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 }, - TraceConfiguration = new TraceConfiguration - { - OutputFilePath = "./Logs/OpcUaClient.log", DeleteOnLoad = true, - TraceMasks = Utils.TraceMasks.Error | Utils.TraceMasks.Security - } - }; - application.ApplicationConfiguration = config; - - // 验证并检查证书 - await config.Validate(ApplicationType.Client); - await application.CheckApplicationInstanceCertificate(false, 0); - - // 2. 查找并选择端点 (将 useSecurity 设置为 false 以进行诊断) - var selectedEndpoint = CoreClientUtils.SelectEndpoint(device.OpcUaEndpointUrl, false); - - session = await Session.Create( - config, - new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(config)), - false, - "PMSWPF OPC UA Session", - 60000, - new UserIdentity(new AnonymousIdentityToken()), - null); - - _opcUaSessions[device.OpcUaEndpointUrl] = session; - NlogHelper.Info($"Connected to OPC UA server: {device.OpcUaEndpointUrl}"); - NotificationHelper.ShowSuccess($"已连接到 OPC UA 服务器: {device.OpcUaEndpointUrl}"); - - // 6. 创建订阅。 - Subscription subscription = new Subscription(session.DefaultSubscription); - subscription.PublishingInterval = 1000; // 发布间隔(毫秒) - session.AddSubscription(subscription); - subscription.Create(); - - _opcUaSubscriptions[device.OpcUaEndpointUrl] = subscription; - - // 7. 创建监控项并添加到订阅中。 - MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem); - monitoredItem.DisplayName = variable.Name; - monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID - monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性 - monitoredItem.SamplingInterval = 1000; // 采样间隔(毫秒) - monitoredItem.QueueSize = 1; // 队列大小 - monitoredItem.DiscardOldest = true; // 丢弃最旧的数据 - // 注册数据变化通知事件。 - monitoredItem.Notification += OnNotification; - - subscription.AddItem(monitoredItem); - subscription.ApplyChanges(); // 应用更改 - } - catch (Exception ex) - { - NlogHelper.Error($"连接或订阅 OPC UA 服务器失败: {device.OpcUaEndpointUrl} - {ex.Message}", ex); - NotificationHelper.ShowError($"连接或订阅 OPC UA 服务器失败: {device.OpcUaEndpointUrl} - {ex.Message}", ex); - } - } - } - - private void OnNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e) - { - foreach (var value in monitoredItem.DequeueValues()) - { - NlogHelper.Info($"[OPC UA 通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}"); - Console.WriteLine($"[通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}"); - } - } - - /// - /// 断开与指定 OPC UA 服务器的连接。 - /// - /// OPC UA 服务器的终结点 URL。 - private async Task DisconnectOpcUaSession(string endpointUrl) - { - NlogHelper.Info($"正在断开 OPC UA 会话: {endpointUrl}"); - if (_opcUaSessions.TryGetValue(endpointUrl, out var session)) - { - if (_opcUaSubscriptions.TryGetValue(endpointUrl, out var subscription)) - { - // 删除订阅。 - subscription.Delete(true); - _opcUaSubscriptions.Remove(endpointUrl); - } - // 关闭会话。 - session.Close(); - _opcUaSessions.Remove(endpointUrl); - NlogHelper.Info($"Disconnected from OPC UA server: {endpointUrl}"); - NotificationHelper.ShowInfo($"已从 OPC UA 服务器断开连接: {endpointUrl}"); - } - } - - /// - /// 断开所有 OPC UA 会话。 - /// - private async Task DisconnectAllOpcUaSessions() - { - NlogHelper.Info("正在断开所有 OPC UA 会话..."); - foreach (var endpointUrl in _opcUaSessions.Keys.ToList()) - { - await DisconnectOpcUaSession(endpointUrl); - } - } - - /// - /// 处理变量数据变化的事件。 - /// 当数据库中的变量信息发生变化时,此方法被调用以重新加载和配置 OPC UA 变量。 - /// - /// 事件发送者。 - /// 变化的变量数据列表。 - private async void HandleVariableDataChanged(object sender, List variableDatas) - { - NlogHelper.Info("Variable data changed. Reloading OPC UA variables."); - await LoadOpcUaVariables(); - } } } \ No newline at end of file diff --git a/ViewModels/DevicesViewModel.cs b/ViewModels/DevicesViewModel.cs index 905ea40..5f169ac 100644 --- a/ViewModels/DevicesViewModel.cs +++ b/ViewModels/DevicesViewModel.cs @@ -53,7 +53,10 @@ public partial class DevicesViewModel : ViewModelBase _dataServices = dataServices; MessageHelper.SendLoadMessage(LoadTypes.Devices); - _dataServices.OnDeviceListChanged += (sender, devices) => { Devices = new ObservableCollection(devices); }; + _dataServices.OnDeviceListChanged += (sender, devices) => + { + Devices = new ObservableCollection(devices); + }; } ///