# 软件开发文档 - 数据处理链设计 本文档专门阐述当系统从S7 PLC等设备读取到变量值后,如何通过一个可扩展的、链式的处理流程,对数据进行加工、存储和转发。 ## 1. 设计目标 * **解耦**:每个处理步骤应是独立的、可复用的。 * **可扩展**:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。 * **高性能**:处理流程应高效,避免阻塞数据采集线程。 * **可配置**:能够根据不同变量的需求,动态构建处理链。 ## 2. 核心概念:`VariableContext` 为了在处理链中传递数据,我们定义一个上下文对象 `VariableContext`。它将携带变量的原始值、当前值以及处理状态,在整个处理流程中流动。 ```csharp // 文件: DMS.Core/Models/VariableContext.cs using System; namespace DMS.Core.Models; public class VariableContext { public Variable Variable { get; init; } public object RawValue { get; init; } // 从设备读取的原始值 public object CurrentValue { get; set; } // 当前处理后的值 public DateTime Timestamp { get; init; } public bool IsProcessed { get; set; } // 标记是否已处理(短路机制) public bool IsSuccess { get; set; } // 标记处理是否成功 public bool IsError { get; set; } // 标记处理是否发生错误 public VariableContext(Variable variable, object rawValue) { Variable = variable; RawValue = rawValue; CurrentValue = rawValue; Timestamp = DateTime.UtcNow; IsProcessed = false; IsSuccess = false; IsError = false; } } ``` ## 3. 处理器接口 我们定义一个统一的处理器接口 `IVariableProcessor`。 ### 3.1. `IVariableProcessor` 接口 ```csharp // 文件: DMS.Application/Interfaces/IVariableProcessor.cs using System.Threading.Tasks; namespace DMS.Application.Interfaces; public interface IVariableProcessor { Task ProcessAsync(VariableContext context); } ``` ## 4. 具体处理器实现 以下是处理链中几个核心处理器的设计。 ### 4.1. `ChangeDetectionProcessor` - 变化检测处理器 **职责**:检测本次读取的值与上一次的值是否相同。如果相同,则标记 `IsProcessed` 为 `true`,以短路后续处理。 ```csharp // 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs using DMS.Application.Interfaces; using DMS.Core.Models; using Microsoft.Extensions.Caching.Memory; using System; using System.Threading.Tasks; namespace DMS.Infrastructure.Services.Processing; public class ChangeDetectionProcessor : IVariableProcessor { private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值 public ChangeDetectionProcessor(IMemoryCache cache) { _cache = cache; } public Task ProcessAsync(VariableContext context) { if (context.IsProcessed || context.IsError) return Task.CompletedTask; // 短路 try { var lastValue = _cache.Get(context.Variable.Id); if (lastValue != null && lastValue.Equals(context.RawValue)) { context.IsProcessed = true; // 值未变化,标记为已处理,短路后续操作 } else { _cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1)); } } catch (Exception ex) { context.IsError = true; // 标记错误 // 记录日志 } return Task.CompletedTask; } } ``` ### 4.2. `HistoryStorageProcessor` - 历史存储处理器 **职责**:将变化后的值存入数据库历史记录表。 ```csharp // 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs using DMS.Application.Interfaces; using DMS.Core.Interfaces; using DMS.Core.Models; using System; using System.Threading.Tasks; namespace DMS.Infrastructure.Services.Processing; public class HistoryStorageProcessor : IVariableProcessor { private readonly IVariableHistoryRepository _historyRepository; public HistoryStorageProcessor(IVariableHistoryRepository historyRepository) { _historyRepository = historyRepository; } public async Task ProcessAsync(VariableContext context) { if (context.IsProcessed || context.IsError) return; // 短路 try { // 只有当值发生变化时才存储历史,这里需要根据ChangeDetectionProcessor的逻辑来判断 // 由于ChangeDetectionProcessor现在只标记IsProcessed,我们需要重新判断是否需要存储历史 // 简单起见,这里假设如果未被短路,且RawValue与CurrentValue不同,则认为值已变化 // 更严谨的做法是ChangeDetectionProcessor设置一个IsValueChanged标志 // 这里我们直接使用RawValue和CurrentValue的比较 if (!context.RawValue.Equals(context.CurrentValue)) { var history = new VariableHistory { VariableId = context.Variable.Id, Value = context.CurrentValue.ToString(), Timestamp = context.Timestamp }; await _historyRepository.AddAsync(history); } } catch (Exception ex) { context.IsError = true; // 标记错误 // 记录日志 } } } ``` ### 4.3. `MqttPublishProcessor` - MQTT发布处理器 **职责**:如果变量关联了MQTT服务器,则将值发布出去。 ```csharp // 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs using DMS.Application.Interfaces; using DMS.Core.Interfaces; using DMS.Core.Models; using DMS.Infrastructure.Services.Communication; using System.Linq; using System.Text.Json; using System.Threading.Tasks; using NLog; namespace DMS.Infrastructure.Services.Processing; /// /// MQTT发布处理器,负责将变量值发布到关联的MQTT服务器,并使用专属别名。 /// public class MqttPublishProcessor : IVariableProcessor { private readonly IMqttPublishService _mqttService; private readonly IRepositoryManager _repoManager; // 使用 RepositoryManager 来获取仓储 private static readonly ILogger _logger = LogManager.GetCurrentClassLogger(); /// /// 构造函数。 /// public MqttPublishProcessor(IMqttPublishService mqttService, IRepositoryManager repoManager) { _mqttService = mqttService; _repoManager = repoManager; } public async Task ProcessAsync(VariableContext context) { if (context.IsProcessed || context.IsError) return; // 短路 try { // 1. 从仓储获取变量及其完整的别名关联列表 var variableWithAliases = await _repoManager.Variables.GetVariableWithMqttAliasesAsync(context.Variable.Id); if (variableWithAliases?.MqttAliases == null || !variableWithAliases.MqttAliases.Any()) { return; // 没有关联的MQTT服务器,无需发布 } foreach (var aliasInfo in variableWithAliases.MqttAliases) { // 确保 MqttServer 导航属性已加载且激活 var targetServer = aliasInfo.MqttServer; if (targetServer == null || !targetServer.IsActive) { _logger.Warn($"MQTT发布失败:变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。"); continue; } // 使用别名构建Topic // 示例Topic格式:DMS/DeviceName/VariableAlias var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}"; var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp }); await _mqttService.PublishAsync(targetServer, topic, payload); } } catch (Exception ex) { context.IsError = true; // 标记错误 _logger.Error(ex, $"MQTT发布失败:变量 {context.Variable.Name}"); } } } ``` ## 5. 变量处理后台服务 (`VariableProcessingService`) 设计一个新的后台服务 `VariableProcessingService`,它将负责管理和执行所有注册的 `IVariableProcessor`。这个服务将从数据采集队列中读取 `VariableContext`,并按顺序调用所有处理器,实现短路机制。 ```csharp // 文件: DMS.Infrastructure/Services/VariableProcessingService.cs using DMS.Application.Interfaces; using DMS.Core.Models; using DMS.WPF.Services; using CommunityToolkit.Mvvm.Messaging; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using NLog; using System; using System.Collections.Generic; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace DMS.Infrastructure.Services; /// /// 变量数据处理后台服务,负责协调所有 IVariableProcessor 的执行。 /// public class VariableProcessingService : BackgroundService { private readonly ChannelReader _queueReader; private readonly IServiceProvider _serviceProvider; private readonly IMessenger _messenger; private static readonly ILogger _logger = LogManager.GetCurrentClassLogger(); public VariableProcessingService( IChannelBus channelBus, IServiceProvider serviceProvider, IMessenger messenger) { _queueReader = channelBus.GetReader("DataProcessingQueue"); _serviceProvider = serviceProvider; _messenger = messenger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await foreach (var context in _queueReader.ReadAllAsync(stoppingToken)) { // 获取所有注册的 IVariableProcessor 实例 // 注意:这里假设处理器是 Transient 或 Scoped,每次处理都获取新的实例 var processors = _serviceProvider.GetServices(); context.IsSuccess = true; // 默认成功,任何处理器失败则设为false foreach (var processor in processors) { if (context.IsProcessed || context.IsError) // 短路机制 { context.IsSuccess = false; // 如果短路,则不认为是完全成功 break; } try { await processor.ProcessAsync(context); } catch (Exception ex) { context.IsError = true; // 标记错误 context.IsSuccess = false; // 标记失败 _logger.Error(ex, $"变量处理器 {processor.GetType().Name} 执行错误,变量ID: {context.Variable.Id}"); break; // 发生错误,中断后续处理 } } // 处理完成后,发送消息通知UI更新变量值 // 只有当处理未被短路且没有错误时才发送成功消息 if (!context.IsProcessed && !context.IsError) { _messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue)); } else if (context.IsError) { // 可以发送错误消息或进行其他错误处理 _messenger.Send(new VariableProcessingErrorMessage(context.Variable.Id, context.CurrentValue, "处理失败")); } } } } ``` ## 6. 注册处理器和后台服务 在应用程序的启动配置中(例如 `Startup.cs` 或 `Program.cs`),需要注册所有的 `IVariableProcessor` 实现和 `VariableProcessingService`。 ```csharp // 在 ConfigureServices 方法中 public void ConfigureServices(IServiceCollection services) { // ... 其他服务注册 // 注册所有 IVariableProcessor 实现 services.AddTransient(); services.AddTransient(); services.AddTransient(); // ... 注册其他处理器 // 注册后台服务 services.AddHostedService(); // 注册 IMemoryCache (如果尚未注册) services.AddMemoryCache(); } ``` ## 7. 数据采集服务中的调用 数据采集服务(例如 `S7BackgroundService`)现在只需将采集到的 `VariableContext` 写入 `DataProcessingQueue` 即可,无需关心处理链的构建和执行。 ```csharp // 在 S7BackgroundService.cs 或类似服务中 public class S7BackgroundService : IHostedService { private readonly ChannelWriter _processingQueueWriter; // ... 其他依赖 public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg) { // ... _processingQueueWriter = bus.GetWriter("DataProcessingQueue"); } // ... StartAsync 方法中 private async Task PollingLoopAsync(List varsToRead, int interval, CancellationToken token) { while (!token.IsCancellationRequested) { // ... 读取变量 foreach (var variable in varsToRead) { var context = new VariableContext(variable, variable.DataValue); await _processingQueueWriter.WriteAsync(context, token); } // ... } } } ```