diff --git a/Data/Entities/DbVariableData.cs b/Data/Entities/DbVariableData.cs
index f39987b..3b07ac1 100644
--- a/Data/Entities/DbVariableData.cs
+++ b/Data/Entities/DbVariableData.cs
@@ -100,6 +100,12 @@ public class DbVariableData
[SugarColumn(ColumnDataType = "varchar(20)",IsNullable =true, SqlParameterDbType = typeof(EnumToStringConvert))]
public PollLevelType PollLevelType { get; set; }
+ ///
+ /// OPC UA更新类型,例如轮询或订阅。
+ ///
+ [SugarColumn(ColumnDataType = "varchar(20)", IsNullable = true, SqlParameterDbType = typeof(EnumToStringConvert))]
+ public OpcUaUpdateType OpcUaUpdateType { get; set; }
+
///
/// 指示变量是否已被逻辑删除。
///
diff --git a/Enums/OpcUaUpdateType.cs b/Enums/OpcUaUpdateType.cs
new file mode 100644
index 0000000..2adab26
--- /dev/null
+++ b/Enums/OpcUaUpdateType.cs
@@ -0,0 +1,11 @@
+using System.ComponentModel;
+
+namespace PMSWPF.Enums;
+
+public enum OpcUaUpdateType
+{
+ [Description("OpcUa轮询")]
+ OpcUaPoll,
+ [Description("OpcUa订阅")]
+ OpcUaSubscription
+}
\ No newline at end of file
diff --git a/Enums/PollLevelType.cs b/Enums/PollLevelType.cs
index 369e693..2334b22 100644
--- a/Enums/PollLevelType.cs
+++ b/Enums/PollLevelType.cs
@@ -29,6 +29,8 @@ namespace PMSWPF.Enums
[Description("10分钟")]
TenMinutes = 600000,
[Description("30分钟")]
- ThirtyMinutes = 1800000
+ ThirtyMinutes = 1800000,
+ [Description("1小时")]
+ OneHour = 3600000
}
}
\ No newline at end of file
diff --git a/Models/VariableData.cs b/Models/VariableData.cs
index 5259a9b..0cc224c 100644
--- a/Models/VariableData.cs
+++ b/Models/VariableData.cs
@@ -100,6 +100,12 @@ public partial class VariableData : ObservableObject
[ObservableProperty]
private PollLevelType pollLevelType = PollLevelType.ThirtySeconds;
+ ///
+ /// OPC UA更新类型,例如轮询或订阅。
+ ///
+ [ObservableProperty]
+ private OpcUaUpdateType opcUaUpdateType = OpcUaUpdateType.OpcUaPoll;
+
///
/// 最后一次轮询时间。
///
diff --git a/Services/DataServices.cs b/Services/DataServices.cs
index 8244896..293279c 100644
--- a/Services/DataServices.cs
+++ b/Services/DataServices.cs
@@ -60,7 +60,7 @@ public partial class DataServices : ObservableRecipient, IRecipient
public event EventHandler> OnMqttListChanged;
// 变量数据变更事件,当变量数据更新时触发。
- public event EventHandler> OnVariableDataChanged;
+ public event Action> OnVariableDataChanged;
///
/// 当_devices属性值改变时触发的局部方法,用于调用OnDeviceListChanged事件。
@@ -69,6 +69,20 @@ public partial class DataServices : ObservableRecipient, IRecipient
partial void OnDevicesChanged(List devices)
{
OnDeviceListChanged?.Invoke(this, devices);
+
+
+ VariableDatas.Clear();
+ foreach (Device device in devices)
+ {
+ foreach (VariableTable variableTable in device.VariableTables)
+ {
+ foreach (VariableData variableData in variableTable.DataVariables)
+ {
+ VariableDatas.Add(variableData);
+ }
+ }
+ }
+ OnVariableDataChanged?.Invoke(VariableDatas);
}
///
@@ -106,6 +120,7 @@ public partial class DataServices : ObservableRecipient, IRecipient
_menuRepository = new MenuRepository();
_mqttRepository = new MqttRepository();
_varDataRepository = new VarDataRepository();
+ _variableDatas = new List();
}
///
@@ -123,7 +138,6 @@ public partial class DataServices : ObservableRecipient, IRecipient
await LoadDevices();
await LoadMenus();
await LoadMqtts();
- await LoadVariableDatas();
break;
case LoadTypes.Devices: // 仅加载设备数据
await LoadDevices();
@@ -184,14 +198,6 @@ public partial class DataServices : ObservableRecipient, IRecipient
Mqtts = await _mqttRepository.GetAll();
}
- ///
- /// 异步获取所有变量数据。
- ///
- /// 包含所有变量数据的列表。
- public async Task> GetAllVariableDataAsync()
- {
- return await _varDataRepository.GetAllAsync();
- }
///
/// 异步根据ID获取设备数据。
@@ -211,4 +217,14 @@ public partial class DataServices : ObservableRecipient, IRecipient
{
VariableDatas = await _varDataRepository.GetAllAsync();
}
+
+ ///
+ /// 异步更新变量数据。
+ ///
+ /// 要更新的变量数据。
+ /// 表示异步操作的任务。
+ public async Task UpdateVariableDataAsync(VariableData variableData)
+ {
+ await _varDataRepository.UpdateAsync(variableData);
+ }
}
\ No newline at end of file
diff --git a/Services/MqttBackgroundService.cs b/Services/MqttBackgroundService.cs
index d4660b1..19a58d3 100644
--- a/Services/MqttBackgroundService.cs
+++ b/Services/MqttBackgroundService.cs
@@ -21,12 +21,16 @@ namespace PMSWPF.Services
{
// 数据服务实例,用于访问和操作应用程序数据,如MQTT配置和变量数据。
private readonly DataServices _dataServices;
+
// 存储MQTT客户端实例的字典,键为MQTT配置ID,值为IMqttClient对象。
private readonly Dictionary _mqttClients;
+
// 存储MQTT配置的字典,键为MQTT配置ID,值为Mqtt模型对象。
private readonly Dictionary _mqttConfigurations;
+
// 存储与MQTT配置关联的变量数据的字典,键为MQTT配置ID,值为VariableData列表。
private readonly Dictionary> _mqttVariableData;
+
// 定时器,用于周期性地执行数据发布任务。
private Timer _timer;
@@ -86,12 +90,11 @@ namespace PMSWPF.Services
await client.DisconnectAsync();
}
}
+
// 清空所有字典。
_mqttClients.Clear();
_mqttConfigurations.Clear();
_mqttVariableData.Clear();
-
-
}
///
@@ -111,7 +114,7 @@ namespace PMSWPF.Services
foreach (var variable in variables)
{
// 如果变量已被修改(IsModified标志为true)。
- if (variable.IsModified)
+ if (variable.IsModified)
{
// 获取发布主题。
var topic = _mqttConfigurations[mqttConfigId].PublishTopic;
@@ -119,13 +122,15 @@ namespace PMSWPF.Services
{
// 构建MQTT消息。
var message = new MqttApplicationMessageBuilder()
- .WithTopic($"{topic}/{variable.Name}") // 主题格式:PublishTopic/VariableName
- .WithPayload(variable.DataValue) // 消息载荷为变量的值
- .Build();
+ .WithTopic($"{topic}/{variable.Name}") // 主题格式:PublishTopic/VariableName
+ .WithPayload(variable.DataValue) // 消息载荷为变量的值
+ .Build();
// 发布MQTT消息。
await client.PublishAsync(message);
- NlogHelper.Info($"Published {variable.Name} = {variable.DataValue} to {topic}/{variable.Name}",throttle:true); // 记录发布信息
+ NlogHelper.Info(
+ $"Published {variable.Name} = {variable.DataValue} to {topic}/{variable.Name}",
+ throttle: true); // 记录发布信息
variable.IsModified = false; // 发布后重置修改标志。
}
}
@@ -142,11 +147,14 @@ namespace PMSWPF.Services
{
// 从数据服务获取所有MQTT配置。
var allMqtts = await _dataServices.GetMqttsAsync();
- var activeMqtts = allMqtts.Where(m => m.IsActive).ToList();
- var activeMqttIds = activeMqtts.Select(m => m.Id).ToHashSet();
+ var activeMqtts = allMqtts.Where(m => m.IsActive)
+ .ToList();
+ var activeMqttIds = activeMqtts.Select(m => m.Id)
+ .ToHashSet();
// 断开并移除不再活跃或已删除的MQTT客户端。
- var clientsToDisconnect = _mqttClients.Keys.Except(activeMqttIds).ToList();
+ var clientsToDisconnect = _mqttClients.Keys.Except(activeMqttIds)
+ .ToList();
foreach (var id in clientsToDisconnect)
{
if (_mqttClients.TryGetValue(id, out var client))
@@ -160,9 +168,12 @@ namespace PMSWPF.Services
mqttConfig.IsConnected = false;
}
}
+
_mqttClients.Remove(id);
- NlogHelper.Info($"Disconnected and removed MQTT client for ID: {id} (no longer active or removed).");
+ NlogHelper.Info(
+ $"Disconnected and removed MQTT client for ID: {id} (no longer active or removed).");
}
+
_mqttConfigurations.Remove(id);
_mqttVariableData.Remove(id);
}
@@ -174,6 +185,7 @@ namespace PMSWPF.Services
{
await ConnectMqttClient(mqtt);
}
+
// 始终更新或添加MQTT配置到字典。
_mqttConfigurations[mqtt.Id] = mqtt;
}
@@ -193,11 +205,11 @@ namespace PMSWPF.Services
var client = factory.CreateMqttClient();
// 构建MQTT客户端连接选项。
var options = new MqttClientOptionsBuilder()
- .WithClientId(mqtt.ClientID)
- .WithTcpServer(mqtt.Host, mqtt.Port)
- .WithCredentials(mqtt.UserName, mqtt.PassWord)
- .WithCleanSession() // 清理会话,每次连接都是新会话
- .Build();
+ .WithClientId(mqtt.ClientID)
+ .WithTcpServer(mqtt.Host, mqtt.Port)
+ .WithCredentials(mqtt.UserName, mqtt.PassWord)
+ .WithCleanSession() // 清理会话,每次连接都是新会话
+ .Build();
// 设置连接成功事件处理程序。
client.UseConnectedHandler(e =>
@@ -221,7 +233,7 @@ namespace PMSWPF.Services
}
catch (Exception ex)
{
- NlogHelper.Error($"Failed to reconnect to MQTT broker: {mqtt.Name}",ex );
+ NlogHelper.Error($"Failed to reconnect to MQTT broker: {mqtt.Name}", ex);
}
});
@@ -243,7 +255,9 @@ namespace PMSWPF.Services
private async Task LoadVariableData()
{
// 从数据服务获取所有变量数据。
- var allVariables = await _dataServices.GetAllVariableDataAsync();
+ var allVariables = _dataServices.VariableDatas;
+ if (!allVariables.Any())
+ return;
_mqttVariableData.Clear(); // 清空现有数据
// 遍历所有变量,并根据其关联的MQTT配置进行分组。
@@ -258,8 +272,10 @@ namespace PMSWPF.Services
{
_mqttVariableData[mqtt.Id] = new List();
}
+
// 将变量添加到对应MQTT配置的列表中。
- _mqttVariableData[mqtt.Id].Add(variable);
+ _mqttVariableData[mqtt.Id]
+ .Add(variable);
}
}
}
@@ -283,11 +299,11 @@ namespace PMSWPF.Services
///
/// 事件发送者。
/// 更新后的变量数据列表。
- private async void HandleVariableDataChanged(object sender, List variableDatas)
+ private async void HandleVariableDataChanged( List variableDatas)
{
NlogHelper.Info("Variable data changed. Reloading variable associations."); // 记录变量数据变化信息
// 重新加载变量数据。
await LoadVariableData();
}
}
-}
+}
\ No newline at end of file
diff --git a/Services/OpcUaBackgroundService.cs b/Services/OpcUaBackgroundService.cs
index 80be192..55f8c77 100644
--- a/Services/OpcUaBackgroundService.cs
+++ b/Services/OpcUaBackgroundService.cs
@@ -66,6 +66,7 @@ namespace PMSWPF.Services
private readonly Dictionary _opcUaSubscriptions;
// 存储活动的 OPC UA 变量,键为变量的唯一 ID。
private readonly Dictionary _opcUaVariables; // Key: VariableData.Id
+ private readonly Dictionary _opcUaPollingTasks; // Key: VariableData.Id, Value: CancellationTokenSource for polling task
///
/// OpcUaBackgroundService 的构造函数。
@@ -77,6 +78,7 @@ namespace PMSWPF.Services
_opcUaSessions = new Dictionary();
_opcUaSubscriptions = new Dictionary();
_opcUaVariables = new Dictionary();
+ _opcUaPollingTasks = new Dictionary();
}
///
@@ -102,6 +104,10 @@ namespace PMSWPF.Services
try
{
_cancellationTokenSource.Cancel();
+ NlogHelper.Info("OpcUaBackgroundService stopping.");
+ // 服务停止时,取消订阅事件并断开所有 OPC UA 连接。
+ _dataServices.OnVariableDataChanged -= HandleVariableDataChanged;
+ await DisconnectAllOpcUaSessions();
}
finally
{
@@ -120,16 +126,13 @@ namespace PMSWPF.Services
await LoadOpcUaVariables();
// 循环运行,直到接收到停止信号。
- while (!stoppingToken.IsCancellationRequested)
- {
- // 可以在这里添加周期性任务,例如检查和重新连接断开的会话。
- await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
- }
+ // while (!stoppingToken.IsCancellationRequested)
+ // {
+ // // 可以在这里添加周期性任务,例如检查和重新连接断开的会话。
+ // await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
+ // }
- NlogHelper.Info("OpcUaBackgroundService stopping.");
- // 服务停止时,取消订阅事件并断开所有 OPC UA 连接。
- _dataServices.OnVariableDataChanged -= HandleVariableDataChanged;
- await DisconnectAllOpcUaSessions();
+
}
///
@@ -138,11 +141,356 @@ namespace PMSWPF.Services
private async Task LoadOpcUaVariables()
{
NlogHelper.Info("正在加载 OPC UA 变量...");
- var allVariables = await _dataServices.GetAllVariableDataAsync();
- var opcUaVariables = allVariables.Where(v => v.ProtocolType == ProtocolType.OpcUA && v.IsActive).ToList();
+ // var allVariables = await _dataServices.GetAllVariableDataAsync();
+ var opcUaVariables = _dataServices.VariableDatas.Where(v => v.ProtocolType == ProtocolType.OpcUA && v.IsActive).ToList();
+
+ if (opcUaVariables==null || opcUaVariables.Count==0)
+ return;
// 清理不再活跃或已删除的变量。
- var currentOpcUaVariableIds = opcUaVariables.Select(v => v.Id).ToHashSet();
+ await RemoveInactiveOpcUaVariables(opcUaVariables);
+
+ // 处理新增或更新的变量。
+ await ProcessActiveOpcUaVariables(opcUaVariables);
+ }
+
+ ///
+ /// 连接到 OPC UA 服务器并订阅或轮询指定的变量。
+ ///
+ /// 要订阅或轮询的变量信息。
+ /// 变量所属的设备信息。
+ private async Task ConnectAndSubscribeOpcUa(VariableData variable, Device device)
+ {
+ NlogHelper.Info($"正在为变量 '{variable.Name}' 连接和处理 OPC UA 服务器... (更新类型: {variable.OpcUaUpdateType})");
+ if (string.IsNullOrEmpty(device.OpcUaEndpointUrl) || string.IsNullOrEmpty(variable.OpcUaNodeId))
+ {
+ NlogHelper.Warn($"OPC UA variable {variable.Name} has invalid EndpointUrl or NodeId.");
+ return;
+ }
+
+ Session session = null;
+ // 检查是否已存在到该终结点的活动会话。
+ if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected)
+ {
+ NlogHelper.Info($"Already connected to OPC UA endpoint: {device.OpcUaEndpointUrl}");
+ }
+ else
+ {
+ session = await CreateOpcUaSessionAsync(device.OpcUaEndpointUrl);
+ if (session == null)
+ {
+ return; // 连接失败,直接返回
+ }
+ }
+
+ if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
+ {
+ SetupOpcUaSubscription(session, variable, device.OpcUaEndpointUrl);
+ }
+ else if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll)
+ {
+ StartOpcUaPolling(session, variable, device);
+ }
+ }
+
+ private async Task PollOpcUaVariable(Session session, VariableData variable, Device device, CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ try
+ {
+ if (session != null && session.Connected)
+ {
+ var nodeToRead = new ReadValueId
+ {
+ NodeId = new NodeId(variable.OpcUaNodeId),
+ AttributeId = Attributes.Value
+ };
+
+ var nodesToRead = new ReadValueIdCollection { nodeToRead };
+ session.Read(
+ null,
+ 0,
+ TimestampsToReturn.Both,
+ nodesToRead,
+ out DataValueCollection results,
+ out DiagnosticInfoCollection diagnosticInfos);
+
+ if (results != null && results.Count > 0)
+ {
+ var value = results[0];
+ if (StatusCode.IsGood(value.StatusCode))
+ {
+ NlogHelper.Info($"[OPC UA 轮询] {variable.Name}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
+ // 更新变量数据
+ variable.DataValue = value.Value.ToString();
+ variable.DisplayValue = value.Value.ToString(); // 或者根据需要进行格式化
+ // await _dataServices.UpdateVariableDataAsync(variable);
+ }
+ else
+ {
+ NlogHelper.Warn($"Failed to read OPC UA variable {variable.Name} ({variable.OpcUaNodeId}): {value.StatusCode}");
+ }
+ }
+ }
+ else
+ {
+ NlogHelper.Warn($"OPC UA session for {device.OpcUaEndpointUrl} is not connected. Attempting to reconnect...");
+ // 尝试重新连接会话
+ await ConnectAndSubscribeOpcUa(variable, device);
+ }
+ }
+ catch (Exception ex)
+ {
+ NlogHelper.Error($"Error during OPC UA polling for {variable.Name}: {ex.Message}", ex);
+ }
+
+ // 根据 PollLevelType 设置轮询间隔
+ var pollInterval = GetPollInterval(variable.PollLevelType);
+ await Task.Delay(pollInterval, cancellationToken);
+ }
+ NlogHelper.Info($"Polling for OPC UA variable {variable.Name} stopped.");
+ }
+
+ private void OnNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
+ {
+ foreach (var value in monitoredItem.DequeueValues())
+ {
+ NlogHelper.Info($"[OPC UA 通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
+ Console.WriteLine($"[通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
+ }
+ }
+
+ ///
+ /// 根据 PollLevelType 获取轮询间隔。
+ ///
+ /// 轮询级别类型。
+ /// 时间间隔。
+ private TimeSpan GetPollInterval(PollLevelType pollLevelType)
+ {
+ return pollLevelType switch
+ {
+ PollLevelType.OneSecond => TimeSpan.FromSeconds(1),
+ PollLevelType.FiveSeconds => TimeSpan.FromSeconds(5),
+ PollLevelType.TenSeconds => TimeSpan.FromSeconds(10),
+ PollLevelType.ThirtySeconds => TimeSpan.FromSeconds(30),
+ PollLevelType.OneMinute => TimeSpan.FromMinutes(1),
+ PollLevelType.FiveMinutes => TimeSpan.FromMinutes(5),
+ PollLevelType.TenMinutes => TimeSpan.FromMinutes(10),
+ PollLevelType.ThirtyMinutes => TimeSpan.FromMinutes(30),
+ PollLevelType.OneHour => TimeSpan.FromHours(1),
+ _ => TimeSpan.FromSeconds(1), // 默认1秒
+ };
+ }
+
+ ///
+ /// 断开与指定 OPC UA 服务器的连接。
+ ///
+ /// OPC UA 服务器的终结点 URL。
+ private async Task DisconnectOpcUaSession(string endpointUrl)
+ {
+ NlogHelper.Info($"正在断开 OPC UA 会话: {endpointUrl}");
+ if (_opcUaSessions.TryGetValue(endpointUrl, out var session))
+ {
+ if (_opcUaSubscriptions.TryGetValue(endpointUrl, out var subscription))
+ {
+ // 删除订阅。
+ subscription.Delete(true);
+ _opcUaSubscriptions.Remove(endpointUrl);
+ }
+
+ // 取消与此会话相关的轮询任务
+ var variablesToCancelPolling = _opcUaVariables.Where(kv => kv.Value.VariableTable != null && kv.Value.VariableTable.DeviceId.HasValue && _dataServices.GetDeviceByIdAsync(kv.Value.VariableTable.DeviceId.Value).Result?.OpcUaEndpointUrl == endpointUrl && kv.Value.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll).ToList();
+ foreach (var entry in variablesToCancelPolling)
+ {
+ if (_opcUaPollingTasks.ContainsKey(entry.Key))
+ {
+ var cts = _opcUaPollingTasks[entry.Key];
+ _opcUaPollingTasks.Remove(entry.Key);
+ cts.Cancel();
+ cts.Dispose();
+ NlogHelper.Info($"Cancelled polling for variable: {entry.Value.Name} (ID: {entry.Key})");
+ }
+ }
+
+ // 关闭会话。
+ session.Close();
+ _opcUaSessions.Remove(endpointUrl);
+ NlogHelper.Info($"Disconnected from OPC UA server: {endpointUrl}");
+ NotificationHelper.ShowInfo($"已从 OPC UA 服务器断开连接: {endpointUrl}");
+ }
+ }
+
+ ///
+ /// 断开所有 OPC UA 会话。
+ ///
+ private async Task DisconnectAllOpcUaSessions()
+ {
+ NlogHelper.Info("正在断开所有 OPC UA 会话...");
+ foreach (var endpointUrl in _opcUaSessions.Keys.ToList())
+ {
+ await DisconnectOpcUaSession(endpointUrl);
+ }
+ }
+
+ ///
+ /// 处理变量数据变化的事件。
+ /// 当数据库中的变量信息发生变化时,此方法被调用以重新加载和配置 OPC UA 变量。
+ ///
+ /// 事件发送者。
+ /// 变化的变量数据列表。
+ private async void HandleVariableDataChanged( List variableDatas)
+ {
+ NlogHelper.Info("Variable data changed. Reloading OPC UA variables.");
+ await LoadOpcUaVariables();
+ }
+
+ ///
+ /// 创建并配置 OPC UA 会话。
+ ///
+ /// OPC UA 服务器的终结点 URL。
+ /// 创建的 Session 对象,如果失败则返回 null。
+ private async Task CreateOpcUaSessionAsync(string endpointUrl)
+ {
+ try
+ {
+ // 1. 创建应用程序配置
+ var application = new ApplicationInstance
+ {
+ ApplicationName = "OpcUADemoClient",
+ ApplicationType = ApplicationType.Client,
+ ConfigSectionName = "Opc.Ua.Client"
+ };
+
+ var config = new ApplicationConfiguration()
+ {
+ ApplicationName = application.ApplicationName,
+ ApplicationUri = $"urn:{System.Net.Dns.GetHostName()}:OpcUADemoClient",
+ ApplicationType = application.ApplicationType,
+ SecurityConfiguration = new SecurityConfiguration
+ {
+ ApplicationCertificate = new CertificateIdentifier
+ {
+ StoreType = "Directory",
+ StorePath = "%CommonApplicationData%/OPC Foundation/CertificateStores/MachineDefault",
+ SubjectName = application.ApplicationName
+ },
+ TrustedIssuerCertificates = new CertificateTrustList
+ {
+ StoreType = "Directory",
+ StorePath = "%CommonApplicationData%/OPC Foundation/CertificateStores/UA Certificate Authorities"
+ },
+ TrustedPeerCertificates = new CertificateTrustList
+ {
+ StoreType = "Directory",
+ StorePath = "%CommonApplicationData%/OPC Foundation/CertificateStores/UA Applications"
+ },
+ RejectedCertificateStore = new CertificateTrustList
+ {
+ StoreType = "Directory",
+ StorePath = "%CommonApplicationData%/OPC Foundation/CertificateStores/RejectedCertificates"
+ },
+ AutoAcceptUntrustedCertificates = true // 自动接受不受信任的证书 (仅用于测试)
+ },
+ TransportQuotas = new TransportQuotas { OperationTimeout = 15000 },
+ ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },
+ TraceConfiguration = new TraceConfiguration
+ {
+ OutputFilePath = "./Logs/OpcUaClient.log", DeleteOnLoad = true,
+ TraceMasks = Utils.TraceMasks.Error | Utils.TraceMasks.Security
+ }
+ };
+ application.ApplicationConfiguration = config;
+
+ // 验证并检查证书
+ await config.Validate(ApplicationType.Client);
+ await application.CheckApplicationInstanceCertificate(false, 0);
+
+ // 2. 查找并选择端点 (将 useSecurity 设置为 false 以进行诊断)
+ var selectedEndpoint = CoreClientUtils.SelectEndpoint(endpointUrl, false);
+
+ var session = await Session.Create(
+ config,
+ new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(config)),
+ false,
+ "PMSWPF OPC UA Session",
+ 60000,
+ new UserIdentity(new AnonymousIdentityToken()),
+ null);
+
+ _opcUaSessions[endpointUrl] = session;
+ NotificationHelper.ShowSuccess($"已连接到 OPC UA 服务器: {endpointUrl}");
+ return session;
+ }
+ catch (Exception ex)
+ {
+ NotificationHelper.ShowError($"连接 OPC UA 服务器失败: {endpointUrl} - {ex.Message}", ex);
+ return null;
+ }
+ }
+
+ ///
+ /// 设置 OPC UA 订阅并添加监控项。
+ ///
+ /// OPC UA 会话。
+ /// 要订阅的变量信息。
+ /// OPC UA 服务器的终结点 URL。
+ private void SetupOpcUaSubscription(Session session, VariableData variable, string endpointUrl)
+ {
+ Subscription subscription = null;
+ if (_opcUaSubscriptions.TryGetValue(endpointUrl, out subscription))
+ {
+ NlogHelper.Info($"Already has subscription for OPC UA endpoint: {endpointUrl}");
+ }
+ else
+ {
+ subscription = new Subscription(session.DefaultSubscription);
+ subscription.PublishingInterval = 1000; // 发布间隔(毫秒)
+ session.AddSubscription(subscription);
+ subscription.Create();
+ _opcUaSubscriptions[endpointUrl] = subscription;
+ }
+
+ // 7. 创建监控项并添加到订阅中。
+ MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem);
+ monitoredItem.DisplayName = variable.Name;
+ monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID
+ monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性
+ monitoredItem.SamplingInterval = 1000; // 采样间隔(毫秒)
+ monitoredItem.QueueSize = 1; // 队列大小
+ monitoredItem.DiscardOldest = true; // 丢弃最旧的数据
+ // 注册数据变化通知事件。
+ monitoredItem.Notification += OnNotification;
+
+ subscription.AddItem(monitoredItem);
+ subscription.ApplyChanges(); // 应用更改
+ NlogHelper.Info($"Subscribed to OPC UA variable: {variable.Name} ({variable.OpcUaNodeId})");
+ }
+
+ ///
+ /// 启动 OPC UA 变量的轮询任务。
+ ///
+ /// OPC UA 会话。
+ /// 要轮询的变量信息。
+ /// 变量所属的设备信息。
+ private void StartOpcUaPolling(Session session, VariableData variable, Device device)
+ {
+ if (!_opcUaPollingTasks.ContainsKey(variable.Id))
+ {
+ var cts = new CancellationTokenSource();
+ _opcUaPollingTasks.Add(variable.Id, cts);
+ _ = Task.Run(() => PollOpcUaVariable(session, variable, device, cts.Token), cts.Token);
+ NlogHelper.Info($"Started polling for OPC UA variable: {variable.Name} ({variable.OpcUaNodeId})");
+ }
+ }
+
+ ///
+ /// 移除不再活跃或已删除的 OPC UA 变量。
+ ///
+ /// 当前所有活跃的 OPC UA 变量列表。
+ private async Task RemoveInactiveOpcUaVariables(List activeOpcUaVariables)
+ {
+ var currentOpcUaVariableIds = activeOpcUaVariables.Select(v => v.Id).ToHashSet();
var variablesToRemove = _opcUaVariables.Keys.Except(currentOpcUaVariableIds).ToList();
NlogHelper.Info($"发现 {variablesToRemove.Count} 个要移除的 OPC UA 变量。");
@@ -151,18 +499,37 @@ namespace PMSWPF.Services
{
if (_opcUaVariables.TryGetValue(id, out var variable))
{
- // 获取关联的设备信息
- var device = await _dataServices.GetDeviceByIdAsync(variable.VariableTable.DeviceId??0);
- if (device != null)
+ if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
{
- // 断开与该变量相关的 OPC UA 会话。
- await DisconnectOpcUaSession(device.OpcUaEndpointUrl);
+ // 获取关联的设备信息
+ var device = await _dataServices.GetDeviceByIdAsync(variable.VariableTable.DeviceId??0);
+ if (device != null)
+ {
+ // 断开与该变量相关的 OPC UA 会话。
+ await DisconnectOpcUaSession(device.OpcUaEndpointUrl);
+ }
+ }
+ else if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll)
+ {
+ if (_opcUaPollingTasks.ContainsKey(variable.Id))
+ {
+ var cts = _opcUaPollingTasks[variable.Id];
+ _opcUaPollingTasks.Remove(variable.Id);
+ cts.Cancel();
+ cts.Dispose();
+ }
}
_opcUaVariables.Remove(id);
}
}
+ }
- // 处理新增或更新的变量。
+ ///
+ /// 处理新增或更新的活跃 OPC UA 变量。
+ ///
+ /// 当前所有活跃的 OPC UA 变量列表。
+ private async Task ProcessActiveOpcUaVariables(List opcUaVariables)
+ {
foreach (var variable in opcUaVariables)
{
// 获取关联的设备信息
@@ -191,190 +558,5 @@ namespace PMSWPF.Services
}
}
}
-
- ///
- /// 连接到 OPC UA 服务器并订阅指定的变量。
- ///
- /// 要订阅的变量信息。
- /// 变量所属的设备信息。
- private async Task ConnectAndSubscribeOpcUa(VariableData variable, Device device)
- {
- NlogHelper.Info($"正在为变量 '{variable.Name}' 连接和订阅 OPC UA 服务器...");
- if (string.IsNullOrEmpty(device.OpcUaEndpointUrl) || string.IsNullOrEmpty(variable.OpcUaNodeId))
- {
- NlogHelper.Warn($"OPC UA variable {variable.Name} has invalid EndpointUrl or NodeId.");
- return;
- }
-
- Session session = null;
- // 检查是否已存在到该终结点的活动会话。
- if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected)
- {
- NlogHelper.Info($"Already connected to OPC UA endpoint: {device.OpcUaEndpointUrl}");
- }
- else
- {
- try
- {
- // 1. 创建应用程序配置
- var application = new ApplicationInstance
- {
- ApplicationName = "OpcUADemoClient",
- ApplicationType = ApplicationType.Client,
- ConfigSectionName = "Opc.Ua.Client"
- };
-
- var config = new ApplicationConfiguration()
- {
- ApplicationName = application.ApplicationName,
- ApplicationUri = $"urn:{System.Net.Dns.GetHostName()}:OpcUADemoClient",
- ApplicationType = application.ApplicationType,
- SecurityConfiguration = new SecurityConfiguration
- {
- ApplicationCertificate = new CertificateIdentifier
- {
- StoreType = "Directory",
- StorePath
- = "%CommonApplicationData%/OPC Foundation/CertificateStores/MachineDefault",
- SubjectName = application.ApplicationName
- },
- TrustedIssuerCertificates
- = new CertificateTrustList
- {
- StoreType = "Directory",
- StorePath
- = "%CommonApplicationData%/OPC Foundation/CertificateStores/UA Certificate Authorities"
- },
- TrustedPeerCertificates
- = new CertificateTrustList
- {
- StoreType = "Directory",
- StorePath
- = "%CommonApplicationData%/OPC Foundation/CertificateStores/UA Applications"
- },
- RejectedCertificateStore
- = new CertificateTrustList
- {
- StoreType = "Directory",
- StorePath
- = "%CommonApplicationData%/OPC Foundation/CertificateStores/RejectedCertificates"
- },
- AutoAcceptUntrustedCertificates = true // 自动接受不受信任的证书 (仅用于测试)
- },
- TransportQuotas = new TransportQuotas { OperationTimeout = 15000 },
- ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },
- TraceConfiguration = new TraceConfiguration
- {
- OutputFilePath = "./Logs/OpcUaClient.log", DeleteOnLoad = true,
- TraceMasks = Utils.TraceMasks.Error | Utils.TraceMasks.Security
- }
- };
- application.ApplicationConfiguration = config;
-
- // 验证并检查证书
- await config.Validate(ApplicationType.Client);
- await application.CheckApplicationInstanceCertificate(false, 0);
-
- // 2. 查找并选择端点 (将 useSecurity 设置为 false 以进行诊断)
- var selectedEndpoint = CoreClientUtils.SelectEndpoint(device.OpcUaEndpointUrl, false);
-
- session = await Session.Create(
- config,
- new ConfiguredEndpoint(null, selectedEndpoint, EndpointConfiguration.Create(config)),
- false,
- "PMSWPF OPC UA Session",
- 60000,
- new UserIdentity(new AnonymousIdentityToken()),
- null);
-
- _opcUaSessions[device.OpcUaEndpointUrl] = session;
- NlogHelper.Info($"Connected to OPC UA server: {device.OpcUaEndpointUrl}");
- NotificationHelper.ShowSuccess($"已连接到 OPC UA 服务器: {device.OpcUaEndpointUrl}");
-
- // 6. 创建订阅。
- Subscription subscription = new Subscription(session.DefaultSubscription);
- subscription.PublishingInterval = 1000; // 发布间隔(毫秒)
- session.AddSubscription(subscription);
- subscription.Create();
-
- _opcUaSubscriptions[device.OpcUaEndpointUrl] = subscription;
-
- // 7. 创建监控项并添加到订阅中。
- MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem);
- monitoredItem.DisplayName = variable.Name;
- monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID
- monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性
- monitoredItem.SamplingInterval = 1000; // 采样间隔(毫秒)
- monitoredItem.QueueSize = 1; // 队列大小
- monitoredItem.DiscardOldest = true; // 丢弃最旧的数据
- // 注册数据变化通知事件。
- monitoredItem.Notification += OnNotification;
-
- subscription.AddItem(monitoredItem);
- subscription.ApplyChanges(); // 应用更改
- }
- catch (Exception ex)
- {
- NlogHelper.Error($"连接或订阅 OPC UA 服务器失败: {device.OpcUaEndpointUrl} - {ex.Message}", ex);
- NotificationHelper.ShowError($"连接或订阅 OPC UA 服务器失败: {device.OpcUaEndpointUrl} - {ex.Message}", ex);
- }
- }
- }
-
- private void OnNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
- {
- foreach (var value in monitoredItem.DequeueValues())
- {
- NlogHelper.Info($"[OPC UA 通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
- Console.WriteLine($"[通知] {monitoredItem.DisplayName}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
- }
- }
-
- ///
- /// 断开与指定 OPC UA 服务器的连接。
- ///
- /// OPC UA 服务器的终结点 URL。
- private async Task DisconnectOpcUaSession(string endpointUrl)
- {
- NlogHelper.Info($"正在断开 OPC UA 会话: {endpointUrl}");
- if (_opcUaSessions.TryGetValue(endpointUrl, out var session))
- {
- if (_opcUaSubscriptions.TryGetValue(endpointUrl, out var subscription))
- {
- // 删除订阅。
- subscription.Delete(true);
- _opcUaSubscriptions.Remove(endpointUrl);
- }
- // 关闭会话。
- session.Close();
- _opcUaSessions.Remove(endpointUrl);
- NlogHelper.Info($"Disconnected from OPC UA server: {endpointUrl}");
- NotificationHelper.ShowInfo($"已从 OPC UA 服务器断开连接: {endpointUrl}");
- }
- }
-
- ///
- /// 断开所有 OPC UA 会话。
- ///
- private async Task DisconnectAllOpcUaSessions()
- {
- NlogHelper.Info("正在断开所有 OPC UA 会话...");
- foreach (var endpointUrl in _opcUaSessions.Keys.ToList())
- {
- await DisconnectOpcUaSession(endpointUrl);
- }
- }
-
- ///
- /// 处理变量数据变化的事件。
- /// 当数据库中的变量信息发生变化时,此方法被调用以重新加载和配置 OPC UA 变量。
- ///
- /// 事件发送者。
- /// 变化的变量数据列表。
- private async void HandleVariableDataChanged(object sender, List variableDatas)
- {
- NlogHelper.Info("Variable data changed. Reloading OPC UA variables.");
- await LoadOpcUaVariables();
- }
}
}
\ No newline at end of file
diff --git a/ViewModels/DevicesViewModel.cs b/ViewModels/DevicesViewModel.cs
index 905ea40..5f169ac 100644
--- a/ViewModels/DevicesViewModel.cs
+++ b/ViewModels/DevicesViewModel.cs
@@ -53,7 +53,10 @@ public partial class DevicesViewModel : ViewModelBase
_dataServices = dataServices;
MessageHelper.SendLoadMessage(LoadTypes.Devices);
- _dataServices.OnDeviceListChanged += (sender, devices) => { Devices = new ObservableCollection(devices); };
+ _dataServices.OnDeviceListChanged += (sender, devices) =>
+ {
+ Devices = new ObservableCollection(devices);
+ };
}
///