Files
DMS/软件设计文档/原始文档/04-数据处理链设计.md

224 lines
7.2 KiB
Markdown
Raw Permalink Normal View History

2025-07-20 23:09:47 +08:00
# 软件开发文档 - 04. 数据处理链设计
本文档专门阐述当系统从S7 PLC等设备读取到变量值后如何通过一个可扩展的、链式的处理流程对数据进行加工、存储和转发。
## 1. 设计目标
* **解耦**:每个处理步骤应是独立的、可复用的。
* **可扩展**:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。
* **高性能**:处理流程应高效,避免阻塞数据采集线程。
* **可配置**:能够根据不同变量的需求,动态构建处理链。
## 2. 核心概念:`VariableContext`
为了在处理链中传递数据,我们定义一个上下文对象 `VariableContext`。它将携带变量的原始值、当前值以及其他相关信息,在整个处理链中流动。
```csharp
// 文件: DMS.Core/Models/VariableContext.cs
namespace DMS.Core.Models;
public class VariableContext
{
public Variable Variable { get; init; }
public object RawValue { get; init; } // 从设备读取的原始值
public object CurrentValue { get; set; } // 当前处理后的值
public DateTime Timestamp { get; init; }
public bool IsValueChanged { get; set; } // 标记值是否发生变化
public bool IsProcessingTerminated { get; set; } // 是否终止后续处理
public VariableContext(Variable variable, object rawValue)
{
Variable = variable;
RawValue = rawValue;
CurrentValue = rawValue;
Timestamp = DateTime.UtcNow;
}
}
```
## 3. 处理器接口与抽象基类
我们定义一个统一的处理器接口 `IVariableProcessor` 和一个实现了链式调用的抽象基类 `VariableProcessorBase`
### 3.1. `IVariableProcessor` 接口
```csharp
// 文件: DMS.Application/Interfaces/IVariableProcessor.cs
namespace DMS.Application.Interfaces;
public interface IVariableProcessor
{
IVariableProcessor SetNext(IVariableProcessor next);
Task ProcessAsync(VariableContext context);
}
```
### 3.2. `VariableProcessorBase` 抽象基类
```csharp
// 文件: DMS.Application/Services/Processors/VariableProcessorBase.cs
namespace DMS.Application.Services.Processors;
public abstract class VariableProcessorBase : IVariableProcessor
{
private IVariableProcessor _next;
public IVariableProcessor SetNext(IVariableProcessor next)
{
_next = next;
return next;
}
public virtual async Task ProcessAsync(VariableContext context)
{
if (context.IsProcessingTerminated) return;
await HandleAsync(context);
if (_next != null && !context.IsProcessingTerminated)
{
await _next.ProcessAsync(context);
}
}
// 模板方法,由子类实现具体的处理逻辑
protected abstract Task HandleAsync(VariableContext context);
}
```
## 4. 具体处理器实现
以下是处理链中几个核心处理器的设计。
### 4.1. `ChangeDetectionProcessor` - 变化检测处理器
**职责**:检测本次读取的值与上一次的值是否相同。如果相同,则终止后续处理,以节省资源。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
public class ChangeDetectionProcessor : VariableProcessorBase
{
private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值
public ChangeDetectionProcessor(IMemoryCache cache)
{
_cache = cache;
}
protected override Task HandleAsync(VariableContext context)
{
var lastValue = _cache.Get(context.Variable.Id);
if (lastValue != null && lastValue.Equals(context.RawValue))
{
context.IsProcessingTerminated = true; // 值未变化,终止处理
}
else
{
context.IsValueChanged = true;
_cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
}
return Task.CompletedTask;
}
}
```
### 4.2. `HistoryStorageProcessor` - 历史存储处理器
**职责**:将变化后的值存入数据库历史记录表。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
public class HistoryStorageProcessor : VariableProcessorBase
{
private readonly IVariableHistoryRepository _historyRepository;
public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
{
_historyRepository = historyRepository;
}
protected override async Task HandleAsync(VariableContext context)
{
if (!context.IsValueChanged) return;
var history = new VariableHistory
{
VariableId = context.Variable.Id,
Value = context.CurrentValue.ToString(),
Timestamp = context.Timestamp
};
await _historyRepository.AddAsync(history);
}
}
```
### 4.3. `MqttPublishProcessor` - MQTT发布处理器
**职责**如果变量关联了MQTT服务器则将值发布出去。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs
public class MqttPublishProcessor : VariableProcessorBase
{
private readonly IMqttPublishService _mqttService;
private readonly IVariableRepository _variableRepository; // 用于获取关联的MQTT服务器
public MqttPublishProcessor(IMqttPublishService mqttService, IVariableRepository variableRepository)
{
_mqttService = mqttService;
_variableRepository = variableRepository;
}
protected override async Task HandleAsync(VariableContext context)
{
if (!context.IsValueChanged) return;
var variableWithMqtt = await _variableRepository.GetVariableWithMqttServersAsync(context.Variable.Id);
if (variableWithMqtt?.MqttServers == null) return;
foreach (var server in variableWithMqtt.MqttServers)
{
if (server.IsActive)
{
var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{context.Variable.VariableTable.Name}/{context.Variable.Name}";
var payload = System.Text.Json.JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });
await _mqttService.PublishAsync(server, topic, payload);
}
}
}
}
```
## 5. 构建和执行处理链
在数据采集的后台服务中,我们将动态构建并执行这个处理链。
```csharp
// 在 S7BackgroundService.cs 或类似服务中
public class DataAcquisitionService
{
private readonly IServiceProvider _serviceProvider;
public DataAcquisitionService(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task OnDataReceived(Variable variable, object rawValue)
{
var context = new VariableContext(variable, rawValue);
// 1. 从DI容器中获取处理器实例
var changeDetector = _serviceProvider.GetRequiredService<ChangeDetectionProcessor>();
var historian = _serviceProvider.GetRequiredService<HistoryStorageProcessor>();
var publisher = _serviceProvider.GetRequiredService<MqttPublishProcessor>();
// 2. 构建处理链
changeDetector.SetNext(historian).SetNext(publisher);
// 3. 启动处理
await changeDetector.ProcessAsync(context);
}
}
```