# 软件开发文档 - 数据处理链设计 本文档专门阐述当系统从S7 PLC等设备读取到变量值后,如何通过一个可扩展的、链式的处理流程,对数据进行加工、存储和转发。 ## 1. 设计目标 * **解耦**:每个处理步骤应是独立的、可复用的。 * **可扩展**:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。 * **高性能**:处理流程应高效,避免阻塞数据采集线程。 * **可配置**:能够根据不同变量的需求,动态构建处理链。 ## 2. 核心概念:`VariableContext` 为了在处理链中传递数据,我们定义一个上下文对象 `VariableContext`。它将携带变量的原始值、当前值以及其他相关信息,在整个处理链中流动。 ```csharp // 文件: DMS.Core/Models/VariableContext.cs namespace DMS.Core.Models; public class VariableContext { public Variable Variable { get; init; } public object RawValue { get; init; } // 从设备读取的原始值 public object CurrentValue { get; set; } // 当前处理后的值 public DateTime Timestamp { get; init; } public bool IsValueChanged { get; set; } // 标记值是否发生变化 public bool IsProcessingTerminated { get; set; } // 是否终止后续处理 public VariableContext(Variable variable, object rawValue) { Variable = variable; RawValue = rawValue; CurrentValue = rawValue; Timestamp = DateTime.UtcNow; } } ``` ## 3. 处理器接口与抽象基类 我们定义一个统一的处理器接口 `IVariableProcessor` 和一个实现了链式调用的抽象基类 `VariableProcessorBase`。 ### 3.1. `IVariableProcessor` 接口 ```csharp // 文件: DMS.Application/Interfaces/IVariableProcessor.cs namespace DMS.Application.Interfaces; public interface IVariableProcessor { IVariableProcessor SetNext(IVariableProcessor next); Task ProcessAsync(VariableContext context); } ``` ### 3.2. `VariableProcessorBase` 抽象基类 ```csharp // 文件: DMS.Application/Services/Processors/VariableProcessorBase.cs namespace DMS.Application.Services.Processors; public abstract class VariableProcessorBase : IVariableProcessor { private IVariableProcessor _next; public IVariableProcessor SetNext(IVariableProcessor next) { _next = next; return next; } public virtual async Task ProcessAsync(VariableContext context) { if (context.IsProcessingTerminated) return; await HandleAsync(context); if (_next != null && !context.IsProcessingTerminated) { await _next.ProcessAsync(context); } } // 模板方法,由子类实现具体的处理逻辑 protected abstract Task HandleAsync(VariableContext context); } ``` ## 4. 具体处理器实现 以下是处理链中几个核心处理器的设计。 ### 4.1. `ChangeDetectionProcessor` - 变化检测处理器 **职责**:检测本次读取的值与上一次的值是否相同。如果相同,则终止后续处理,以节省资源。 ```csharp // 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs public class ChangeDetectionProcessor : VariableProcessorBase { private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值 public ChangeDetectionProcessor(IMemoryCache cache) { _cache = cache; } protected override Task HandleAsync(VariableContext context) { var lastValue = _cache.Get(context.Variable.Id); if (lastValue != null && lastValue.Equals(context.RawValue)) { context.IsProcessingTerminated = true; // 值未变化,终止处理 } else { context.IsValueChanged = true; _cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1)); } return Task.CompletedTask; } } ``` ### 4.2. `HistoryStorageProcessor` - 历史存储处理器 **职责**:将变化后的值存入数据库历史记录表。 ```csharp // 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs public class HistoryStorageProcessor : VariableProcessorBase { private readonly IVariableHistoryRepository _historyRepository; public HistoryStorageProcessor(IVariableHistoryRepository historyRepository) { _historyRepository = historyRepository; } protected override async Task HandleAsync(VariableContext context) { if (!context.IsValueChanged) return; var history = new VariableHistory { VariableId = context.Variable.Id, Value = context.CurrentValue.ToString(), Timestamp = context.Timestamp }; await _historyRepository.AddAsync(history); } } ``` ### 4.3. `MqttPublishProcessor` - MQTT发布处理器 **职责**:如果变量关联了MQTT服务器,则将值发布出去。 ```csharp // 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs public class MqttPublishProcessor : VariableProcessorBase { private readonly IMqttPublishService _mqttService; private readonly IVariableRepository _variableRepository; // 用于获取关联的MQTT服务器 public MqttPublishProcessor(IMqttPublishService mqttService, IVariableRepository variableRepository) { _mqttService = mqttService; _variableRepository = variableRepository; } protected override async Task HandleAsync(VariableContext context) { if (!context.IsValueChanged) return; var variableWithMqtt = await _variableRepository.GetVariableWithMqttServersAsync(context.Variable.Id); if (variableWithMqtt?.MqttServers == null) return; foreach (var server in variableWithMqtt.MqttServers) { if (server.IsActive) { var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{context.Variable.VariableTable.Name}/{context.Variable.Name}"; var payload = System.Text.Json.JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp }); await _mqttService.PublishAsync(server, topic, payload); } } } } ``` ## 5. 构建和执行处理链 在数据采集的后台服务中,我们将动态构建并执行这个处理链。 ```csharp // 在 S7BackgroundService.cs 或类似服务中 public class DataAcquisitionService { private readonly IServiceProvider _serviceProvider; public DataAcquisitionService(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; } public async Task OnDataReceived(Variable variable, object rawValue) { var context = new VariableContext(variable, rawValue); // 1. 从DI容器中获取处理器实例 var changeDetector = _serviceProvider.GetRequiredService(); var historian = _serviceProvider.GetRequiredService(); var publisher = _serviceProvider.GetRequiredService(); // 2. 构建处理链 changeDetector.SetNext(historian).SetNext(publisher); // 3. 启动处理 await changeDetector.ProcessAsync(context); } } ```