223 lines
7.2 KiB
Markdown
223 lines
7.2 KiB
Markdown
# 软件开发文档 - 数据处理链设计
|
||
|
||
本文档专门阐述当系统从S7 PLC等设备读取到变量值后,如何通过一个可扩展的、链式的处理流程,对数据进行加工、存储和转发。
|
||
|
||
## 1. 设计目标
|
||
|
||
* **解耦**:每个处理步骤应是独立的、可复用的。
|
||
* **可扩展**:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。
|
||
* **高性能**:处理流程应高效,避免阻塞数据采集线程。
|
||
* **可配置**:能够根据不同变量的需求,动态构建处理链。
|
||
|
||
## 2. 核心概念:`VariableContext`
|
||
|
||
为了在处理链中传递数据,我们定义一个上下文对象 `VariableContext`。它将携带变量的原始值、当前值以及其他相关信息,在整个处理链中流动。
|
||
|
||
```csharp
|
||
// 文件: DMS.Core/Models/VariableContext.cs
|
||
namespace DMS.Core.Models;
|
||
|
||
public class VariableContext
|
||
{
|
||
public Variable Variable { get; init; }
|
||
public object RawValue { get; init; } // 从设备读取的原始值
|
||
public object CurrentValue { get; set; } // 当前处理后的值
|
||
public DateTime Timestamp { get; init; }
|
||
public bool IsValueChanged { get; set; } // 标记值是否发生变化
|
||
public bool IsProcessingTerminated { get; set; } // 是否终止后续处理
|
||
|
||
public VariableContext(Variable variable, object rawValue)
|
||
{
|
||
Variable = variable;
|
||
RawValue = rawValue;
|
||
CurrentValue = rawValue;
|
||
Timestamp = DateTime.UtcNow;
|
||
}
|
||
}
|
||
```
|
||
|
||
## 3. 处理器接口与抽象基类
|
||
|
||
我们定义一个统一的处理器接口 `IVariableProcessor` 和一个实现了链式调用的抽象基类 `VariableProcessorBase`。
|
||
|
||
### 3.1. `IVariableProcessor` 接口
|
||
|
||
```csharp
|
||
// 文件: DMS.Application/Interfaces/IVariableProcessor.cs
|
||
namespace DMS.Application.Interfaces;
|
||
|
||
public interface IVariableProcessor
|
||
{
|
||
IVariableProcessor SetNext(IVariableProcessor next);
|
||
Task ProcessAsync(VariableContext context);
|
||
}
|
||
```
|
||
|
||
### 3.2. `VariableProcessorBase` 抽象基类
|
||
|
||
```csharp
|
||
// 文件: DMS.Application/Services/Processors/VariableProcessorBase.cs
|
||
namespace DMS.Application.Services.Processors;
|
||
|
||
public abstract class VariableProcessorBase : IVariableProcessor
|
||
{
|
||
private IVariableProcessor _next;
|
||
|
||
public IVariableProcessor SetNext(IVariableProcessor next)
|
||
{
|
||
_next = next;
|
||
return next;
|
||
}
|
||
|
||
public virtual async Task ProcessAsync(VariableContext context)
|
||
{
|
||
if (context.IsProcessingTerminated) return;
|
||
|
||
await HandleAsync(context);
|
||
|
||
if (_next != null && !context.IsProcessingTerminated)
|
||
{
|
||
await _next.ProcessAsync(context);
|
||
}
|
||
}
|
||
|
||
// 模板方法,由子类实现具体的处理逻辑
|
||
protected abstract Task HandleAsync(VariableContext context);
|
||
}
|
||
```
|
||
|
||
## 4. 具体处理器实现
|
||
|
||
以下是处理链中几个核心处理器的设计。
|
||
|
||
### 4.1. `ChangeDetectionProcessor` - 变化检测处理器
|
||
|
||
**职责**:检测本次读取的值与上一次的值是否相同。如果相同,则终止后续处理,以节省资源。
|
||
|
||
```csharp
|
||
// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
|
||
public class ChangeDetectionProcessor : VariableProcessorBase
|
||
{
|
||
private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值
|
||
|
||
public ChangeDetectionProcessor(IMemoryCache cache)
|
||
{
|
||
_cache = cache;
|
||
}
|
||
|
||
protected override Task HandleAsync(VariableContext context)
|
||
{
|
||
var lastValue = _cache.Get(context.Variable.Id);
|
||
if (lastValue != null && lastValue.Equals(context.RawValue))
|
||
{
|
||
context.IsProcessingTerminated = true; // 值未变化,终止处理
|
||
}
|
||
else
|
||
{
|
||
context.IsValueChanged = true;
|
||
_cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
|
||
}
|
||
return Task.CompletedTask;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 4.2. `HistoryStorageProcessor` - 历史存储处理器
|
||
|
||
**职责**:将变化后的值存入数据库历史记录表。
|
||
|
||
```csharp
|
||
// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
|
||
public class HistoryStorageProcessor : VariableProcessorBase
|
||
{
|
||
private readonly IVariableHistoryRepository _historyRepository;
|
||
|
||
public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
|
||
{
|
||
_historyRepository = historyRepository;
|
||
}
|
||
|
||
protected override async Task HandleAsync(VariableContext context)
|
||
{
|
||
if (!context.IsValueChanged) return;
|
||
|
||
var history = new VariableHistory
|
||
{
|
||
VariableId = context.Variable.Id,
|
||
Value = context.CurrentValue.ToString(),
|
||
Timestamp = context.Timestamp
|
||
};
|
||
await _historyRepository.AddAsync(history);
|
||
}
|
||
}
|
||
```
|
||
|
||
### 4.3. `MqttPublishProcessor` - MQTT发布处理器
|
||
|
||
**职责**:如果变量关联了MQTT服务器,则将值发布出去。
|
||
|
||
```csharp
|
||
// 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs
|
||
public class MqttPublishProcessor : VariableProcessorBase
|
||
{
|
||
private readonly IMqttPublishService _mqttService;
|
||
private readonly IVariableRepository _variableRepository; // 用于获取关联的MQTT服务器
|
||
|
||
public MqttPublishProcessor(IMqttPublishService mqttService, IVariableRepository variableRepository)
|
||
{
|
||
_mqttService = mqttService;
|
||
_variableRepository = variableRepository;
|
||
}
|
||
|
||
protected override async Task HandleAsync(VariableContext context)
|
||
{
|
||
if (!context.IsValueChanged) return;
|
||
|
||
var variableWithMqtt = await _variableRepository.GetVariableWithMqttServersAsync(context.Variable.Id);
|
||
if (variableWithMqtt?.MqttServers == null) return;
|
||
|
||
foreach (var server in variableWithMqtt.MqttServers)
|
||
{
|
||
if (server.IsActive)
|
||
{
|
||
var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{context.Variable.VariableTable.Name}/{context.Variable.Name}";
|
||
var payload = System.Text.Json.JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });
|
||
await _mqttService.PublishAsync(server, topic, payload);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
## 5. 构建和执行处理链
|
||
|
||
在数据采集的后台服务中,我们将动态构建并执行这个处理链。
|
||
|
||
```csharp
|
||
// 在 S7BackgroundService.cs 或类似服务中
|
||
public class DataAcquisitionService
|
||
{
|
||
private readonly IServiceProvider _serviceProvider;
|
||
|
||
public DataAcquisitionService(IServiceProvider serviceProvider)
|
||
{
|
||
_serviceProvider = serviceProvider;
|
||
}
|
||
|
||
public async Task OnDataReceived(Variable variable, object rawValue)
|
||
{
|
||
var context = new VariableContext(variable, rawValue);
|
||
|
||
// 1. 从DI容器中获取处理器实例
|
||
var changeDetector = _serviceProvider.GetRequiredService<ChangeDetectionProcessor>();
|
||
var historian = _serviceProvider.GetRequiredService<HistoryStorageProcessor>();
|
||
var publisher = _serviceProvider.GetRequiredService<MqttPublishProcessor>();
|
||
|
||
// 2. 构建处理链
|
||
changeDetector.SetNext(historian).SetNext(publisher);
|
||
|
||
// 3. 启动处理
|
||
await changeDetector.ProcessAsync(context);
|
||
}
|
||
}
|
||
``` |