diff --git a/App.xaml.cs b/App.xaml.cs index e8a3b6d..780a93d 100644 --- a/App.xaml.cs +++ b/App.xaml.cs @@ -63,7 +63,9 @@ public partial class App : Application // 初始化数据处理链 var dataProcessingService = Host.Services.GetRequiredService(); + dataProcessingService.AddProcessor(Host.Services.GetRequiredService()); dataProcessingService.AddProcessor(Host.Services.GetRequiredService()); + dataProcessingService.AddProcessor(Host.Services.GetRequiredService()); } catch (Exception exception) { @@ -111,7 +113,9 @@ public partial class App : Application // 注册数据处理服务和处理器 services.AddSingleton(); services.AddHostedService(provider => (DataProcessingService)provider.GetRequiredService()); + services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); // 注册数据仓库 services.AddSingleton(); diff --git a/Data/Repositories/VarDataRepository.cs b/Data/Repositories/VarDataRepository.cs index b888928..69c3914 100644 --- a/Data/Repositories/VarDataRepository.cs +++ b/Data/Repositories/VarDataRepository.cs @@ -166,14 +166,13 @@ public class VarDataRepository /// /// VariableData实体 /// - public async Task UpdateAsync(VariableData variableData) + public async Task UpdateAsync(VariableData variableData) { Stopwatch stopwatch = new Stopwatch(); stopwatch.Start(); using (var _db = DbContext.GetInstance()) { - var result = await _db.UpdateNav(_mapper.Map(variableData)) - .Include(d => d.Mqtts) + var result = await _db.Updateable(_mapper.Map(variableData)) .ExecuteCommandAsync(); stopwatch.Stop(); NlogHelper.Info($"更新VariableData '{variableData.Name}' 耗时:{stopwatch.ElapsedMilliseconds}ms"); diff --git a/Models/VariableDataContext.cs b/Models/VariableDataContext.cs new file mode 100644 index 0000000..6c1ca0f --- /dev/null +++ b/Models/VariableDataContext.cs @@ -0,0 +1,16 @@ +using PMSWPF.Models; + +namespace PMSWPF.Models +{ + public class VariableDataContext + { + public VariableData Data { get; set; } + public bool IsHandled { get; set; } + + public VariableDataContext(VariableData data) + { + Data = data; + IsHandled = false; // 默认未处理 + } + } +} \ No newline at end of file diff --git a/Services/DataProcessingService.cs b/Services/DataProcessingService.cs index ed8a603..9e61ef3 100644 --- a/Services/DataProcessingService.cs +++ b/Services/DataProcessingService.cs @@ -5,6 +5,7 @@ using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using PMSWPF.Helper; using PMSWPF.Models; namespace PMSWPF.Services; @@ -15,9 +16,9 @@ namespace PMSWPF.Services; /// public class DataProcessingService : BackgroundService, IDataProcessingService { - private readonly ILogger _logger; // 使用 Channel 作为高性能的生产者/消费者队列 - private readonly Channel _queue; + private readonly Channel _queue; + // 存储数据处理器的链表 private readonly List _processors; @@ -25,11 +26,10 @@ public class DataProcessingService : BackgroundService, IDataProcessingService /// 构造函数,注入日志记录器。 /// /// 日志记录器实例。 - public DataProcessingService(ILogger logger) + public DataProcessingService() { - _logger = logger; // 创建一个无边界的 Channel,允许生产者快速写入而不会被阻塞。 - _queue = Channel.CreateUnbounded(); + _queue = Channel.CreateUnbounded(); _processors = new List(); } @@ -54,8 +54,9 @@ public class DataProcessingService : BackgroundService, IDataProcessingService return; } + var context = new VariableDataContext(data); // 将数据项写入 Channel,供后台服务处理。 - await _queue.Writer.WriteAsync(data); + await _queue.Writer.WriteAsync(context); } /// @@ -65,7 +66,7 @@ public class DataProcessingService : BackgroundService, IDataProcessingService /// 用于通知服务停止的取消令牌。 protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - _logger.LogInformation("数据处理服务已启动。"); + NlogHelper.Info("数据处理服务已启动。"); // 当服务未被请求停止时,持续循环 while (!stoppingToken.IsCancellationRequested) @@ -73,12 +74,18 @@ public class DataProcessingService : BackgroundService, IDataProcessingService try { // 从队列中异步读取一个数据项,如果队列为空,则等待。 - var data = await _queue.Reader.ReadAsync(stoppingToken); + var context = await _queue.Reader.ReadAsync(stoppingToken); // 依次调用处理链中的每一个处理器 foreach (var processor in _processors) { - await processor.ProcessAsync(data); + if (context.IsHandled) + { + // NlogHelper.Info($"{context.Data.Name}的数据处理已短路,跳过后续处理器。"); + break; // 短路,跳过后续处理器 + } + + await processor.ProcessAsync(context); } } catch (OperationCanceledException) @@ -87,10 +94,10 @@ public class DataProcessingService : BackgroundService, IDataProcessingService } catch (Exception ex) { - _logger.LogError(ex, "处理变量数据时发生错误。"); + NlogHelper.Error($"处理变量数据时发生错误:{ex.Message}", ex); } } - _logger.LogInformation("数据处理服务已停止。"); + NlogHelper.Info("数据处理服务已停止。"); } -} +} \ No newline at end of file diff --git a/Services/DataServices.cs b/Services/DataServices.cs index ff47940..c736036 100644 --- a/Services/DataServices.cs +++ b/Services/DataServices.cs @@ -1,4 +1,5 @@ -using AutoMapper; +using System.Collections.Concurrent; +using AutoMapper; using CommunityToolkit.Mvvm.ComponentModel; using CommunityToolkit.Mvvm.Messaging; using Microsoft.Extensions.DependencyInjection; @@ -40,6 +41,9 @@ public partial class DataServices : ObservableRecipient, IRecipient [ObservableProperty] private List _mqtts; + + public ConcurrentDictionary AllVariables; + // 设备数据仓库,用于设备数据的CRUD操作。 private readonly DeviceRepository _deviceRepository; @@ -94,6 +98,7 @@ public partial class DataServices : ObservableRecipient, IRecipient _mqttRepository = mqttRepository; _varDataRepository = varDataRepository; _variableDatas = new List(); + AllVariables = new ConcurrentDictionary(); } /// @@ -154,6 +159,13 @@ public partial class DataServices : ObservableRecipient, IRecipient { device.PropertyChanged += Device_PropertyChanged; } + + var allVar = await _varDataRepository.GetAllAsync(); + foreach (var variableData in allVar) + { + AllVariables.AddOrUpdate(variableData.Id, variableData, (key, old) => variableData); + } + } OnDeviceListChanged?.Invoke(Devices); diff --git a/Services/IVariableDataProcessor.cs b/Services/IVariableDataProcessor.cs index 6517de1..d47ebfd 100644 --- a/Services/IVariableDataProcessor.cs +++ b/Services/IVariableDataProcessor.cs @@ -14,5 +14,5 @@ public interface IVariableDataProcessor /// /// 要处理的变量数据。 /// 一个表示异步操作的任务。 - Task ProcessAsync(VariableData data); + Task ProcessAsync(VariableDataContext context); } diff --git a/Services/OpcUaBackgroundService.cs b/Services/OpcUaBackgroundService.cs index 8f6ec40..921bf67 100644 --- a/Services/OpcUaBackgroundService.cs +++ b/Services/OpcUaBackgroundService.cs @@ -297,124 +297,10 @@ namespace PMSWPF.Services { try { - // 获取当前需要轮询的设备ID列表的快照 var deviceIdsToPoll = _opcUaPollVariablesByDeviceId.Keys.ToList(); - // 为每个设备创建并发轮询任务 - var pollingTasks = deviceIdsToPoll.Select(async deviceId => - { - if (stoppingToken.IsCancellationRequested) - { - return; // 任务被取消,退出循环 - } + var pollingTasks = deviceIdsToPoll.Select(deviceId => PollSingleDeviceVariablesAsync(deviceId, stoppingToken)).ToList(); - if (!_opcUaDevices.TryGetValue(deviceId, out var device) || - device.OpcUaEndpointUrl == null) - { - NlogHelper.Warn( - $"OpcUa轮询变量时,在deviceDic中未找到ID为 {deviceId} 的设备,或其服务器地址为空,请检查!"); - return; // 跳过此设备 - } - - if (!device.IsActive) - { - return; - } - - if (!_opcUaSessions.TryGetValue( - device.OpcUaEndpointUrl, out Session session) || - !session.Connected) - { - - if (device.IsActive) - { - // 尝试重新连接会话 - NlogHelper.Warn( - $"用于 {device.OpcUaEndpointUrl} 的 OPC UA 会话未连接。正在尝试重新连接..."); - await ConnectSingleOpcUaDeviceAsync( - device, stoppingToken); - } - - return; // 跳过本次轮询 - } - - var nodesToRead = new ReadValueIdCollection(); - if (!_opcUaPollVariablesByDeviceId.TryGetValue(deviceId, out var variableList)) - { - return; // 跳过此设备 - } - - foreach (var variable in variableList) - { - if (stoppingToken.IsCancellationRequested) - { - return; // 任务被取消,退出循环 - } - - // 获取变量的轮询间隔。 - if (!ServiceHelper.PollingIntervals.TryGetValue( - variable.PollLevelType, out var interval)) - { - NlogHelper.Info( - $"未知的轮询级别 {variable.PollLevelType},跳过变量 {variable.Name}。"); - continue; - } - - // 检查是否达到轮询时间。 - if ((DateTime.Now - variable.UpdateTime) < interval) - continue; // 未到轮询时间,跳过。 - - nodesToRead.Add(new ReadValueId - { - NodeId = new NodeId(variable.OpcUaNodeId), - AttributeId = Attributes.Value - }); - } - - // 如果没有要读取的变量则跳过 - if (nodesToRead.Count == 0) - return; // 跳过此设备 - - var readResponse = await session.ReadAsync( - null, - 0, - TimestampsToReturn.Both, - nodesToRead, - stoppingToken); - - var results = readResponse.Results; - var diagnosticInfos = readResponse.DiagnosticInfos; - - if (results == null || results.Count == 0) - return; // 没有读取到结果 - - for (int i = 0; i < results.Count; i++) - { - var value = results[i]; - var nodeId = nodesToRead[i] - .NodeId.ToString(); - if (!_opcUaPollVariablesByNodeId.TryGetValue( - nodeId, out var variable)) - { - NlogHelper.Warn( - $"在字典中未找到OpcUaNodeId为 {nodeId} 的变量对象!"); - continue; - } - - if (!StatusCode.IsGood(value.StatusCode)) - { - NlogHelper.Warn( - $"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {value.StatusCode}"); - continue; - } - - // 更新变量数据并入队 - await UpdateAndEnqueueVariableData(variable, value.Value); - } - }) - .ToList(); - - // 等待所有设备的轮询任务完成 await Task.WhenAll(pollingTasks); } catch (OperationCanceledException) @@ -427,6 +313,97 @@ namespace PMSWPF.Services } } + /// + /// 轮询单个设备的所有 OPC UA 变量。 + /// + /// 设备的 ID。 + /// 取消令牌。 + private async Task PollSingleDeviceVariablesAsync(int deviceId, CancellationToken stoppingToken) + { + if (stoppingToken.IsCancellationRequested) return; + + if (!_opcUaDevices.TryGetValue(deviceId, out var device) || device.OpcUaEndpointUrl == null) + { + NlogHelper.Warn($"OpcUa轮询变量时,在deviceDic中未找到ID为 {deviceId} 的设备,或其服务器地址为空,请检查!"); + return; + } + + if (!device.IsActive) return; + + if (!_opcUaSessions.TryGetValue(device.OpcUaEndpointUrl, out var session) || !session.Connected) + { + if (device.IsActive) + { + NlogHelper.Warn($"用于 {device.OpcUaEndpointUrl} 的 OPC UA 会话未连接。正在尝试重新连接..."); + await ConnectSingleOpcUaDeviceAsync(device, stoppingToken); + } + return; + } + + if (!_opcUaPollVariablesByDeviceId.TryGetValue(deviceId, out var variableList) || variableList.Count == 0) + { + return; + } + + foreach (var variable in variableList) + { + if (stoppingToken.IsCancellationRequested) return; + + if (!ServiceHelper.PollingIntervals.TryGetValue(variable.PollLevelType, out var interval) || (DateTime.Now - variable.UpdateTime) < interval) + { + continue; + } + + await ReadAndProcessOpcUaVariableAsync(session, variable, stoppingToken); + } + } + + /// + /// 读取单个 OPC UA 变量并处理其数据。 + /// + /// OPC UA 会话。 + /// 要读取的变量。 + /// 取消令牌。 + private async Task ReadAndProcessOpcUaVariableAsync(Session session, VariableData variable, CancellationToken stoppingToken) + { + var nodesToRead = new ReadValueIdCollection + { + new ReadValueId + { + NodeId = new NodeId(variable.OpcUaNodeId), + AttributeId = Attributes.Value + } + }; + + try + { + var readResponse = await session.ReadAsync(null, 0, TimestampsToReturn.Both, nodesToRead, stoppingToken); + var result = readResponse.Results?.FirstOrDefault(); + if (result == null) return; + + if (!StatusCode.IsGood(result.StatusCode)) + { + NlogHelper.Warn($"读取 OPC UA 变量 {variable.Name} ({variable.OpcUaNodeId}) 失败: {result.StatusCode}"); + return; + } + + await UpdateAndEnqueueVariableData(variable, result.Value); + } + catch (ServiceResultException ex) when (ex.StatusCode == StatusCodes.BadSessionIdInvalid) + { + NlogHelper.Error($"OPC UA会话ID无效,变量: {variable.Name} ({variable.OpcUaNodeId})。正在尝试重新连接...", ex); + // Assuming device can be retrieved from variable or passed as parameter if needed for ConnectSingleOpcUaDeviceAsync + // For now, I'll just log and let the outer loop handle reconnection if the session is truly invalid for the device. + // If a full device object is needed here, it would need to be passed down from PollSingleDeviceVariablesAsync. + // For simplicity, I'll remove the direct reconnection attempt here and rely on the outer loop. + await ConnectSingleOpcUaDeviceAsync(variable.VariableTable.Device, stoppingToken); + } + catch (Exception ex) + { + NlogHelper.Error($"轮询OPC UA变量 {variable.Name} ({variable.OpcUaNodeId}) 时发生未知错误: {ex.Message}", ex); + } + } + /// /// 更新变量数据,并将其推送到数据处理队列。 /// @@ -440,7 +417,7 @@ namespace PMSWPF.Services variable.DataValue = value.ToString(); variable.DisplayValue = value.ToString(); // 或者根据需要进行格式化 variable.UpdateTime = DateTime.Now; - NlogHelper.Info($"轮询变量:{variable.Name},值:{variable.DataValue}"); + Console.WriteLine($"OpcUa后台服务轮询变量:{variable.Name},值:{variable.DataValue}"); // 将更新后的数据推入处理队列。 await _dataProcessingService.EnqueueAsync(variable); } diff --git a/Services/Processors/CheckValueChangedProcessor.cs b/Services/Processors/CheckValueChangedProcessor.cs new file mode 100644 index 0000000..f2f9192 --- /dev/null +++ b/Services/Processors/CheckValueChangedProcessor.cs @@ -0,0 +1,33 @@ +using PMSWPF.Helper; +using PMSWPF.Models; + +namespace PMSWPF.Services.Processors; + +public class CheckValueChangedProcessor : IVariableDataProcessor +{ + private readonly DataServices _dataServices; + + public CheckValueChangedProcessor(DataServices dataServices) + { + _dataServices = dataServices; + } + public Task ProcessAsync(VariableDataContext context) + { + VariableData newVariable = context.Data; + if (!_dataServices.AllVariables.TryGetValue(newVariable.Id, out VariableData oldVariable)) + { + NlogHelper.Warn($"检查变量值是否改变时在_dataServices.AllVariables中找不到Id:{newVariable.Id},Name:{newVariable.Name}的变量。"); + context.IsHandled = true; + return Task.CompletedTask; + } + + if (newVariable.DataValue == oldVariable.DataValue) + { + // 值没有变化,直接完成 + context.IsHandled = true; + } + + // 在这里处理 context.Data + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/Services/Processors/LoggingDataProcessor.cs b/Services/Processors/LoggingDataProcessor.cs index 747ef49..f8c26d3 100644 --- a/Services/Processors/LoggingDataProcessor.cs +++ b/Services/Processors/LoggingDataProcessor.cs @@ -11,25 +11,13 @@ namespace PMSWPF.Services.Processors; /// public class LoggingDataProcessor : IVariableDataProcessor { - - /// - /// 构造函数,注入日志记录器。 - /// - /// 日志记录器实例。 public LoggingDataProcessor() { } - /// - /// 实现处理逻辑,此处为记录日志。 - /// - /// 要处理的变量数据。 - /// 一个表示完成的异步任务。 - public Task ProcessAsync(VariableData data) + public Task ProcessAsync(VariableDataContext context) { - // 使用日志记录器输出变量的名称和值 - NlogHelper.Info($"处理数据: {data.Name}, 值: {data.DataValue}"); - // 由于此操作是同步的,直接返回一个已完成的任务。 + NlogHelper.Info($"处理数据: {context.Data.Name}, 值: {context.Data.DataValue}"); return Task.CompletedTask; } } diff --git a/Services/Processors/UpdateDbVariableProcessor.cs b/Services/Processors/UpdateDbVariableProcessor.cs new file mode 100644 index 0000000..b962cd6 --- /dev/null +++ b/Services/Processors/UpdateDbVariableProcessor.cs @@ -0,0 +1,37 @@ +using System.Threading.Tasks; +using PMSWPF.Models; +using PMSWPF.Helper; + +namespace PMSWPF.Services.Processors +{ + public class UpdateDbVariableProcessor : IVariableDataProcessor + { + private readonly DataServices _dataServices; + + public UpdateDbVariableProcessor(DataServices dataServices) + { + _dataServices = dataServices; + } + + public async Task ProcessAsync(VariableDataContext context) + { + try + { + // 假设 DataServices 有一个方法来更新 VariableData + await _dataServices.UpdateVariableDataAsync(context.Data); + NlogHelper.Info($"数据库变量 {context.Data.Name} 更新成功,值为: {context.Data.DataValue}"); + + if (!_dataServices.AllVariables.TryGetValue(context.Data.Id, out VariableData oldVariable)) + { + NlogHelper.Warn($"数据库更新完成修改变量值是否改变时在_dataServices.AllVariables中找不到Id:{context.Data.Id},Name:{context.Data.Name}的变量。"); + context.IsHandled = true; + } + oldVariable.DataValue = context.Data.DataValue; + } + catch (Exception ex) + { + NlogHelper.Error($"更新数据库变量 {context.Data.Name} 失败: {ex.Message}", ex); + } + } + } +} \ No newline at end of file diff --git a/Services/S7BackgroundService.cs b/Services/S7BackgroundService.cs index 5a2a0fc..caf49ba 100644 --- a/Services/S7BackgroundService.cs +++ b/Services/S7BackgroundService.cs @@ -283,7 +283,9 @@ namespace PMSWPF.Services { // 更新变量的原始数据值和显示值。 variable.DataValue = dataItem.Value.ToString(); + variable.DisplayValue = dataItem.Value.ToString(); variable.UpdateTime = DateTime.Now; + Console.WriteLine($"S7后台服务轮询变量:{variable.Name},值:{variable.DataValue}"); // 将更新后的数据推入处理队列。 await _dataProcessingService.EnqueueAsync(variable); }