修复OpcUa轮询读取变量混乱问题
This commit is contained in:
@@ -179,7 +179,7 @@ public class VarDataRepository
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="variableData">VariableData实体</param>
|
/// <param name="variableData">VariableData实体</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task<bool> UpdateAsync(List<VariableData> variableDatas)
|
public async Task<int> UpdateAsync(List<VariableData> variableDatas)
|
||||||
{
|
{
|
||||||
Stopwatch stopwatch = new Stopwatch();
|
Stopwatch stopwatch = new Stopwatch();
|
||||||
stopwatch.Start();
|
stopwatch.Start();
|
||||||
@@ -198,14 +198,13 @@ public class VarDataRepository
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="variableData">VariableData实体</param>
|
/// <param name="variableData">VariableData实体</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task<bool> UpdateAsync(List<VariableData> variableDatas, SqlSugarClient db)
|
public async Task<int> UpdateAsync(List<VariableData> variableDatas, SqlSugarClient db)
|
||||||
{
|
{
|
||||||
Stopwatch stopwatch = new Stopwatch();
|
Stopwatch stopwatch = new Stopwatch();
|
||||||
stopwatch.Start();
|
stopwatch.Start();
|
||||||
|
|
||||||
var dbVarDatas = variableDatas.Select(vd => vd.CopyTo<DbVariableData>());
|
var dbVarDatas = variableDatas.Select(vd => vd.CopyTo<DbVariableData>());
|
||||||
var result = await db.UpdateNav(dbVarDatas.ToList())
|
var result = await db.Updateable<DbVariableData>(dbVarDatas.ToList())
|
||||||
.Include(d => d.Mqtts)
|
|
||||||
.ExecuteCommandAsync();
|
.ExecuteCommandAsync();
|
||||||
|
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
|
|||||||
@@ -140,7 +140,7 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
|||||||
await LoadMqtts();
|
await LoadMqtts();
|
||||||
break;
|
break;
|
||||||
case LoadTypes.Devices: // 仅加载设备数据
|
case LoadTypes.Devices: // 仅加载设备数据
|
||||||
// await LoadDevices();
|
await LoadDevices();
|
||||||
break;
|
break;
|
||||||
case LoadTypes.Menu: // 仅加载菜单数据
|
case LoadTypes.Menu: // 仅加载菜单数据
|
||||||
await LoadMenus();
|
await LoadMenus();
|
||||||
|
|||||||
@@ -69,8 +69,8 @@ namespace PMSWPF.Services
|
|||||||
// 存储 OPC UA 订阅,键为终结点 URL,值为订阅对象。
|
// 存储 OPC UA 订阅,键为终结点 URL,值为订阅对象。
|
||||||
private readonly Dictionary<string, Subscription> _opcUaSubscriptions;
|
private readonly Dictionary<string, Subscription> _opcUaSubscriptions;
|
||||||
|
|
||||||
// 存储活动的 OPC UA 变量,键为变量的唯一 ID。
|
// 存储活动的 OPC UA 变量,键为变量的OpcNodeId
|
||||||
private readonly Dictionary<int, VariableData> _opcUaVariables; // Key: VariableData.Id
|
private readonly Dictionary<string, VariableData> _opcUaNodeIdVariableDic; // Key: VariableData.Id
|
||||||
|
|
||||||
// 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
// 储存所有要轮询更新的变量,键是Device.Id,值是这个设备所有要轮询的变量
|
||||||
private readonly Dictionary<int, List<VariableData>> _opcUaPollVariableDic; // Key: VariableData.Id
|
private readonly Dictionary<int, List<VariableData>> _opcUaPollVariableDic; // Key: VariableData.Id
|
||||||
@@ -115,7 +115,7 @@ namespace PMSWPF.Services
|
|||||||
_dataServices = dataServices;
|
_dataServices = dataServices;
|
||||||
_opcUaSessions = new Dictionary<string, Session>();
|
_opcUaSessions = new Dictionary<string, Session>();
|
||||||
_opcUaSubscriptions = new Dictionary<string, Subscription>();
|
_opcUaSubscriptions = new Dictionary<string, Subscription>();
|
||||||
_opcUaVariables = new();
|
_opcUaNodeIdVariableDic = new();
|
||||||
_opcUaPollVariableDic = new();
|
_opcUaPollVariableDic = new();
|
||||||
_opcUaSubVariableDic = new();
|
_opcUaSubVariableDic = new();
|
||||||
}
|
}
|
||||||
@@ -152,7 +152,7 @@ namespace PMSWPF.Services
|
|||||||
|
|
||||||
protected async Task ExecuteAsync(CancellationToken stoppingToken)
|
protected async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
NlogHelper.Info("OpcUaBackgroundService started.");
|
NlogHelper.Info("OpcUa后台服务已启动。");
|
||||||
|
|
||||||
// 订阅变量数据变化事件,以便在变量配置发生变化时重新加载。
|
// 订阅变量数据变化事件,以便在变量配置发生变化时重新加载。
|
||||||
_dataServices.OnVariableDataChanged += HandleVariableDataChanged;
|
_dataServices.OnVariableDataChanged += HandleVariableDataChanged;
|
||||||
@@ -166,7 +166,7 @@ namespace PMSWPF.Services
|
|||||||
// // 可以在这里添加周期性任务,例如检查和重新连接断开的会话。
|
// // 可以在这里添加周期性任务,例如检查和重新连接断开的会话。
|
||||||
// await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
|
// await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
|
||||||
// }
|
// }
|
||||||
NlogHelper.Info("OpcUaBackgroundService stopping.");
|
NlogHelper.Info("OpcUa后台服务正在停止。");
|
||||||
// 服务停止时,取消订阅事件并断开所有 OPC UA 连接。
|
// 服务停止时,取消订阅事件并断开所有 OPC UA 连接。
|
||||||
_dataServices.OnVariableDataChanged -= HandleVariableDataChanged;
|
_dataServices.OnVariableDataChanged -= HandleVariableDataChanged;
|
||||||
await DisconnectAllOpcUaSessions();
|
await DisconnectAllOpcUaSessions();
|
||||||
@@ -177,34 +177,47 @@ namespace PMSWPF.Services
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private async Task LoadOpcUaVariables()
|
private async Task LoadOpcUaVariables()
|
||||||
{
|
{
|
||||||
NlogHelper.Info("正在加载 OPC UA 变量...");
|
Console.WriteLine("正在加载 OPC UA 变量...");
|
||||||
// var allVariables = await _dataServices.GetAllVariableDataAsync();
|
// var allVariables = await _dataServices.GetAllVariableDataAsync();
|
||||||
_opcUaDevices = _dataServices.Devices.Where(d => d.ProtocolType == ProtocolType.OpcUA && d.IsActive == true)
|
_opcUaDevices = _dataServices.Devices.Where(d => d.ProtocolType == ProtocolType.OpcUA && d.IsActive == true)
|
||||||
.ToList();
|
.ToList();
|
||||||
if (_opcUaDevices.Count == 0)
|
if (_opcUaDevices.Count == 0)
|
||||||
return;
|
return;
|
||||||
|
// 是否是初次加载
|
||||||
|
|
||||||
|
|
||||||
|
_opcUaPollVariableDic.Clear();
|
||||||
|
_opcUaSubVariableDic.Clear();
|
||||||
|
_opcUaNodeIdVariableDic.Clear();
|
||||||
foreach (var opcUaDevice in _opcUaDevices)
|
foreach (var opcUaDevice in _opcUaDevices)
|
||||||
{
|
{
|
||||||
//查找设备中所有要轮询的变量
|
//查找设备中所有要轮询的变量
|
||||||
var dPollList = opcUaDevice?.VariableTables?.SelectMany(vt => vt.DataVariables)
|
var dPollList = opcUaDevice?.VariableTables?.SelectMany(vt => vt.DataVariables)
|
||||||
.Where(vd => vd.IsActive == true && vd.ProtocolType == ProtocolType.OpcUA &&
|
.Where(vd => vd.IsActive == true &&
|
||||||
|
vd.ProtocolType == ProtocolType.OpcUA &&
|
||||||
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll)
|
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaPoll)
|
||||||
.ToList();
|
.ToList();
|
||||||
|
// 将变量保存到字典中,方便Read后还原
|
||||||
|
foreach (var variableData in dPollList)
|
||||||
|
{
|
||||||
|
_opcUaNodeIdVariableDic.Add(variableData.OpcUaNodeId, variableData);
|
||||||
|
}
|
||||||
|
|
||||||
_opcUaPollVariableDic.Add(opcUaDevice.Id, dPollList);
|
_opcUaPollVariableDic.Add(opcUaDevice.Id, dPollList);
|
||||||
|
Console.WriteLine("已更新OPC UA轮询变量字典");
|
||||||
//查找设备中所有要订阅的变量
|
//查找设备中所有要订阅的变量
|
||||||
var dSubList = opcUaDevice?.VariableTables?.SelectMany(vt => vt.DataVariables)
|
var dSubList = opcUaDevice?.VariableTables?.SelectMany(vt => vt.DataVariables)
|
||||||
.Where(vd => vd.IsActive == true && vd.ProtocolType == ProtocolType.OpcUA &&
|
.Where(vd => vd.IsActive == true &&
|
||||||
|
vd.ProtocolType == ProtocolType.OpcUA &&
|
||||||
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
|
vd.OpcUaUpdateType == OpcUaUpdateType.OpcUaSubscription)
|
||||||
.ToList();
|
.ToList();
|
||||||
_opcUaSubVariableDic.Add(opcUaDevice.Id, dSubList);
|
_opcUaSubVariableDic.Add(opcUaDevice.Id, dSubList);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_opcUaSubVariableDic.Count == 0 && _opcUaPollVariableDic.Count == 0)
|
|
||||||
return;
|
|
||||||
|
|
||||||
//连接服务器
|
//连接服务器
|
||||||
await ConnectOpcUaService();
|
await ConnectOpcUaService();
|
||||||
// 添加订阅变量
|
// // 添加订阅变量
|
||||||
SetupOpcUaSubscription();
|
SetupOpcUaSubscription();
|
||||||
|
|
||||||
await PollOpcUaVariable();
|
await PollOpcUaVariable();
|
||||||
@@ -223,7 +236,7 @@ namespace PMSWPF.Services
|
|||||||
// 检查是否已存在到该终结点的活动会话。
|
// 检查是否已存在到该终结点的活动会话。
|
||||||
if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected)
|
if (_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out session) && session.Connected)
|
||||||
{
|
{
|
||||||
NlogHelper.Info($"Already connected to OPC UA endpoint: {device.OpcUaEndpointUrl}");
|
NlogHelper.Info($"已连接到 OPC UA 终结点: {device.OpcUaEndpointUrl}");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -242,14 +255,14 @@ namespace PMSWPF.Services
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
foreach (var deviceId in _opcUaPollVariableDic.Keys)
|
foreach (var deviceId in _opcUaPollVariableDic.Keys.ToList())
|
||||||
{
|
{
|
||||||
await Task.Delay(100);
|
await Task.Delay(100,_cancellationTokenSource.Token);
|
||||||
var device = _dataServices.Devices.FirstOrDefault(d => d.Id == deviceId);
|
var device = _dataServices.Devices.FirstOrDefault(d => d.Id == deviceId);
|
||||||
if (device == null || device.OpcUaEndpointUrl == String.Empty)
|
if (device == null || device.OpcUaEndpointUrl == String.Empty)
|
||||||
{
|
{
|
||||||
NlogHelper.Warn(
|
NlogHelper.Warn(
|
||||||
$"OpcUa轮询变量,在DataService中没有找到Id为:{deviceId},的设备,或者服务器地址:{device.OpcUaEndpointUrl} 为空,请检查!!");
|
$"OpcUa轮询变量时,在DataService中未找到ID为 {deviceId} 的设备,或其服务器地址为空,请检查!");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -257,28 +270,30 @@ namespace PMSWPF.Services
|
|||||||
if (session == null || !session.Connected)
|
if (session == null || !session.Connected)
|
||||||
{
|
{
|
||||||
NlogHelper.Warn(
|
NlogHelper.Warn(
|
||||||
$"OPC UA session for {device.OpcUaEndpointUrl} is not connected. Attempting to reconnect...");
|
$"用于 {device.OpcUaEndpointUrl} 的 OPC UA 会话未连接。正在尝试重新连接...");
|
||||||
// 尝试重新连接会话
|
// 尝试重新连接会话
|
||||||
await ConnectOpcUaService();
|
await ConnectOpcUaService();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
var nodesToRead = new ReadValueIdCollection();
|
var nodesToRead = new ReadValueIdCollection();
|
||||||
var variableList = _opcUaPollVariableDic[deviceId];
|
if (!_opcUaPollVariableDic.TryGetValue(deviceId, out var variableList))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
foreach (var variable in variableList)
|
foreach (var variable in variableList)
|
||||||
{
|
{
|
||||||
// 获取变量的轮询间隔。
|
// 获取变量的轮询间隔。
|
||||||
if (!_pollingIntervals.TryGetValue(variable.PollLevelType, out var interval))
|
if (!_pollingIntervals.TryGetValue(variable.PollLevelType, out var interval))
|
||||||
{
|
{
|
||||||
NlogHelper.Info($"未知轮询级别 {variable.PollLevelType},跳过变量 {variable.Name}。");
|
NlogHelper.Info($"未知的轮询级别 {variable.PollLevelType},跳过变量 {variable.Name}。");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查是否达到轮询时间。
|
// 检查是否达到轮询时间。
|
||||||
if ((DateTime.Now - variable.UpdateTime) < interval)
|
if ((DateTime.Now - variable.UpdateTime) < interval)
|
||||||
{
|
|
||||||
continue; // 未到轮询时间,跳过。
|
continue; // 未到轮询时间,跳过。
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
nodesToRead.Add(new ReadValueId
|
nodesToRead.Add(new ReadValueId
|
||||||
{
|
{
|
||||||
@@ -286,11 +301,11 @@ namespace PMSWPF.Services
|
|||||||
AttributeId = Attributes.Value
|
AttributeId = Attributes.Value
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// 如果没有要读取的变量则跳过
|
// 如果没有要读取的变量则跳过
|
||||||
if (nodesToRead.Count == 0)
|
if (nodesToRead.Count == 0)
|
||||||
{
|
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
|
|
||||||
session.Read(
|
session.Read(
|
||||||
null,
|
null,
|
||||||
@@ -300,38 +315,45 @@ namespace PMSWPF.Services
|
|||||||
out DataValueCollection results,
|
out DataValueCollection results,
|
||||||
out DiagnosticInfoCollection diagnosticInfos);
|
out DiagnosticInfoCollection diagnosticInfos);
|
||||||
|
|
||||||
if (results != null && results.Count > 0)
|
if (results == null || results.Count == 0)
|
||||||
{
|
return;
|
||||||
|
Console.WriteLine($"结果循环数量----------------------:{results.Count}");
|
||||||
for (int i = 0; i < results.Count; i++)
|
for (int i = 0; i < results.Count; i++)
|
||||||
{
|
{
|
||||||
var value = results[i];
|
var value = results[i];
|
||||||
var variable = variableList[i];
|
var nodeId = nodesToRead[i]
|
||||||
if (StatusCode.IsGood(value.StatusCode))
|
.NodeId.ToString();
|
||||||
|
if (!_opcUaNodeIdVariableDic.TryGetValue(nodeId, out var variable))
|
||||||
{
|
{
|
||||||
NlogHelper.Info(
|
NlogHelper.Warn(
|
||||||
$"[OPC UA 轮询] {variable.Name}: {value.Value} | 时间戳: {value.SourceTimestamp.ToLocalTime()} | 状态: {value.StatusCode}");
|
$"在字典中未找到OpcUaNodeId为 {nodeId} 的变量对象!");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!StatusCode.IsGood(value.StatusCode))
|
||||||
|
{
|
||||||
|
NlogHelper.Warn(
|
||||||
|
$"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {value.StatusCode}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine($"轮询变量:{variable.Name}");
|
||||||
// 更新变量数据
|
// 更新变量数据
|
||||||
variable.DataValue = value.Value.ToString();
|
variable.DataValue = value.Value.ToString();
|
||||||
variable.DisplayValue = value.Value.ToString(); // 或者根据需要进行格式化
|
variable.DisplayValue = value.Value.ToString(); // 或者根据需要进行格式化
|
||||||
variable.UpdateTime = DateTime.Now;
|
variable.UpdateTime = DateTime.Now;
|
||||||
|
// Console.WriteLine($"结果变量跟更新时间:{variable.UpdateTime}");
|
||||||
// await _dataServices.UpdateVariableDataAsync(variable);
|
// await _dataServices.UpdateVariableDataAsync(variable);
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
NlogHelper.Warn(
|
|
||||||
$"Failed to read OPC UA variable {variable.Name} ({variable.OpcUaNodeId}): {value.StatusCode}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
NlogHelper.Error($"Error during OPC UA polling for {ex.Message}", ex);
|
NlogHelper.Error($"OPC UA 轮询期间发生错误: {ex.Message}", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
NlogHelper.Info($"Polling for OPC UA variable stopped.");
|
NlogHelper.Info($"OPC UA 变量轮询已停止。");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -392,7 +414,6 @@ namespace PMSWPF.Services
|
|||||||
// 关闭会话。
|
// 关闭会话。
|
||||||
session.Close();
|
session.Close();
|
||||||
_opcUaSessions.Remove(endpointUrl);
|
_opcUaSessions.Remove(endpointUrl);
|
||||||
NlogHelper.Info($"Disconnected from OPC UA server: {endpointUrl}");
|
|
||||||
NotificationHelper.ShowInfo($"已从 OPC UA 服务器断开连接: {endpointUrl}");
|
NotificationHelper.ShowInfo($"已从 OPC UA 服务器断开连接: {endpointUrl}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -417,7 +438,7 @@ namespace PMSWPF.Services
|
|||||||
/// <param name="variableDatas">变化的变量数据列表。</param>
|
/// <param name="variableDatas">变化的变量数据列表。</param>
|
||||||
private async void HandleVariableDataChanged(List<VariableData> variableDatas)
|
private async void HandleVariableDataChanged(List<VariableData> variableDatas)
|
||||||
{
|
{
|
||||||
NlogHelper.Info("Variable data changed. Reloading OPC UA variables.");
|
NlogHelper.Info("变量数据已更改。正在重新加载 OPC UA 变量。");
|
||||||
await LoadOpcUaVariables();
|
await LoadOpcUaVariables();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -440,7 +461,7 @@ namespace PMSWPF.Services
|
|||||||
// 判断设备是否已经添加了订阅
|
// 判断设备是否已经添加了订阅
|
||||||
if (_opcUaSubscriptions.TryGetValue(device.OpcUaEndpointUrl, out subscription))
|
if (_opcUaSubscriptions.TryGetValue(device.OpcUaEndpointUrl, out subscription))
|
||||||
{
|
{
|
||||||
NlogHelper.Info($"Already has subscription for OPC UA endpoint: {device.OpcUaEndpointUrl}");
|
NlogHelper.Info($"OPC UA 终结点 {device.OpcUaEndpointUrl} 已存在订阅。");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -469,7 +490,7 @@ namespace PMSWPF.Services
|
|||||||
subscription.ApplyChanges(); // 应用更改
|
subscription.ApplyChanges(); // 应用更改
|
||||||
}
|
}
|
||||||
|
|
||||||
NlogHelper.Info($"设备: {device.Name},添加订阅变量{_opcUaSubVariableDic[deviceId]?.Count ?? 0} 个");
|
NlogHelper.Info($"设备: {device.Name}, 添加了 {(_opcUaSubVariableDic[deviceId]?.Count ?? 0)} 个订阅变量。");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user