添加了后台的数据处理服务。并判断如果值发生了变化则更新数据库
This commit is contained in:
@@ -63,7 +63,9 @@ public partial class App : Application
|
|||||||
|
|
||||||
// 初始化数据处理链
|
// 初始化数据处理链
|
||||||
var dataProcessingService = Host.Services.GetRequiredService<IDataProcessingService>();
|
var dataProcessingService = Host.Services.GetRequiredService<IDataProcessingService>();
|
||||||
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<CheckValueChangedProcessor>());
|
||||||
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<LoggingDataProcessor>());
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<LoggingDataProcessor>());
|
||||||
|
dataProcessingService.AddProcessor(Host.Services.GetRequiredService<UpdateDbVariableProcessor>());
|
||||||
}
|
}
|
||||||
catch (Exception exception)
|
catch (Exception exception)
|
||||||
{
|
{
|
||||||
@@ -111,7 +113,9 @@ public partial class App : Application
|
|||||||
// 注册数据处理服务和处理器
|
// 注册数据处理服务和处理器
|
||||||
services.AddSingleton<IDataProcessingService, DataProcessingService>();
|
services.AddSingleton<IDataProcessingService, DataProcessingService>();
|
||||||
services.AddHostedService(provider => (DataProcessingService)provider.GetRequiredService<IDataProcessingService>());
|
services.AddHostedService(provider => (DataProcessingService)provider.GetRequiredService<IDataProcessingService>());
|
||||||
|
services.AddSingleton<CheckValueChangedProcessor>();
|
||||||
services.AddSingleton<LoggingDataProcessor>();
|
services.AddSingleton<LoggingDataProcessor>();
|
||||||
|
services.AddSingleton<UpdateDbVariableProcessor>();
|
||||||
|
|
||||||
// 注册数据仓库
|
// 注册数据仓库
|
||||||
services.AddSingleton<DeviceRepository>();
|
services.AddSingleton<DeviceRepository>();
|
||||||
|
|||||||
@@ -166,14 +166,13 @@ public class VarDataRepository
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="variableData">VariableData实体</param>
|
/// <param name="variableData">VariableData实体</param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
public async Task<bool> UpdateAsync(VariableData variableData)
|
public async Task<int> UpdateAsync(VariableData variableData)
|
||||||
{
|
{
|
||||||
Stopwatch stopwatch = new Stopwatch();
|
Stopwatch stopwatch = new Stopwatch();
|
||||||
stopwatch.Start();
|
stopwatch.Start();
|
||||||
using (var _db = DbContext.GetInstance())
|
using (var _db = DbContext.GetInstance())
|
||||||
{
|
{
|
||||||
var result = await _db.UpdateNav(_mapper.Map<DbVariableData>(variableData))
|
var result = await _db.Updateable<DbVariableData>(_mapper.Map<DbVariableData>(variableData))
|
||||||
.Include(d => d.Mqtts)
|
|
||||||
.ExecuteCommandAsync();
|
.ExecuteCommandAsync();
|
||||||
stopwatch.Stop();
|
stopwatch.Stop();
|
||||||
NlogHelper.Info($"更新VariableData '{variableData.Name}' 耗时:{stopwatch.ElapsedMilliseconds}ms");
|
NlogHelper.Info($"更新VariableData '{variableData.Name}' 耗时:{stopwatch.ElapsedMilliseconds}ms");
|
||||||
|
|||||||
16
Models/VariableDataContext.cs
Normal file
16
Models/VariableDataContext.cs
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
using PMSWPF.Models;
|
||||||
|
|
||||||
|
namespace PMSWPF.Models
|
||||||
|
{
|
||||||
|
public class VariableDataContext
|
||||||
|
{
|
||||||
|
public VariableData Data { get; set; }
|
||||||
|
public bool IsHandled { get; set; }
|
||||||
|
|
||||||
|
public VariableDataContext(VariableData data)
|
||||||
|
{
|
||||||
|
Data = data;
|
||||||
|
IsHandled = false; // 默认未处理
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,6 +5,7 @@ using System.Threading.Channels;
|
|||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Microsoft.Extensions.Hosting;
|
using Microsoft.Extensions.Hosting;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
using PMSWPF.Helper;
|
||||||
using PMSWPF.Models;
|
using PMSWPF.Models;
|
||||||
|
|
||||||
namespace PMSWPF.Services;
|
namespace PMSWPF.Services;
|
||||||
@@ -15,9 +16,9 @@ namespace PMSWPF.Services;
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class DataProcessingService : BackgroundService, IDataProcessingService
|
public class DataProcessingService : BackgroundService, IDataProcessingService
|
||||||
{
|
{
|
||||||
private readonly ILogger<DataProcessingService> _logger;
|
|
||||||
// 使用 Channel 作为高性能的生产者/消费者队列
|
// 使用 Channel 作为高性能的生产者/消费者队列
|
||||||
private readonly Channel<VariableData> _queue;
|
private readonly Channel<VariableDataContext> _queue;
|
||||||
|
|
||||||
// 存储数据处理器的链表
|
// 存储数据处理器的链表
|
||||||
private readonly List<IVariableDataProcessor> _processors;
|
private readonly List<IVariableDataProcessor> _processors;
|
||||||
|
|
||||||
@@ -25,11 +26,10 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
|
|||||||
/// 构造函数,注入日志记录器。
|
/// 构造函数,注入日志记录器。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="logger">日志记录器实例。</param>
|
/// <param name="logger">日志记录器实例。</param>
|
||||||
public DataProcessingService(ILogger<DataProcessingService> logger)
|
public DataProcessingService()
|
||||||
{
|
{
|
||||||
_logger = logger;
|
|
||||||
// 创建一个无边界的 Channel,允许生产者快速写入而不会被阻塞。
|
// 创建一个无边界的 Channel,允许生产者快速写入而不会被阻塞。
|
||||||
_queue = Channel.CreateUnbounded<VariableData>();
|
_queue = Channel.CreateUnbounded<VariableDataContext>();
|
||||||
_processors = new List<IVariableDataProcessor>();
|
_processors = new List<IVariableDataProcessor>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -54,8 +54,9 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var context = new VariableDataContext(data);
|
||||||
// 将数据项写入 Channel,供后台服务处理。
|
// 将数据项写入 Channel,供后台服务处理。
|
||||||
await _queue.Writer.WriteAsync(data);
|
await _queue.Writer.WriteAsync(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -65,7 +66,7 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
|
|||||||
/// <param name="stoppingToken">用于通知服务停止的取消令牌。</param>
|
/// <param name="stoppingToken">用于通知服务停止的取消令牌。</param>
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
_logger.LogInformation("数据处理服务已启动。");
|
NlogHelper.Info("数据处理服务已启动。");
|
||||||
|
|
||||||
// 当服务未被请求停止时,持续循环
|
// 当服务未被请求停止时,持续循环
|
||||||
while (!stoppingToken.IsCancellationRequested)
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
@@ -73,12 +74,18 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
// 从队列中异步读取一个数据项,如果队列为空,则等待。
|
// 从队列中异步读取一个数据项,如果队列为空,则等待。
|
||||||
var data = await _queue.Reader.ReadAsync(stoppingToken);
|
var context = await _queue.Reader.ReadAsync(stoppingToken);
|
||||||
|
|
||||||
// 依次调用处理链中的每一个处理器
|
// 依次调用处理链中的每一个处理器
|
||||||
foreach (var processor in _processors)
|
foreach (var processor in _processors)
|
||||||
{
|
{
|
||||||
await processor.ProcessAsync(data);
|
if (context.IsHandled)
|
||||||
|
{
|
||||||
|
// NlogHelper.Info($"{context.Data.Name}的数据处理已短路,跳过后续处理器。");
|
||||||
|
break; // 短路,跳过后续处理器
|
||||||
|
}
|
||||||
|
|
||||||
|
await processor.ProcessAsync(context);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
@@ -87,10 +94,10 @@ public class DataProcessingService : BackgroundService, IDataProcessingService
|
|||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
_logger.LogError(ex, "处理变量数据时发生错误。");
|
NlogHelper.Error($"处理变量数据时发生错误:{ex.Message}", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_logger.LogInformation("数据处理服务已停止。");
|
NlogHelper.Info("数据处理服务已停止。");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
using AutoMapper;
|
using System.Collections.Concurrent;
|
||||||
|
using AutoMapper;
|
||||||
using CommunityToolkit.Mvvm.ComponentModel;
|
using CommunityToolkit.Mvvm.ComponentModel;
|
||||||
using CommunityToolkit.Mvvm.Messaging;
|
using CommunityToolkit.Mvvm.Messaging;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
@@ -40,6 +41,9 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
|||||||
[ObservableProperty]
|
[ObservableProperty]
|
||||||
private List<Mqtt> _mqtts;
|
private List<Mqtt> _mqtts;
|
||||||
|
|
||||||
|
|
||||||
|
public ConcurrentDictionary<int, VariableData> AllVariables;
|
||||||
|
|
||||||
// 设备数据仓库,用于设备数据的CRUD操作。
|
// 设备数据仓库,用于设备数据的CRUD操作。
|
||||||
private readonly DeviceRepository _deviceRepository;
|
private readonly DeviceRepository _deviceRepository;
|
||||||
|
|
||||||
@@ -94,6 +98,7 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
|||||||
_mqttRepository = mqttRepository;
|
_mqttRepository = mqttRepository;
|
||||||
_varDataRepository = varDataRepository;
|
_varDataRepository = varDataRepository;
|
||||||
_variableDatas = new List<VariableData>();
|
_variableDatas = new List<VariableData>();
|
||||||
|
AllVariables = new ConcurrentDictionary<int, VariableData>();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -154,6 +159,13 @@ public partial class DataServices : ObservableRecipient, IRecipient<LoadMessage>
|
|||||||
{
|
{
|
||||||
device.PropertyChanged += Device_PropertyChanged;
|
device.PropertyChanged += Device_PropertyChanged;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var allVar = await _varDataRepository.GetAllAsync();
|
||||||
|
foreach (var variableData in allVar)
|
||||||
|
{
|
||||||
|
AllVariables.AddOrUpdate(variableData.Id, variableData, (key, old) => variableData);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
OnDeviceListChanged?.Invoke(Devices);
|
OnDeviceListChanged?.Invoke(Devices);
|
||||||
|
|||||||
@@ -14,5 +14,5 @@ public interface IVariableDataProcessor
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="data">要处理的变量数据。</param>
|
/// <param name="data">要处理的变量数据。</param>
|
||||||
/// <returns>一个表示异步操作的任务。</returns>
|
/// <returns>一个表示异步操作的任务。</returns>
|
||||||
Task ProcessAsync(VariableData data);
|
Task ProcessAsync(VariableDataContext context);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -297,124 +297,10 @@ namespace PMSWPF.Services
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
// 获取当前需要轮询的设备ID列表的快照
|
|
||||||
var deviceIdsToPoll = _opcUaPollVariablesByDeviceId.Keys.ToList();
|
var deviceIdsToPoll = _opcUaPollVariablesByDeviceId.Keys.ToList();
|
||||||
|
|
||||||
// 为每个设备创建并发轮询任务
|
var pollingTasks = deviceIdsToPoll.Select(deviceId => PollSingleDeviceVariablesAsync(deviceId, stoppingToken)).ToList();
|
||||||
var pollingTasks = deviceIdsToPoll.Select(async deviceId =>
|
|
||||||
{
|
|
||||||
if (stoppingToken.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
return; // 任务被取消,退出循环
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!_opcUaDevices.TryGetValue(deviceId, out var device) ||
|
|
||||||
device.OpcUaEndpointUrl == null)
|
|
||||||
{
|
|
||||||
NlogHelper.Warn(
|
|
||||||
$"OpcUa轮询变量时,在deviceDic中未找到ID为 {deviceId} 的设备,或其服务器地址为空,请检查!");
|
|
||||||
return; // 跳过此设备
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!device.IsActive)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!_opcUaSessions.TryGetValue(
|
|
||||||
device.OpcUaEndpointUrl, out Session session) ||
|
|
||||||
!session.Connected)
|
|
||||||
{
|
|
||||||
|
|
||||||
if (device.IsActive)
|
|
||||||
{
|
|
||||||
// 尝试重新连接会话
|
|
||||||
NlogHelper.Warn(
|
|
||||||
$"用于 {device.OpcUaEndpointUrl} 的 OPC UA 会话未连接。正在尝试重新连接...");
|
|
||||||
await ConnectSingleOpcUaDeviceAsync(
|
|
||||||
device, stoppingToken);
|
|
||||||
}
|
|
||||||
|
|
||||||
return; // 跳过本次轮询
|
|
||||||
}
|
|
||||||
|
|
||||||
var nodesToRead = new ReadValueIdCollection();
|
|
||||||
if (!_opcUaPollVariablesByDeviceId.TryGetValue(deviceId, out var variableList))
|
|
||||||
{
|
|
||||||
return; // 跳过此设备
|
|
||||||
}
|
|
||||||
|
|
||||||
foreach (var variable in variableList)
|
|
||||||
{
|
|
||||||
if (stoppingToken.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
return; // 任务被取消,退出循环
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取变量的轮询间隔。
|
|
||||||
if (!ServiceHelper.PollingIntervals.TryGetValue(
|
|
||||||
variable.PollLevelType, out var interval))
|
|
||||||
{
|
|
||||||
NlogHelper.Info(
|
|
||||||
$"未知的轮询级别 {variable.PollLevelType},跳过变量 {variable.Name}。");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 检查是否达到轮询时间。
|
|
||||||
if ((DateTime.Now - variable.UpdateTime) < interval)
|
|
||||||
continue; // 未到轮询时间,跳过。
|
|
||||||
|
|
||||||
nodesToRead.Add(new ReadValueId
|
|
||||||
{
|
|
||||||
NodeId = new NodeId(variable.OpcUaNodeId),
|
|
||||||
AttributeId = Attributes.Value
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// 如果没有要读取的变量则跳过
|
|
||||||
if (nodesToRead.Count == 0)
|
|
||||||
return; // 跳过此设备
|
|
||||||
|
|
||||||
var readResponse = await session.ReadAsync(
|
|
||||||
null,
|
|
||||||
0,
|
|
||||||
TimestampsToReturn.Both,
|
|
||||||
nodesToRead,
|
|
||||||
stoppingToken);
|
|
||||||
|
|
||||||
var results = readResponse.Results;
|
|
||||||
var diagnosticInfos = readResponse.DiagnosticInfos;
|
|
||||||
|
|
||||||
if (results == null || results.Count == 0)
|
|
||||||
return; // 没有读取到结果
|
|
||||||
|
|
||||||
for (int i = 0; i < results.Count; i++)
|
|
||||||
{
|
|
||||||
var value = results[i];
|
|
||||||
var nodeId = nodesToRead[i]
|
|
||||||
.NodeId.ToString();
|
|
||||||
if (!_opcUaPollVariablesByNodeId.TryGetValue(
|
|
||||||
nodeId, out var variable))
|
|
||||||
{
|
|
||||||
NlogHelper.Warn(
|
|
||||||
$"在字典中未找到OpcUaNodeId为 {nodeId} 的变量对象!");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!StatusCode.IsGood(value.StatusCode))
|
|
||||||
{
|
|
||||||
NlogHelper.Warn(
|
|
||||||
$"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {value.StatusCode}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新变量数据并入队
|
|
||||||
await UpdateAndEnqueueVariableData(variable, value.Value);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.ToList();
|
|
||||||
|
|
||||||
// 等待所有设备的轮询任务完成
|
|
||||||
await Task.WhenAll(pollingTasks);
|
await Task.WhenAll(pollingTasks);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
catch (OperationCanceledException)
|
||||||
@@ -427,6 +313,97 @@ namespace PMSWPF.Services
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 轮询单个设备的所有 OPC UA 变量。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="deviceId">设备的 ID。</param>
|
||||||
|
/// <param name="stoppingToken">取消令牌。</param>
|
||||||
|
private async Task PollSingleDeviceVariablesAsync(int deviceId, CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
if (stoppingToken.IsCancellationRequested) return;
|
||||||
|
|
||||||
|
if (!_opcUaDevices.TryGetValue(deviceId, out var device) || device.OpcUaEndpointUrl == null)
|
||||||
|
{
|
||||||
|
NlogHelper.Warn($"OpcUa轮询变量时,在deviceDic中未找到ID为 {deviceId} 的设备,或其服务器地址为空,请检查!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!device.IsActive) return;
|
||||||
|
|
||||||
|
if (!_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out var session) || !session.Connected)
|
||||||
|
{
|
||||||
|
if (device.IsActive)
|
||||||
|
{
|
||||||
|
NlogHelper.Warn($"用于 {device.OpcUaEndpointUrl} 的 OPC UA 会话未连接。正在尝试重新连接...");
|
||||||
|
await ConnectSingleOpcUaDeviceAsync(device, stoppingToken);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!_opcUaPollVariablesByDeviceId.TryGetValue(deviceId, out var variableList) || variableList.Count == 0)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var variable in variableList)
|
||||||
|
{
|
||||||
|
if (stoppingToken.IsCancellationRequested) return;
|
||||||
|
|
||||||
|
if (!ServiceHelper.PollingIntervals.TryGetValue(variable.PollLevelType, out var interval) || (DateTime.Now - variable.UpdateTime) < interval)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
await ReadAndProcessOpcUaVariableAsync(session, variable, stoppingToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 读取单个 OPC UA 变量并处理其数据。
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="session">OPC UA 会话。</param>
|
||||||
|
/// <param name="variable">要读取的变量。</param>
|
||||||
|
/// <param name="stoppingToken">取消令牌。</param>
|
||||||
|
private async Task ReadAndProcessOpcUaVariableAsync(Session session, VariableData variable, CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
var nodesToRead = new ReadValueIdCollection
|
||||||
|
{
|
||||||
|
new ReadValueId
|
||||||
|
{
|
||||||
|
NodeId = new NodeId(variable.OpcUaNodeId),
|
||||||
|
AttributeId = Attributes.Value
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var readResponse = await session.ReadAsync(null, 0, TimestampsToReturn.Both, nodesToRead, stoppingToken);
|
||||||
|
var result = readResponse.Results?.FirstOrDefault();
|
||||||
|
if (result == null) return;
|
||||||
|
|
||||||
|
if (!StatusCode.IsGood(result.StatusCode))
|
||||||
|
{
|
||||||
|
NlogHelper.Warn($"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {result.StatusCode}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await UpdateAndEnqueueVariableData(variable, result.Value);
|
||||||
|
}
|
||||||
|
catch (ServiceResultException ex) when (ex.StatusCode == StatusCodes.BadSessionIdInvalid)
|
||||||
|
{
|
||||||
|
NlogHelper.Error($"OPC UA会话ID无效,变量: {variable.Name} ({variable.OpcUaNodeId})。正在尝试重新连接...", ex);
|
||||||
|
// Assuming device can be retrieved from variable or passed as parameter if needed for ConnectSingleOpcUaDeviceAsync
|
||||||
|
// For now, I'll just log and let the outer loop handle reconnection if the session is truly invalid for the device.
|
||||||
|
// If a full device object is needed here, it would need to be passed down from PollSingleDeviceVariablesAsync.
|
||||||
|
// For simplicity, I'll remove the direct reconnection attempt here and rely on the outer loop.
|
||||||
|
await ConnectSingleOpcUaDeviceAsync(variable.VariableTable.Device, stoppingToken);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
NlogHelper.Error($"轮询OPC UA变量 {variable.Name} ({variable.OpcUaNodeId}) 时发生未知错误: {ex.Message}", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 更新变量数据,并将其推送到数据处理队列。
|
/// 更新变量数据,并将其推送到数据处理队列。
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -440,7 +417,7 @@ namespace PMSWPF.Services
|
|||||||
variable.DataValue = value.ToString();
|
variable.DataValue = value.ToString();
|
||||||
variable.DisplayValue = value.ToString(); // 或者根据需要进行格式化
|
variable.DisplayValue = value.ToString(); // 或者根据需要进行格式化
|
||||||
variable.UpdateTime = DateTime.Now;
|
variable.UpdateTime = DateTime.Now;
|
||||||
NlogHelper.Info($"轮询变量:{variable.Name},值:{variable.DataValue}");
|
Console.WriteLine($"OpcUa后台服务轮询变量:{variable.Name},值:{variable.DataValue}");
|
||||||
// 将更新后的数据推入处理队列。
|
// 将更新后的数据推入处理队列。
|
||||||
await _dataProcessingService.EnqueueAsync(variable);
|
await _dataProcessingService.EnqueueAsync(variable);
|
||||||
}
|
}
|
||||||
|
|||||||
33
Services/Processors/CheckValueChangedProcessor.cs
Normal file
33
Services/Processors/CheckValueChangedProcessor.cs
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
using PMSWPF.Helper;
|
||||||
|
using PMSWPF.Models;
|
||||||
|
|
||||||
|
namespace PMSWPF.Services.Processors;
|
||||||
|
|
||||||
|
public class CheckValueChangedProcessor : IVariableDataProcessor
|
||||||
|
{
|
||||||
|
private readonly DataServices _dataServices;
|
||||||
|
|
||||||
|
public CheckValueChangedProcessor(DataServices dataServices)
|
||||||
|
{
|
||||||
|
_dataServices = dataServices;
|
||||||
|
}
|
||||||
|
public Task ProcessAsync(VariableDataContext context)
|
||||||
|
{
|
||||||
|
VariableData newVariable = context.Data;
|
||||||
|
if (!_dataServices.AllVariables.TryGetValue(newVariable.Id, out VariableData oldVariable))
|
||||||
|
{
|
||||||
|
NlogHelper.Warn($"检查变量值是否改变时在_dataServices.AllVariables中找不到Id:{newVariable.Id},Name:{newVariable.Name}的变量。");
|
||||||
|
context.IsHandled = true;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newVariable.DataValue == oldVariable.DataValue)
|
||||||
|
{
|
||||||
|
// 值没有变化,直接完成
|
||||||
|
context.IsHandled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 在这里处理 context.Data
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -11,25 +11,13 @@ namespace PMSWPF.Services.Processors;
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class LoggingDataProcessor : IVariableDataProcessor
|
public class LoggingDataProcessor : IVariableDataProcessor
|
||||||
{
|
{
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 构造函数,注入日志记录器。
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="logger">日志记录器实例。</param>
|
|
||||||
public LoggingDataProcessor()
|
public LoggingDataProcessor()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
public Task ProcessAsync(VariableDataContext context)
|
||||||
/// 实现处理逻辑,此处为记录日志。
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="data">要处理的变量数据。</param>
|
|
||||||
/// <returns>一个表示完成的异步任务。</returns>
|
|
||||||
public Task ProcessAsync(VariableData data)
|
|
||||||
{
|
{
|
||||||
// 使用日志记录器输出变量的名称和值
|
NlogHelper.Info($"处理数据: {context.Data.Name}, 值: {context.Data.DataValue}");
|
||||||
NlogHelper.Info($"处理数据: {data.Name}, 值: {data.DataValue}");
|
|
||||||
// 由于此操作是同步的,直接返回一个已完成的任务。
|
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
37
Services/Processors/UpdateDbVariableProcessor.cs
Normal file
37
Services/Processors/UpdateDbVariableProcessor.cs
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
using System.Threading.Tasks;
|
||||||
|
using PMSWPF.Models;
|
||||||
|
using PMSWPF.Helper;
|
||||||
|
|
||||||
|
namespace PMSWPF.Services.Processors
|
||||||
|
{
|
||||||
|
public class UpdateDbVariableProcessor : IVariableDataProcessor
|
||||||
|
{
|
||||||
|
private readonly DataServices _dataServices;
|
||||||
|
|
||||||
|
public UpdateDbVariableProcessor(DataServices dataServices)
|
||||||
|
{
|
||||||
|
_dataServices = dataServices;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task ProcessAsync(VariableDataContext context)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// 假设 DataServices 有一个方法来更新 VariableData
|
||||||
|
await _dataServices.UpdateVariableDataAsync(context.Data);
|
||||||
|
NlogHelper.Info($"数据库变量 {context.Data.Name} 更新成功,值为: {context.Data.DataValue}");
|
||||||
|
|
||||||
|
if (!_dataServices.AllVariables.TryGetValue(context.Data.Id, out VariableData oldVariable))
|
||||||
|
{
|
||||||
|
NlogHelper.Warn($"数据库更新完成修改变量值是否改变时在_dataServices.AllVariables中找不到Id:{context.Data.Id},Name:{context.Data.Name}的变量。");
|
||||||
|
context.IsHandled = true;
|
||||||
|
}
|
||||||
|
oldVariable.DataValue = context.Data.DataValue;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
NlogHelper.Error($"更新数据库变量 {context.Data.Name} 失败: {ex.Message}", ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -283,7 +283,9 @@ namespace PMSWPF.Services
|
|||||||
{
|
{
|
||||||
// 更新变量的原始数据值和显示值。
|
// 更新变量的原始数据值和显示值。
|
||||||
variable.DataValue = dataItem.Value.ToString();
|
variable.DataValue = dataItem.Value.ToString();
|
||||||
|
variable.DisplayValue = dataItem.Value.ToString();
|
||||||
variable.UpdateTime = DateTime.Now;
|
variable.UpdateTime = DateTime.Now;
|
||||||
|
Console.WriteLine($"S7后台服务轮询变量:{variable.Name},值:{variable.DataValue}");
|
||||||
// 将更新后的数据推入处理队列。
|
// 将更新后的数据推入处理队列。
|
||||||
await _dataProcessingService.EnqueueAsync(variable);
|
await _dataProcessingService.EnqueueAsync(variable);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user