将字典重命名
This commit is contained in:
@@ -14,44 +14,44 @@ namespace PMSWPF.Services
|
||||
private readonly IDataProcessingService _dataProcessingService;
|
||||
|
||||
// 存储 OPC UA 设备,键为设备Id,值为会话对象。
|
||||
private readonly ConcurrentDictionary<int, Device> _deviceDic;
|
||||
private readonly ConcurrentDictionary<int, Device> _opcUaDevices;
|
||||
|
||||
// 存储 OPC UA 会话,键为终结点 URL,值为会话对象。
|
||||
private readonly ConcurrentDictionary<string, Session> _sessionsDic;
|
||||
private readonly ConcurrentDictionary<string, Session> _opcUaSessions;
|
||||
|
||||
// 存储 OPC UA 订阅,键为终结点 URL,值为订阅对象。
|
||||
private readonly ConcurrentDictionary<string, Subscription> _subscriptionsDic;
|
||||
private readonly ConcurrentDictionary<string, Subscription> _opcUaSubscriptions;
|
||||
|
||||
// 存储活动的 OPC UA 变量,键为变量的OpcNodeId
|
||||
private readonly ConcurrentDictionary<string, VariableData> _opcUaNodeIdVariableDic;
|
||||
private readonly ConcurrentDictionary<string, VariableData> _opcUaPollVariablesByNodeId;
|
||||
|
||||
// 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
||||
private readonly ConcurrentDictionary<int, List<VariableData>> _pollVariableDic;
|
||||
private readonly ConcurrentDictionary<int, List<VariableData>> _opcUaPollVariablesByDeviceId;
|
||||
|
||||
// 储存所有要订阅更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
||||
private readonly ConcurrentDictionary<int, List<VariableData>> _subVariableDic;
|
||||
private readonly ConcurrentDictionary<int, List<VariableData>> _opcUaSubVariablesByDeviceId;
|
||||
|
||||
private readonly SemaphoreSlim _reloadSemaphore = new SemaphoreSlim(0);
|
||||
|
||||
// OPC UA 轮询间隔(毫秒)
|
||||
private readonly int OpcUaPollIntervalMs = 100;
|
||||
private readonly int _opcUaPollIntervalMs = 100;
|
||||
|
||||
// OPC UA 订阅发布间隔(毫秒)
|
||||
private readonly int OpcUaSubscriptionPublishingIntervalMs = 1000;
|
||||
private readonly int _opcUaSubscriptionPublishingIntervalMs = 1000;
|
||||
|
||||
// OPC UA 订阅采样间隔(毫秒)
|
||||
private readonly int OpcUaSubscriptionSamplingIntervalMs = 1000;
|
||||
private readonly int _opcUaSubscriptionSamplingIntervalMs = 1000;
|
||||
|
||||
public OpcUaBackgroundService(DataServices dataServices, IDataProcessingService dataProcessingService)
|
||||
{
|
||||
_dataServices = dataServices;
|
||||
_dataProcessingService = dataProcessingService;
|
||||
_deviceDic = new ConcurrentDictionary<int, Device>();
|
||||
_sessionsDic = new ConcurrentDictionary<string, Session>();
|
||||
_subscriptionsDic = new ConcurrentDictionary<string, Subscription>();
|
||||
_opcUaNodeIdVariableDic = new ConcurrentDictionary<string, VariableData>();
|
||||
_pollVariableDic = new ConcurrentDictionary<int, List<VariableData>>();
|
||||
_subVariableDic = new ConcurrentDictionary<int, List<VariableData>>();
|
||||
_opcUaDevices = new ConcurrentDictionary<int, Device>();
|
||||
_opcUaSessions = new ConcurrentDictionary<string, Session>();
|
||||
_opcUaSubscriptions = new ConcurrentDictionary<string, Subscription>();
|
||||
_opcUaPollVariablesByNodeId = new ConcurrentDictionary<string, VariableData>();
|
||||
_opcUaPollVariablesByDeviceId = new ConcurrentDictionary<int, List<VariableData>>();
|
||||
_opcUaSubVariablesByDeviceId = new ConcurrentDictionary<int, List<VariableData>>();
|
||||
|
||||
_dataServices.OnDeviceListChanged += HandleDeviceListChanged;
|
||||
_dataServices.OnDeviceIsActiveChanged += HandleDeviceIsActiveChanged;
|
||||
@@ -94,7 +94,7 @@ namespace PMSWPF.Services
|
||||
while (!stoppingToken.IsCancellationRequested && _reloadSemaphore.CurrentCount == 0)
|
||||
{
|
||||
await PollOpcUaVariableOnceAsync(stoppingToken);
|
||||
await Task.Delay(OpcUaPollIntervalMs, stoppingToken);
|
||||
await Task.Delay(_opcUaPollIntervalMs, stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -130,11 +130,11 @@ namespace PMSWPF.Services
|
||||
if (!isActive)
|
||||
{
|
||||
// 设备变为非活动状态,断开连接
|
||||
if (_sessionsDic.TryRemove(device.OpcUaEndpointUrl, out var session))
|
||||
if (_opcUaSessions.TryRemove(device.OpcUaEndpointUrl, out var session))
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_subscriptionsDic.TryRemove(device.OpcUaEndpointUrl, out var subscription))
|
||||
if (_opcUaSubscriptions.TryRemove(device.OpcUaEndpointUrl, out var subscription))
|
||||
{
|
||||
// 删除订阅。
|
||||
await subscription.DeleteAsync(true);
|
||||
@@ -165,10 +165,10 @@ namespace PMSWPF.Services
|
||||
{
|
||||
try
|
||||
{
|
||||
_deviceDic.Clear();
|
||||
_pollVariableDic.Clear();
|
||||
_subVariableDic.Clear();
|
||||
_opcUaNodeIdVariableDic.Clear();
|
||||
_opcUaDevices.Clear();
|
||||
_opcUaPollVariablesByDeviceId.Clear();
|
||||
_opcUaSubVariablesByDeviceId.Clear();
|
||||
_opcUaPollVariablesByNodeId.Clear();
|
||||
|
||||
NlogHelper.Info("开始加载OPC UA变量....");
|
||||
var opcUaDevices = _dataServices
|
||||
@@ -186,7 +186,7 @@ namespace PMSWPF.Services
|
||||
|
||||
foreach (var opcUaDevice in opcUaDevices)
|
||||
{
|
||||
_deviceDic.AddOrUpdate(opcUaDevice.Id, opcUaDevice, (key, oldValue) => opcUaDevice);
|
||||
_opcUaDevices.AddOrUpdate(opcUaDevice.Id, opcUaDevice, (key, oldValue) => opcUaDevice);
|
||||
|
||||
//查找设备中所有要轮询的变量
|
||||
var dPollList = opcUaDevice.VariableTables?.SelectMany(vt => vt.DataVariables)
|
||||
@@ -197,12 +197,12 @@ namespace PMSWPF.Services
|
||||
// 将变量保存到字典中,方便Read后还原
|
||||
foreach (var variableData in dPollList)
|
||||
{
|
||||
_opcUaNodeIdVariableDic.AddOrUpdate(variableData.OpcUaNodeId, variableData,
|
||||
_opcUaPollVariablesByNodeId.AddOrUpdate(variableData.OpcUaNodeId, variableData,
|
||||
(key, oldValue) => variableData);
|
||||
}
|
||||
|
||||
totalPollVariableCount += dPollList.Count;
|
||||
_pollVariableDic.AddOrUpdate(opcUaDevice.Id, dPollList, (key, oldValue) => dPollList);
|
||||
_opcUaPollVariablesByDeviceId.AddOrUpdate(opcUaDevice.Id, dPollList, (key, oldValue) => dPollList);
|
||||
|
||||
//查找设备中所有要订阅的变量
|
||||
var dSubList = opcUaDevice.VariableTables?.SelectMany(vt => vt.DataVariables)
|
||||
@@ -211,7 +211,7 @@ namespace PMSWPF.Services
|
||||
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
|
||||
.ToList();
|
||||
totalSubVariableCount += dSubList.Count;
|
||||
_subVariableDic.AddOrUpdate(opcUaDevice.Id, dSubList, (key, oldValue) => dSubList);
|
||||
_opcUaSubVariablesByDeviceId.AddOrUpdate(opcUaDevice.Id, dSubList, (key, oldValue) => dSubList);
|
||||
}
|
||||
|
||||
NlogHelper.Info(
|
||||
@@ -237,8 +237,8 @@ namespace PMSWPF.Services
|
||||
|
||||
var connectTasks = new List<Task>();
|
||||
|
||||
// 遍历_deviceDic中的所有设备,尝试连接
|
||||
foreach (var device in _deviceDic.Values.ToList())
|
||||
// 遍历_opcUaDevices中的所有设备,尝试连接
|
||||
foreach (var device in _opcUaDevices.Values.ToList())
|
||||
{
|
||||
connectTasks.Add(ConnectSingleOpcUaDeviceAsync(device, stoppingToken));
|
||||
}
|
||||
@@ -259,7 +259,7 @@ namespace PMSWPF.Services
|
||||
}
|
||||
|
||||
// Check if already connected
|
||||
if (_sessionsDic.TryGetValue(device.OpcUaEndpointUrl, out var existingSession))
|
||||
if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out var existingSession))
|
||||
{
|
||||
if (existingSession.Connected)
|
||||
{
|
||||
@@ -269,7 +269,7 @@ namespace PMSWPF.Services
|
||||
else
|
||||
{
|
||||
// Remove disconnected session from dictionary to attempt reconnection
|
||||
_sessionsDic.TryRemove(device.OpcUaEndpointUrl, out _);
|
||||
_opcUaSessions.TryRemove(device.OpcUaEndpointUrl, out _);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -283,7 +283,7 @@ namespace PMSWPF.Services
|
||||
return; // 连接失败,直接返回
|
||||
}
|
||||
|
||||
_sessionsDic.AddOrUpdate(device.OpcUaEndpointUrl, session, (key, oldValue) => session);
|
||||
_opcUaSessions.AddOrUpdate(device.OpcUaEndpointUrl, session, (key, oldValue) => session);
|
||||
NotificationHelper.ShowSuccess($"已连接到OPC UA服务器: {device.Name} ({device.OpcUaEndpointUrl})");
|
||||
}
|
||||
catch (Exception e)
|
||||
@@ -298,7 +298,7 @@ namespace PMSWPF.Services
|
||||
try
|
||||
{
|
||||
// 获取当前需要轮询的设备ID列表的快照
|
||||
var deviceIdsToPoll = _pollVariableDic.Keys.ToList();
|
||||
var deviceIdsToPoll = _opcUaPollVariablesByDeviceId.Keys.ToList();
|
||||
|
||||
// 为每个设备创建并发轮询任务
|
||||
var pollingTasks = deviceIdsToPoll.Select(async deviceId =>
|
||||
@@ -308,7 +308,7 @@ namespace PMSWPF.Services
|
||||
return; // 任务被取消,退出循环
|
||||
}
|
||||
|
||||
if (!_deviceDic.TryGetValue(deviceId, out var device) ||
|
||||
if (!_opcUaDevices.TryGetValue(deviceId, out var device) ||
|
||||
device.OpcUaEndpointUrl == null)
|
||||
{
|
||||
NlogHelper.Warn(
|
||||
@@ -321,7 +321,7 @@ namespace PMSWPF.Services
|
||||
return;
|
||||
}
|
||||
|
||||
if (!_sessionsDic.TryGetValue(
|
||||
if (!_opcUaSessions.TryGetValue(
|
||||
device.OpcUaEndpointUrl, out Session session) ||
|
||||
!session.Connected)
|
||||
{
|
||||
@@ -339,7 +339,7 @@ namespace PMSWPF.Services
|
||||
}
|
||||
|
||||
var nodesToRead = new ReadValueIdCollection();
|
||||
if (!_pollVariableDic.TryGetValue(deviceId, out var variableList))
|
||||
if (!_opcUaPollVariablesByDeviceId.TryGetValue(deviceId, out var variableList))
|
||||
{
|
||||
return; // 跳过此设备
|
||||
}
|
||||
@@ -393,7 +393,7 @@ namespace PMSWPF.Services
|
||||
var value = results[i];
|
||||
var nodeId = nodesToRead[i]
|
||||
.NodeId.ToString();
|
||||
if (!_opcUaNodeIdVariableDic.TryGetValue(
|
||||
if (!_opcUaPollVariablesByNodeId.TryGetValue(
|
||||
nodeId, out var variable))
|
||||
{
|
||||
NlogHelper.Warn(
|
||||
@@ -463,7 +463,7 @@ namespace PMSWPF.Services
|
||||
|
||||
var setupSubscriptionTasks = new List<Task>();
|
||||
|
||||
foreach (var deviceId in _subVariableDic.Keys.ToList())
|
||||
foreach (var deviceId in _opcUaSubVariablesByDeviceId.Keys.ToList())
|
||||
{
|
||||
setupSubscriptionTasks.Add(Task.Run(() =>
|
||||
{
|
||||
@@ -481,29 +481,29 @@ namespace PMSWPF.Services
|
||||
|
||||
Subscription subscription = null;
|
||||
// 得到session
|
||||
if (!_sessionsDic.TryGetValue(device.OpcUaEndpointUrl, out var session))
|
||||
if (!_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out var session))
|
||||
{
|
||||
NlogHelper.Info($"从OpcUa会话字典中获取会话失败: {device.OpcUaEndpointUrl} ");
|
||||
return;
|
||||
}
|
||||
|
||||
// 判断设备是否已经添加了订阅
|
||||
if (_subscriptionsDic.TryGetValue(device.OpcUaEndpointUrl, out subscription))
|
||||
if (_opcUaSubscriptions.TryGetValue(device.OpcUaEndpointUrl, out subscription))
|
||||
{
|
||||
NlogHelper.Info($"OPC UA 终结点 {device.OpcUaEndpointUrl} 已存在订阅。");
|
||||
}
|
||||
else
|
||||
{
|
||||
subscription = new Subscription(session.DefaultSubscription);
|
||||
subscription.PublishingInterval = OpcUaSubscriptionPublishingIntervalMs; // 发布间隔(毫秒)
|
||||
subscription.PublishingInterval = _opcUaSubscriptionPublishingIntervalMs; // 发布间隔(毫秒)
|
||||
session.AddSubscription(subscription);
|
||||
subscription.Create();
|
||||
_subscriptionsDic.AddOrUpdate(device.OpcUaEndpointUrl, subscription,
|
||||
_opcUaSubscriptions.AddOrUpdate(device.OpcUaEndpointUrl, subscription,
|
||||
(key, oldValue) => subscription);
|
||||
}
|
||||
|
||||
// 将变量添加到订阅
|
||||
if (_subVariableDic.TryGetValue(deviceId, out var variablesToSubscribe))
|
||||
if (_opcUaSubVariablesByDeviceId.TryGetValue(deviceId, out var variablesToSubscribe))
|
||||
{
|
||||
foreach (VariableData variable in variablesToSubscribe)
|
||||
{
|
||||
@@ -512,7 +512,7 @@ namespace PMSWPF.Services
|
||||
monitoredItem.DisplayName = variable.Name;
|
||||
monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID
|
||||
monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性
|
||||
monitoredItem.SamplingInterval = OpcUaSubscriptionSamplingIntervalMs; // 采样间隔(毫秒)
|
||||
monitoredItem.SamplingInterval = _opcUaSubscriptionSamplingIntervalMs; // 采样间隔(毫秒)
|
||||
monitoredItem.QueueSize = 1; // 队列大小
|
||||
monitoredItem.DiscardOldest = true; // 丢弃最旧的数据
|
||||
// 注册数据变化通知事件。
|
||||
@@ -556,20 +556,20 @@ namespace PMSWPF.Services
|
||||
/// </summary>
|
||||
private async Task DisconnectAllOpcUaSessionsAsync()
|
||||
{
|
||||
if (_sessionsDic.IsEmpty)
|
||||
if (_opcUaSessions.IsEmpty)
|
||||
return;
|
||||
|
||||
NlogHelper.Info("正在断开所有 OPC UA 会话...");
|
||||
var closeTasks = new List<Task>();
|
||||
|
||||
foreach (var endpointUrl in _sessionsDic.Keys.ToList())
|
||||
foreach (var endpointUrl in _opcUaSessions.Keys.ToList())
|
||||
{
|
||||
closeTasks.Add(Task.Run(async () =>
|
||||
{
|
||||
NlogHelper.Info($"正在断开 OPC UA 会话: {endpointUrl}");
|
||||
if (_sessionsDic.TryRemove(endpointUrl, out var session))
|
||||
if (_opcUaSessions.TryRemove(endpointUrl, out var session))
|
||||
{
|
||||
if (_subscriptionsDic.TryRemove(endpointUrl, out var subscription))
|
||||
if (_opcUaSubscriptions.TryRemove(endpointUrl, out var subscription))
|
||||
{
|
||||
// 删除订阅。
|
||||
await subscription.DeleteAsync(true);
|
||||
|
||||
Reference in New Issue
Block a user