Files
DMS/软件设计文档/05-数据处理链设计.md

14 KiB
Raw Blame History

软件开发文档 - 数据处理链设计

本文档专门阐述当系统从S7 PLC等设备读取到变量值后如何通过一个可扩展的、链式的处理流程对数据进行加工、存储和转发。

1. 设计目标

  • 解耦:每个处理步骤应是独立的、可复用的。
  • 可扩展:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。
  • 高性能:处理流程应高效,避免阻塞数据采集线程。
  • 可配置:能够根据不同变量的需求,动态构建处理链。

2. 核心概念:VariableContext

为了在处理链中传递数据,我们定义一个上下文对象 VariableContext。它将携带变量的原始值、当前值以及处理状态,在整个处理流程中流动。

// 文件: DMS.Core/Models/VariableContext.cs
using System;

namespace DMS.Core.Models;

public class VariableContext
{
    public Variable Variable { get; init; }
    public object RawValue { get; init; } // 从设备读取的原始值
    public object CurrentValue { get; set; } // 当前处理后的值
    public DateTime Timestamp { get; init; }
    public bool IsProcessed { get; set; } // 标记是否已处理(短路机制)
    public bool IsSuccess { get; set; } // 标记处理是否成功
    public bool IsError { get; set; } // 标记处理是否发生错误

    public VariableContext(Variable variable, object rawValue)
    {
        Variable = variable;
        RawValue = rawValue;
        CurrentValue = rawValue;
        Timestamp = DateTime.UtcNow;
        IsProcessed = false;
        IsSuccess = false;
        IsError = false;
    }
}

3. 处理器接口

我们定义一个统一的处理器接口 IVariableProcessor

3.1. IVariableProcessor 接口

// 文件: DMS.Application/Interfaces/IVariableProcessor.cs
using System.Threading.Tasks;

namespace DMS.Application.Interfaces;

public interface IVariableProcessor
{
    Task ProcessAsync(VariableContext context);
}

4. 具体处理器实现

以下是处理链中几个核心处理器的设计。

4.1. ChangeDetectionProcessor - 变化检测处理器

职责:检测本次读取的值与上一次的值是否相同。如果相同,则标记 IsProcessedtrue,以短路后续处理。

// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Models;
using Microsoft.Extensions.Caching.Memory;
using System;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Services.Processing;

public class ChangeDetectionProcessor : IVariableProcessor
{
    private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值

    public ChangeDetectionProcessor(IMemoryCache cache)
    {
        _cache = cache;
    }

    public Task ProcessAsync(VariableContext context)
    {
        if (context.IsProcessed || context.IsError) return Task.CompletedTask; // 短路

        try
        {
            var lastValue = _cache.Get(context.Variable.Id);
            if (lastValue != null && lastValue.Equals(context.RawValue))
            {
                context.IsProcessed = true; // 值未变化,标记为已处理,短路后续操作
            }
            else
            {
                _cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
            }
        }
        catch (Exception ex)
        {
            context.IsError = true; // 标记错误
            // 记录日志
        }
        return Task.CompletedTask;
    }
}

4.2. HistoryStorageProcessor - 历史存储处理器

职责:将变化后的值存入数据库历史记录表。

// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using System;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Services.Processing;

public class HistoryStorageProcessor : IVariableProcessor
{
    private readonly IVariableHistoryRepository _historyRepository;

    public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
    {
        _historyRepository = historyRepository;
    }

    public async Task ProcessAsync(VariableContext context)
    {
        if (context.IsProcessed || context.IsError) return; // 短路

        try
        {
            // 只有当值发生变化时才存储历史这里需要根据ChangeDetectionProcessor的逻辑来判断
            // 由于ChangeDetectionProcessor现在只标记IsProcessed我们需要重新判断是否需要存储历史
            // 简单起见这里假设如果未被短路且RawValue与CurrentValue不同则认为值已变化
            // 更严谨的做法是ChangeDetectionProcessor设置一个IsValueChanged标志
            // 这里我们直接使用RawValue和CurrentValue的比较
            if (!context.RawValue.Equals(context.CurrentValue))
            {
                var history = new VariableHistory
                {
                    VariableId = context.Variable.Id,
                    Value = context.CurrentValue.ToString(),
                    Timestamp = context.Timestamp
                };
                await _historyRepository.AddAsync(history);
            }
        }
        catch (Exception ex)
        {
            context.IsError = true; // 标记错误
            // 记录日志
        }
    }
}

4.3. MqttPublishProcessor - MQTT发布处理器

职责如果变量关联了MQTT服务器则将值发布出去。

// 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using DMS.Infrastructure.Services.Communication;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using NLog;

namespace DMS.Infrastructure.Services.Processing;

/// <summary>
/// MQTT发布处理器负责将变量值发布到关联的MQTT服务器并使用专属别名。
/// </summary>
public class MqttPublishProcessor : IVariableProcessor
{
    private readonly IMqttPublishService _mqttService;
    private readonly IRepositoryManager _repoManager; // 使用 RepositoryManager 来获取仓储
    private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// 构造函数。
    /// </summary>
    public MqttPublishProcessor(IMqttPublishService mqttService, IRepositoryManager repoManager)
    {
        _mqttService = mqttService;
        _repoManager = repoManager;
    }

    public async Task ProcessAsync(VariableContext context)
    {
        if (context.IsProcessed || context.IsError) return; // 短路

        try
        {
            // 1. 从仓储获取变量及其完整的别名关联列表
            var variableWithAliases = await _repoManager.Variables.GetVariableWithMqttAliasesAsync(context.Variable.Id);
            
            if (variableWithAliases?.MqttAliases == null || !variableWithAliases.MqttAliases.Any())
            {
                return; // 没有关联的MQTT服务器无需发布
            }

            foreach (var aliasInfo in variableWithAliases.MqttAliases)
            {
                // 确保 MqttServer 导航属性已加载且激活
                var targetServer = aliasInfo.MqttServer;
                if (targetServer == null || !targetServer.IsActive)
                {
                    _logger.Warn($"MQTT发布失败变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。");
                    continue;
                }

                // 使用别名构建Topic
                // 示例Topic格式DMS/DeviceName/VariableAlias
                var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}";
                var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });

                await _mqttService.PublishAsync(targetServer, topic, payload);
            }
        }
        catch (Exception ex)
        {
            context.IsError = true; // 标记错误
            _logger.Error(ex, $"MQTT发布失败变量 {context.Variable.Name}");
        }
    }
}

5. 变量处理后台服务 (VariableProcessingService)

设计一个新的后台服务 VariableProcessingService,它将负责管理和执行所有注册的 IVariableProcessor。这个服务将从数据采集队列中读取 VariableContext,并按顺序调用所有处理器,实现短路机制。

// 文件: DMS.Infrastructure/Services/VariableProcessingService.cs
using DMS.Application.Interfaces;
using DMS.Core.Models;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NLog;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace DMS.Infrastructure.Services;

/// <summary>
/// 变量数据处理后台服务,负责协调所有 IVariableProcessor 的执行。
/// </summary>
public class VariableProcessingService : BackgroundService
{
    private readonly ChannelReader<VariableContext> _queueReader;
    private readonly IServiceProvider _serviceProvider;
    private readonly IMessenger _messenger;
    private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();

    public VariableProcessingService(
        IChannelBus channelBus,
        IServiceProvider serviceProvider,
        IMessenger messenger)
    {
        _queueReader = channelBus.GetReader<VariableContext>("DataProcessingQueue");
        _serviceProvider = serviceProvider;
        _messenger = messenger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var context in _queueReader.ReadAllAsync(stoppingToken))
        {
            // 获取所有注册的 IVariableProcessor 实例
            // 注意:这里假设处理器是 Transient 或 Scoped每次处理都获取新的实例
            var processors = _serviceProvider.GetServices<IVariableProcessor>();

            context.IsSuccess = true; // 默认成功任何处理器失败则设为false

            foreach (var processor in processors)
            {
                if (context.IsProcessed || context.IsError) // 短路机制
                {
                    context.IsSuccess = false; // 如果短路,则不认为是完全成功
                    break;
                }

                try
                {
                    await processor.ProcessAsync(context);
                }
                catch (Exception ex)
                {
                    context.IsError = true; // 标记错误
                    context.IsSuccess = false; // 标记失败
                    _logger.Error(ex, $"变量处理器 {processor.GetType().Name} 执行错误变量ID: {context.Variable.Id}");
                    break; // 发生错误,中断后续处理
                }
            }

            // 处理完成后发送消息通知UI更新变量值
            // 只有当处理未被短路且没有错误时才发送成功消息
            if (!context.IsProcessed && !context.IsError)
            {
                _messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue));
            }
            else if (context.IsError)
            {
                // 可以发送错误消息或进行其他错误处理
                _messenger.Send(new VariableProcessingErrorMessage(context.Variable.Id, context.CurrentValue, "处理失败"));
            }
        }
    }
}

6. 注册处理器和后台服务

在应用程序的启动配置中(例如 Startup.csProgram.cs),需要注册所有的 IVariableProcessor 实现和 VariableProcessingService

// 在 ConfigureServices 方法中
public void ConfigureServices(IServiceCollection services)
{
    // ... 其他服务注册

    // 注册所有 IVariableProcessor 实现
    services.AddTransient<IVariableProcessor, ChangeDetectionProcessor>();
    services.AddTransient<IVariableProcessor, HistoryStorageProcessor>();
    services.AddTransient<IVariableProcessor, MqttPublishProcessor>();
    // ... 注册其他处理器

    // 注册后台服务
    services.AddHostedService<VariableProcessingService>();

    // 注册 IMemoryCache (如果尚未注册)
    services.AddMemoryCache();
}

7. 数据采集服务中的调用

数据采集服务(例如 S7BackgroundService)现在只需将采集到的 VariableContext 写入 DataProcessingQueue 即可,无需关心处理链的构建和执行。

// 在 S7BackgroundService.cs 或类似服务中
public class S7BackgroundService : IHostedService
{
    private readonly ChannelWriter<VariableContext> _processingQueueWriter;
    // ... 其他依赖

    public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg)
    {
        // ...
        _processingQueueWriter = bus.GetWriter<VariableContext>("DataProcessingQueue");
    }

    // ... StartAsync 方法中
    private async Task PollingLoopAsync(List<Variable> varsToRead, int interval, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // ... 读取变量
            foreach (var variable in varsToRead)
            {
                var context = new VariableContext(variable, variable.DataValue);
                await _processingQueueWriter.WriteAsync(context, token);
            }
            // ...
        }
    }
}