Files
DMS/软件设计文档/原始文档/04-数据处理链设计.md

7.2 KiB
Raw Permalink Blame History

软件开发文档 - 04. 数据处理链设计

本文档专门阐述当系统从S7 PLC等设备读取到变量值后如何通过一个可扩展的、链式的处理流程对数据进行加工、存储和转发。

1. 设计目标

  • 解耦:每个处理步骤应是独立的、可复用的。
  • 可扩展:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。
  • 高性能:处理流程应高效,避免阻塞数据采集线程。
  • 可配置:能够根据不同变量的需求,动态构建处理链。

2. 核心概念:VariableContext

为了在处理链中传递数据,我们定义一个上下文对象 VariableContext。它将携带变量的原始值、当前值以及其他相关信息,在整个处理链中流动。

// 文件: DMS.Core/Models/VariableContext.cs
namespace DMS.Core.Models;

public class VariableContext
{
    public Variable Variable { get; init; }
    public object RawValue { get; init; } // 从设备读取的原始值
    public object CurrentValue { get; set; } // 当前处理后的值
    public DateTime Timestamp { get; init; }
    public bool IsValueChanged { get; set; } // 标记值是否发生变化
    public bool IsProcessingTerminated { get; set; } // 是否终止后续处理

    public VariableContext(Variable variable, object rawValue)
    {
        Variable = variable;
        RawValue = rawValue;
        CurrentValue = rawValue;
        Timestamp = DateTime.UtcNow;
    }
}

3. 处理器接口与抽象基类

我们定义一个统一的处理器接口 IVariableProcessor 和一个实现了链式调用的抽象基类 VariableProcessorBase

3.1. IVariableProcessor 接口

// 文件: DMS.Application/Interfaces/IVariableProcessor.cs
namespace DMS.Application.Interfaces;

public interface IVariableProcessor
{
    IVariableProcessor SetNext(IVariableProcessor next);
    Task ProcessAsync(VariableContext context);
}

3.2. VariableProcessorBase 抽象基类

// 文件: DMS.Application/Services/Processors/VariableProcessorBase.cs
namespace DMS.Application.Services.Processors;

public abstract class VariableProcessorBase : IVariableProcessor
{
    private IVariableProcessor _next;

    public IVariableProcessor SetNext(IVariableProcessor next)
    {
        _next = next;
        return next;
    }

    public virtual async Task ProcessAsync(VariableContext context)
    {
        if (context.IsProcessingTerminated) return;

        await HandleAsync(context);

        if (_next != null && !context.IsProcessingTerminated)
        {
            await _next.ProcessAsync(context);
        }
    }

    // 模板方法,由子类实现具体的处理逻辑
    protected abstract Task HandleAsync(VariableContext context);
}

4. 具体处理器实现

以下是处理链中几个核心处理器的设计。

4.1. ChangeDetectionProcessor - 变化检测处理器

职责:检测本次读取的值与上一次的值是否相同。如果相同,则终止后续处理,以节省资源。

// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
public class ChangeDetectionProcessor : VariableProcessorBase
{
    private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值

    public ChangeDetectionProcessor(IMemoryCache cache)
    {
        _cache = cache;
    }

    protected override Task HandleAsync(VariableContext context)
    {
        var lastValue = _cache.Get(context.Variable.Id);
        if (lastValue != null && lastValue.Equals(context.RawValue))
        {
            context.IsProcessingTerminated = true; // 值未变化,终止处理
        }
        else
        {
            context.IsValueChanged = true;
            _cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
        }
        return Task.CompletedTask;
    }
}

4.2. HistoryStorageProcessor - 历史存储处理器

职责:将变化后的值存入数据库历史记录表。

// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
public class HistoryStorageProcessor : VariableProcessorBase
{
    private readonly IVariableHistoryRepository _historyRepository;

    public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
    {
        _historyRepository = historyRepository;
    }

    protected override async Task HandleAsync(VariableContext context)
    {
        if (!context.IsValueChanged) return;

        var history = new VariableHistory
        {
            VariableId = context.Variable.Id,
            Value = context.CurrentValue.ToString(),
            Timestamp = context.Timestamp
        };
        await _historyRepository.AddAsync(history);
    }
}

4.3. MqttPublishProcessor - MQTT发布处理器

职责如果变量关联了MQTT服务器则将值发布出去。

// 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs
public class MqttPublishProcessor : VariableProcessorBase
{
    private readonly IMqttPublishService _mqttService;
    private readonly IVariableRepository _variableRepository; // 用于获取关联的MQTT服务器

    public MqttPublishProcessor(IMqttPublishService mqttService, IVariableRepository variableRepository)
    {
        _mqttService = mqttService;
        _variableRepository = variableRepository;
    }

    protected override async Task HandleAsync(VariableContext context)
    {
        if (!context.IsValueChanged) return;

        var variableWithMqtt = await _variableRepository.GetVariableWithMqttServersAsync(context.Variable.Id);
        if (variableWithMqtt?.MqttServers == null) return;

        foreach (var server in variableWithMqtt.MqttServers)
        {
            if (server.IsActive)
            {
                var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{context.Variable.VariableTable.Name}/{context.Variable.Name}";
                var payload = System.Text.Json.JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });
                await _mqttService.PublishAsync(server, topic, payload);
            }
        }
    }
}

5. 构建和执行处理链

在数据采集的后台服务中,我们将动态构建并执行这个处理链。

// 在 S7BackgroundService.cs 或类似服务中
public class DataAcquisitionService
{
    private readonly IServiceProvider _serviceProvider;

    public DataAcquisitionService(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    public async Task OnDataReceived(Variable variable, object rawValue)
    {
        var context = new VariableContext(variable, rawValue);

        // 1. 从DI容器中获取处理器实例
        var changeDetector = _serviceProvider.GetRequiredService<ChangeDetectionProcessor>();
        var historian = _serviceProvider.GetRequiredService<HistoryStorageProcessor>();
        var publisher = _serviceProvider.GetRequiredService<MqttPublishProcessor>();

        // 2. 构建处理链
        changeDetector.SetNext(historian).SetNext(publisher);

        // 3. 启动处理
        await changeDetector.ProcessAsync(context);
    }
}