修复后台更新前台不显示的问题,重新梳理了代码
This commit is contained in:
@@ -32,20 +32,20 @@ namespace PMSWPF.Services
|
||||
/// 3. **变量加载与管理 (`LoadOpcUaVariables`)**:
|
||||
/// - 从 `DataServices` 获取所有变量数据。
|
||||
/// - 清理本地缓存中已不存在或不再活跃的变量,并断开相应的 OPC UA 连接。
|
||||
/// - 遍历所有活动的 OPC UA 变量,为新增的变量调用 `ConnectAndSubscribeOpcUa` 方法建立连接和订阅。
|
||||
/// - 遍历所有活动的 OPC UA 变量,为新增的变量调用 `ConnectOpcUaService` 方法建立连接和订阅。
|
||||
/// - 如果变量已存在但会话断开,则尝试重新连接。
|
||||
///
|
||||
/// 4. **连接与订阅 (`ConnectAndSubscribeOpcUa`)**:
|
||||
/// 4. **连接与订阅 (`ConnectOpcUaService`)**:
|
||||
/// - 检查变量是否包含有效的终结点 URL 和节点 ID。
|
||||
/// - 如果到目标终结点的会话已存在且已连接,则复用该会话。
|
||||
/// - 否则,创建一个新的 OPC UA `Session`,并将其缓存到 `_opcUaSessions` 字典中。
|
||||
/// - 创建一个 `Subscription`,并将其缓存到 `_opcUaSubscriptions` 字典中。
|
||||
/// - 创建一个 `MonitoredItem` 来监控指定变量节点的值变化,并将其添加到订阅中。
|
||||
/// - 注册 `OnNotification` 事件回调,用于处理接收到的数据更新。
|
||||
/// - 注册 `OnSubNotification` 事件回调,用于处理接收到的数据更新。
|
||||
///
|
||||
/// 5. **数据接收 (`OnNotification`)**:
|
||||
/// 5. **数据接收 (`OnSubNotification`)**:
|
||||
/// - 当订阅的变量值发生变化时,OPC UA 服务器会发送通知。
|
||||
/// - `OnNotification` 方法被触发,处理接收到的数据。
|
||||
/// - `OnSubNotification` 方法被触发,处理接收到的数据。
|
||||
///
|
||||
/// 6. **动态更新 (`HandleVariableDataChanged`)**:
|
||||
/// - 当 `DataServices` 中的变量数据发生增、删、改时,会触发 `OnVariableDataChanged` 事件。
|
||||
@@ -70,15 +70,41 @@ namespace PMSWPF.Services
|
||||
private readonly Dictionary<string, Subscription> _opcUaSubscriptions;
|
||||
|
||||
// 存储活动的 OPC UA 变量,键为变量的唯一 ID。
|
||||
private readonly Dictionary<int, VariableData> _opcUaVariables; // Key: VariableData.Id
|
||||
private readonly Dictionary<int, VariableData> _opcUaVariables; // Key: VariableData.Id
|
||||
|
||||
// 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
||||
private readonly Dictionary<int, List<VariableData>> _opcUaPollVariableDic; // Key: VariableData.Id
|
||||
|
||||
// 储存所有要订阅更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
||||
private readonly Dictionary<int, List<VariableData>> _opcUaSubVariableDic;
|
||||
|
||||
// private readonly Dictionary<int, CancellationTokenSource>
|
||||
// _opcUaPollingTasks; // Key: VariableData.Id, Value: CancellationTokenSource for polling task
|
||||
|
||||
private List<Device> _opcUaDevices;
|
||||
private List<VariableData> _opcUaAllVariableDatas;
|
||||
private List<VariableData> _OpcUaPollVarDataList;
|
||||
private List<VariableData> _OpcUaSubscriptionVarDataList;
|
||||
|
||||
// 定义不同轮询级别的间隔时间。
|
||||
private readonly Dictionary<PollLevelType, TimeSpan> _pollingIntervals = new Dictionary<PollLevelType, TimeSpan>
|
||||
{
|
||||
{ PollLevelType.TenMilliseconds, TimeSpan.FromMilliseconds((int)PollLevelType.TenMilliseconds) },
|
||||
{
|
||||
PollLevelType.HundredMilliseconds, TimeSpan.FromMilliseconds((int)PollLevelType.HundredMilliseconds)
|
||||
},
|
||||
{
|
||||
PollLevelType.FiveHundredMilliseconds,
|
||||
TimeSpan.FromMilliseconds((int)PollLevelType.FiveHundredMilliseconds)
|
||||
},
|
||||
{ PollLevelType.OneSecond, TimeSpan.FromMilliseconds((int)PollLevelType.OneSecond) },
|
||||
{ PollLevelType.FiveSeconds, TimeSpan.FromMilliseconds((int)PollLevelType.FiveSeconds) },
|
||||
{ PollLevelType.TenSeconds, TimeSpan.FromMilliseconds((int)PollLevelType.TenSeconds) },
|
||||
{ PollLevelType.TwentySeconds, TimeSpan.FromMilliseconds((int)PollLevelType.TwentySeconds) },
|
||||
{ PollLevelType.ThirtySeconds, TimeSpan.FromMilliseconds((int)PollLevelType.ThirtySeconds) },
|
||||
{ PollLevelType.OneMinute, TimeSpan.FromMilliseconds((int)PollLevelType.OneMinute) },
|
||||
{ PollLevelType.ThreeMinutes, TimeSpan.FromMilliseconds((int)PollLevelType.ThreeMinutes) },
|
||||
{ PollLevelType.FiveMinutes, TimeSpan.FromMilliseconds((int)PollLevelType.FiveMinutes) },
|
||||
{ PollLevelType.TenMinutes, TimeSpan.FromMilliseconds((int)PollLevelType.TenMinutes) },
|
||||
{ PollLevelType.ThirtyMinutes, TimeSpan.FromMilliseconds((int)PollLevelType.ThirtyMinutes) }
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// OpcUaBackgroundService 的构造函数。
|
||||
@@ -89,9 +115,9 @@ namespace PMSWPF.Services
|
||||
_dataServices = dataServices;
|
||||
_opcUaSessions = new Dictionary<string, Session>();
|
||||
_opcUaSubscriptions = new Dictionary<string, Subscription>();
|
||||
_opcUaVariables = new ();
|
||||
// _opcUaPollingTasks = new Dictionary<int, CancellationTokenSource>();
|
||||
_opcUaAllVariableDatas = new List<VariableData>();
|
||||
_opcUaVariables = new();
|
||||
_opcUaPollVariableDic = new();
|
||||
_opcUaSubVariableDic = new();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -117,7 +143,6 @@ namespace PMSWPF.Services
|
||||
try
|
||||
{
|
||||
_cancellationTokenSource.Cancel();
|
||||
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -160,24 +185,29 @@ namespace PMSWPF.Services
|
||||
return;
|
||||
foreach (var opcUaDevice in _opcUaDevices)
|
||||
{
|
||||
opcUaDevice.VariableTables.SelectMany(vt => vt.DataVariables).Where(vd => vd.IsActive == true && vd.ProtocolType == ProtocolType.OpcUA && vd.OpcUaUpdateType==OpcUaUpdateType.OpcUaPoll).ToList();
|
||||
//查找设备中所有要轮询的变量
|
||||
var dPollList = opcUaDevice?.VariableTables?.SelectMany(vt => vt.DataVariables)
|
||||
.Where(vd => vd.IsActive == true && vd.ProtocolType == ProtocolType.OpcUA &&
|
||||
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll)
|
||||
.ToList();
|
||||
_opcUaPollVariableDic.Add(opcUaDevice.Id, dPollList);
|
||||
//查找设备中所有要订阅的变量
|
||||
var dSubList = opcUaDevice?.VariableTables?.SelectMany(vt => vt.DataVariables)
|
||||
.Where(vd => vd.IsActive == true && vd.ProtocolType == ProtocolType.OpcUA &&
|
||||
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
|
||||
.ToList();
|
||||
_opcUaSubVariableDic.Add(opcUaDevice.Id, dSubList);
|
||||
}
|
||||
_OpcUaPollVarDataList= _opcUaDevices.SelectMany(o => o.VariableTables)
|
||||
.SelectMany(v => v.DataVariables)
|
||||
.Where(vd => vd.IsActive == true && vd.ProtocolType == ProtocolType.OpcUA && vd.OpcUaUpdateType==OpcUaUpdateType.OpcUaPoll).ToList();
|
||||
_OpcUaSubscriptionVarDataList= _opcUaDevices.SelectMany(o => o.VariableTables)
|
||||
.SelectMany(v => v.DataVariables)
|
||||
.Where(vd => vd.IsActive == true && vd.ProtocolType == ProtocolType.OpcUA && vd.OpcUaUpdateType==OpcUaUpdateType.OpcUaPoll).ToList();
|
||||
|
||||
|
||||
if (_opcUaAllVariableDatas == null || _opcUaAllVariableDatas.Count == 0)
|
||||
if (_opcUaSubVariableDic.Count == 0 && _opcUaPollVariableDic.Count == 0)
|
||||
return;
|
||||
|
||||
// 清理不再活跃或已删除的变量。
|
||||
// await RemoveInactiveOpcUaVariables(_opcUaAllVariableDatas);
|
||||
|
||||
// 处理新增或更新的变量。
|
||||
await ProcessActiveOpcUaVariables(_opcUaAllVariableDatas);
|
||||
//连接服务器
|
||||
await ConnectOpcUaService();
|
||||
// 添加订阅变量
|
||||
SetupOpcUaSubscription();
|
||||
|
||||
await PollOpcUaVariable();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -185,60 +215,83 @@ namespace PMSWPF.Services
|
||||
/// </summary>
|
||||
/// <param name="variable">要订阅或轮询的变量信息。</param>
|
||||
/// <param name="device">变量所属的设备信息。</param>
|
||||
private async Task ConnectAndSubscribeOpcUa(VariableData variable, Device device)
|
||||
private async Task ConnectOpcUaService()
|
||||
{
|
||||
NlogHelper.Info($"正在为变量 '{variable.Name}' 连接和处理 OPC UA 服务器... (更新类型: {variable.OpcUaUpdateType})");
|
||||
if (string.IsNullOrEmpty(device.OpcUaEndpointUrl) || string.IsNullOrEmpty(variable.OpcUaNodeId))
|
||||
foreach (Device device in _opcUaDevices)
|
||||
{
|
||||
NlogHelper.Warn($"OPC UA variable {variable.Name} has invalid EndpointUrl or NodeId.");
|
||||
return;
|
||||
}
|
||||
Session session = null;
|
||||
// 检查是否已存在到该终结点的活动会话。
|
||||
if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected)
|
||||
{
|
||||
NlogHelper.Info($"Already connected to OPC UA endpoint: {device.OpcUaEndpointUrl}");
|
||||
}
|
||||
else
|
||||
{
|
||||
session = await OpcUaServiceHelper.CreateOpcUaSessionAsync(device.OpcUaEndpointUrl);
|
||||
if (session == null)
|
||||
return; // 连接失败,直接返回
|
||||
|
||||
variable.DisplayValue = "4";
|
||||
|
||||
Session session = null;
|
||||
// 检查是否已存在到该终结点的活动会话。
|
||||
if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected)
|
||||
{
|
||||
NlogHelper.Info($"Already connected to OPC UA endpoint: {device.OpcUaEndpointUrl}");
|
||||
}
|
||||
else
|
||||
{
|
||||
session = await OpcUaServiceHelper.CreateOpcUaSessionAsync(device.OpcUaEndpointUrl);
|
||||
if (session == null)
|
||||
return; // 连接失败,直接返回
|
||||
|
||||
_opcUaSessions[device.OpcUaEndpointUrl] = session;
|
||||
}
|
||||
|
||||
if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
|
||||
{
|
||||
SetupOpcUaSubscription(session, variable, device.OpcUaEndpointUrl);
|
||||
}
|
||||
else if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll)
|
||||
{
|
||||
variable.DisplayValue = "5";
|
||||
StartOpcUaPolling(session, variable, device);
|
||||
_opcUaSessions[device.OpcUaEndpointUrl] = session;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task PollOpcUaVariable(Session session, VariableData variable, Device device,
|
||||
CancellationToken cancellationToken)
|
||||
private async Task PollOpcUaVariable()
|
||||
{
|
||||
variable.DisplayValue = "7";
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
while (!_cancellationTokenSource.Token.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (session != null && session.Connected)
|
||||
foreach (var deviceId in _opcUaPollVariableDic.Keys)
|
||||
{
|
||||
var nodeToRead = new ReadValueId
|
||||
{
|
||||
NodeId = new NodeId(variable.OpcUaNodeId),
|
||||
AttributeId = Attributes.Value
|
||||
};
|
||||
await Task.Delay(100);
|
||||
var device = _dataServices.Devices.FirstOrDefault(d => d.Id == deviceId);
|
||||
if (device == null || device.OpcUaEndpointUrl == String.Empty)
|
||||
{
|
||||
NlogHelper.Warn(
|
||||
$"OpcUa轮询变量,在DataService中没有找到Id为:{deviceId},的设备,或者服务器地址:{device.OpcUaEndpointUrl} 为空,请检查!!");
|
||||
continue;
|
||||
}
|
||||
|
||||
_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out Session session);
|
||||
if (session == null || !session.Connected)
|
||||
{
|
||||
NlogHelper.Warn(
|
||||
$"OPC UA session for {device.OpcUaEndpointUrl} is not connected. Attempting to reconnect...");
|
||||
// 尝试重新连接会话
|
||||
await ConnectOpcUaService();
|
||||
continue;
|
||||
}
|
||||
|
||||
var nodesToRead = new ReadValueIdCollection();
|
||||
var variableList = _opcUaPollVariableDic[deviceId];
|
||||
foreach (var variable in variableList)
|
||||
{
|
||||
// 获取变量的轮询间隔。
|
||||
if (!_pollingIntervals.TryGetValue(variable.PollLevelType, out var interval))
|
||||
{
|
||||
NlogHelper.Info($"未知轮询级别 {variable.PollLevelType},跳过变量 {variable.Name}。");
|
||||
continue;
|
||||
}
|
||||
// 检查是否达到轮询时间。
|
||||
if ((DateTime.Now - variable.UpdateTime) < interval)
|
||||
{
|
||||
continue; // 未到轮询时间,跳过。
|
||||
}
|
||||
|
||||
|
||||
nodesToRead.Add(new ReadValueId
|
||||
{
|
||||
NodeId = new NodeId(variable.OpcUaNodeId),
|
||||
AttributeId = Attributes.Value
|
||||
});
|
||||
}
|
||||
// 如果没有要读取的变量则跳过
|
||||
if (nodesToRead.Count == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var nodesToRead = new ReadValueIdCollection { nodeToRead };
|
||||
session.Read(
|
||||
null,
|
||||
0,
|
||||
@@ -249,45 +302,44 @@ namespace PMSWPF.Services
|
||||
|
||||
if (results != null && results.Count > 0)
|
||||
{
|
||||
var value = results[0];
|
||||
if (StatusCode.IsGood(value.StatusCode))
|
||||
for (int i = 0; i < results.Count; i++)
|
||||
{
|
||||
NlogHelper.Info(
|
||||
var value = results[i];
|
||||
var variable = variableList[i];
|
||||
if (StatusCode.IsGood(value.StatusCode))
|
||||
{
|
||||
NlogHelper.Info(
|
||||
$"[OPC UA 轮询] {variable.Name}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
|
||||
// 更新变量数据
|
||||
variable.DataValue = value.Value.ToString();
|
||||
variable.DisplayValue = value.Value.ToString(); // 或者根据需要进行格式化
|
||||
await _dataServices.UpdateVariableDataAsync(variable);
|
||||
}
|
||||
else
|
||||
{
|
||||
NlogHelper.Warn(
|
||||
$"Failed to read OPC UA variable {variable.Name} ({variable.OpcUaNodeId}): {value.StatusCode}");
|
||||
// 更新变量数据
|
||||
variable.DataValue = value.Value.ToString();
|
||||
variable.DisplayValue = value.Value.ToString(); // 或者根据需要进行格式化
|
||||
variable.UpdateTime=DateTime.Now;
|
||||
// await _dataServices.UpdateVariableDataAsync(variable);
|
||||
}
|
||||
else
|
||||
{
|
||||
NlogHelper.Warn(
|
||||
$"Failed to read OPC UA variable {variable.Name} ({variable.OpcUaNodeId}): {value.StatusCode}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
NlogHelper.Warn(
|
||||
$"OPC UA session for {device.OpcUaEndpointUrl} is not connected. Attempting to reconnect...");
|
||||
// 尝试重新连接会话
|
||||
await ConnectAndSubscribeOpcUa(variable, device);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
NlogHelper.Error($"Error during OPC UA polling for {variable.Name}: {ex.Message}", ex);
|
||||
NlogHelper.Error($"Error during OPC UA polling for {ex.Message}", ex);
|
||||
}
|
||||
|
||||
// 根据 PollLevelType 设置轮询间隔
|
||||
var pollInterval = GetPollInterval(variable.PollLevelType);
|
||||
await Task.Delay(pollInterval, cancellationToken);
|
||||
}
|
||||
|
||||
NlogHelper.Info($"Polling for OPC UA variable {variable.Name} stopped.");
|
||||
NlogHelper.Info($"Polling for OPC UA variable stopped.");
|
||||
}
|
||||
|
||||
private void OnNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
|
||||
/// <summary>
|
||||
/// 订阅变量变化的通知
|
||||
/// </summary>
|
||||
/// <param name="monitoredItem"></param>
|
||||
/// <param name="e"></param>
|
||||
private void OnSubNotification(MonitoredItem monitoredItem, MonitoredItemNotificationEventArgs e)
|
||||
{
|
||||
foreach (var value in monitoredItem.DequeueValues())
|
||||
{
|
||||
@@ -336,29 +388,6 @@ namespace PMSWPF.Services
|
||||
_opcUaSubscriptions.Remove(endpointUrl);
|
||||
}
|
||||
|
||||
// 取消与此会话相关的轮询任务
|
||||
// var variablesToCancelPolling = _opcUaVariables.Where(kv => kv.Value.VariableTable != null &&
|
||||
// kv.Value.VariableTable.DeviceId.HasValue &&
|
||||
// _dataServices
|
||||
// .GetDeviceByIdAsync(
|
||||
// kv.Value.VariableTable.DeviceId
|
||||
// .Value)
|
||||
// .Result?.OpcUaEndpointUrl ==
|
||||
// endpointUrl &&
|
||||
// kv.Value.OpcUaUpdateType ==
|
||||
// OpcUaUpdateType.OpcUaPoll)
|
||||
// .ToList();
|
||||
// foreach (var entry in variablesToCancelPolling)
|
||||
// {
|
||||
// if (_opcUaPollingTasks.ContainsKey(entry.Key))
|
||||
// {
|
||||
// var cts = _opcUaPollingTasks[entry.Key];
|
||||
// _opcUaPollingTasks.Remove(entry.Key);
|
||||
// cts.Cancel();
|
||||
// cts.Dispose();
|
||||
// NlogHelper.Info($"Cancelled polling for variable: {entry.Value.Name} (ID: {entry.Key})");
|
||||
// }
|
||||
// }
|
||||
|
||||
// 关闭会话。
|
||||
session.Close();
|
||||
@@ -392,146 +421,55 @@ namespace PMSWPF.Services
|
||||
await LoadOpcUaVariables();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 设置 OPC UA 订阅并添加监控项。
|
||||
/// </summary>
|
||||
/// <param name="session">OPC UA 会话。</param>
|
||||
/// <param name="variable">要订阅的变量信息。</param>
|
||||
/// <param name="endpointUrl">OPC UA 服务器的终结点 URL。</param>
|
||||
private void SetupOpcUaSubscription(Session session, VariableData variable, string endpointUrl)
|
||||
private void SetupOpcUaSubscription()
|
||||
{
|
||||
Subscription subscription = null;
|
||||
if (_opcUaSubscriptions.TryGetValue(endpointUrl, out subscription))
|
||||
foreach (var deviceId in _opcUaSubVariableDic.Keys)
|
||||
{
|
||||
NlogHelper.Info($"Already has subscription for OPC UA endpoint: {endpointUrl}");
|
||||
}
|
||||
else
|
||||
{
|
||||
subscription = new Subscription(session.DefaultSubscription);
|
||||
subscription.PublishingInterval = 1000; // 发布间隔(毫秒)
|
||||
session.AddSubscription(subscription);
|
||||
subscription.Create();
|
||||
_opcUaSubscriptions[endpointUrl] = subscription;
|
||||
}
|
||||
var device = _dataServices.Devices.FirstOrDefault(d => d.Id == deviceId);
|
||||
Subscription subscription = null;
|
||||
// 得到session
|
||||
_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out var session);
|
||||
|
||||
// 7. 创建监控项并添加到订阅中。
|
||||
MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem);
|
||||
monitoredItem.DisplayName = variable.Name;
|
||||
monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID
|
||||
monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性
|
||||
monitoredItem.SamplingInterval = 1000; // 采样间隔(毫秒)
|
||||
monitoredItem.QueueSize = 1; // 队列大小
|
||||
monitoredItem.DiscardOldest = true; // 丢弃最旧的数据
|
||||
// 注册数据变化通知事件。
|
||||
monitoredItem.Notification += OnNotification;
|
||||
|
||||
subscription.AddItem(monitoredItem);
|
||||
subscription.ApplyChanges(); // 应用更改
|
||||
NlogHelper.Info($"Subscribed to OPC UA variable: {variable.Name} ({variable.OpcUaNodeId})");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 启动 OPC UA 变量的轮询任务。
|
||||
/// </summary>
|
||||
/// <param name="session">OPC UA 会话。</param>
|
||||
/// <param name="variable">要轮询的变量信息。</param>
|
||||
/// <param name="device">变量所属的设备信息。</param>
|
||||
private void StartOpcUaPolling(Session session, VariableData variable, Device device)
|
||||
{
|
||||
variable.DisplayValue = "5.5";
|
||||
var id = variable.Id;
|
||||
// if (!_opcUaPollingTasks.ContainsKey(id))
|
||||
// {
|
||||
variable.DisplayValue = "6";
|
||||
// _opcUaPollingTasks.Add(variable.Id, _cancellationTokenSource);
|
||||
_ = Task.Run(() => PollOpcUaVariable(session, variable, device,_cancellationTokenSource.Token), _cancellationTokenSource.Token);
|
||||
NlogHelper.Info($"Started polling for OPC UA variable: {variable.Name} ({variable.OpcUaNodeId})");
|
||||
// }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 移除不再活跃或已删除的 OPC UA 变量。
|
||||
/// </summary>
|
||||
/// <param name="activeOpcUaVariables">当前所有活跃的 OPC UA 变量列表。</param>
|
||||
private async Task RemoveInactiveOpcUaVariables(List<VariableData> activeOpcUaVariables)
|
||||
{
|
||||
var currentOpcUaVariableIds = activeOpcUaVariables.Select(v => v.Id)
|
||||
.ToHashSet();
|
||||
var variablesToRemove = _opcUaVariables.Keys.Except(currentOpcUaVariableIds)
|
||||
.ToList();
|
||||
|
||||
NlogHelper.Info($"发现 {variablesToRemove.Count} 个要移除的 OPC UA 变量。");
|
||||
|
||||
foreach (var id in variablesToRemove)
|
||||
{
|
||||
if (_opcUaVariables.TryGetValue(id, out var variable))
|
||||
// 判断设备是否已经添加了订阅
|
||||
if (_opcUaSubscriptions.TryGetValue(device.OpcUaEndpointUrl, out subscription))
|
||||
{
|
||||
if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
|
||||
{
|
||||
// 获取关联的设备信息
|
||||
var device = await _dataServices.GetDeviceByIdAsync(variable.VariableTable.DeviceId ?? 0);
|
||||
if (device != null)
|
||||
{
|
||||
// 断开与该变量相关的 OPC UA 会话。
|
||||
await DisconnectOpcUaSession(device.OpcUaEndpointUrl);
|
||||
}
|
||||
}
|
||||
else if (variable.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll)
|
||||
{
|
||||
// if (_opcUaPollingTasks.ContainsKey(variable.Id))
|
||||
// {
|
||||
// var cts = _opcUaPollingTasks[variable.Id];
|
||||
// _opcUaPollingTasks.Remove(variable.Id);
|
||||
// cts.Cancel();
|
||||
// cts.Dispose();
|
||||
// }
|
||||
}
|
||||
|
||||
_opcUaVariables.Remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 处理新增或更新的活跃 OPC UA 变量。
|
||||
/// </summary>
|
||||
/// <param name="opcUaVariables">当前所有活跃的 OPC UA 变量列表。</param>
|
||||
private async Task ProcessActiveOpcUaVariables(List<VariableData> opcUaVariables)
|
||||
{
|
||||
foreach (var variable in opcUaVariables)
|
||||
{
|
||||
|
||||
// 获取关联的设备信息
|
||||
var device = await _dataServices.GetDeviceByIdAsync(variable.VariableTable.DeviceId ?? 0);
|
||||
if (device == null)
|
||||
{
|
||||
NlogHelper.Warn($"变量 '{variable.Name}' (ID: {variable.Id}) 关联的设备不存在。");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
variable.DisplayValue = "2";
|
||||
if (!_opcUaVariables.ContainsKey(variable.Id))
|
||||
{
|
||||
variable.DisplayValue = "3";
|
||||
// 如果是新变量,则添加到字典并建立连接和订阅。
|
||||
_opcUaVariables.Add( variable.Id,variable);
|
||||
//
|
||||
await ConnectAndSubscribeOpcUa(variable, device);
|
||||
NlogHelper.Info($"Already has subscription for OPC UA endpoint: {device.OpcUaEndpointUrl}");
|
||||
}
|
||||
else
|
||||
{
|
||||
Console.WriteLine($"_opcUaVariables已经包含对象:{variable.Name}");
|
||||
// // 如果变量已存在,则更新其信息。
|
||||
_opcUaVariables[variable.Id] = variable;
|
||||
// 如果终结点 URL 对应的会话已断开,则尝试重新连接。
|
||||
if (_opcUaSessions.ContainsKey(device.OpcUaEndpointUrl) &&
|
||||
!_opcUaSessions[device.OpcUaEndpointUrl].Connected)
|
||||
{
|
||||
await ConnectAndSubscribeOpcUa(variable, device);
|
||||
}
|
||||
subscription = new Subscription(session.DefaultSubscription);
|
||||
subscription.PublishingInterval = 1000; // 发布间隔(毫秒)
|
||||
session.AddSubscription(subscription);
|
||||
subscription.Create();
|
||||
_opcUaSubscriptions[device.OpcUaEndpointUrl] = subscription;
|
||||
}
|
||||
|
||||
// 将变量添加到订阅
|
||||
foreach (VariableData variable in _opcUaSubVariableDic[deviceId])
|
||||
{
|
||||
// 7. 创建监控项并添加到订阅中。
|
||||
MonitoredItem monitoredItem = new MonitoredItem(subscription.DefaultItem);
|
||||
monitoredItem.DisplayName = variable.Name;
|
||||
monitoredItem.StartNodeId = new NodeId(variable.OpcUaNodeId); // 设置要监控的节点 ID
|
||||
monitoredItem.AttributeId = Attributes.Value; // 监控节点的值属性
|
||||
monitoredItem.SamplingInterval = 1000; // 采样间隔(毫秒)
|
||||
monitoredItem.QueueSize = 1; // 队列大小
|
||||
monitoredItem.DiscardOldest = true; // 丢弃最旧的数据
|
||||
// 注册数据变化通知事件。
|
||||
monitoredItem.Notification += OnSubNotification;
|
||||
|
||||
subscription.AddItem(monitoredItem);
|
||||
subscription.ApplyChanges(); // 应用更改
|
||||
}
|
||||
|
||||
NlogHelper.Info($"设备: {device.Name},添加订阅变量{_opcUaSubVariableDic[deviceId]?.Count ?? 0} 个");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user