Files
DMS/软件设计文档/05-数据处理链设计.md

390 lines
14 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 软件开发文档 - 数据处理链设计
本文档专门阐述当系统从S7 PLC等设备读取到变量值后如何通过一个可扩展的、链式的处理流程对数据进行加工、存储和转发。
## 1. 设计目标
* **解耦**:每个处理步骤应是独立的、可复用的。
* **可扩展**:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。
* **高性能**:处理流程应高效,避免阻塞数据采集线程。
* **可配置**:能够根据不同变量的需求,动态构建处理链。
## 2. 核心概念:`VariableContext`
为了在处理链中传递数据,我们定义一个上下文对象 `VariableContext`。它将携带变量的原始值、当前值以及处理状态,在整个处理流程中流动。
```csharp
// 文件: DMS.Core/Models/VariableContext.cs
using System;
namespace DMS.Core.Models;
public class VariableContext
{
public Variable Variable { get; init; }
public object RawValue { get; init; } // 从设备读取的原始值
public object CurrentValue { get; set; } // 当前处理后的值
public DateTime Timestamp { get; init; }
public bool IsProcessed { get; set; } // 标记是否已处理(短路机制)
public bool IsSuccess { get; set; } // 标记处理是否成功
public bool IsError { get; set; } // 标记处理是否发生错误
public VariableContext(Variable variable, object rawValue)
{
Variable = variable;
RawValue = rawValue;
CurrentValue = rawValue;
Timestamp = DateTime.UtcNow;
IsProcessed = false;
IsSuccess = false;
IsError = false;
}
}
```
## 3. 处理器接口
我们定义一个统一的处理器接口 `IVariableProcessor`
### 3.1. `IVariableProcessor` 接口
```csharp
// 文件: DMS.Application/Interfaces/IVariableProcessor.cs
using System.Threading.Tasks;
namespace DMS.Application.Interfaces;
public interface IVariableProcessor
{
Task ProcessAsync(VariableContext context);
}
```
## 4. 具体处理器实现
以下是处理链中几个核心处理器的设计。
### 4.1. `ChangeDetectionProcessor` - 变化检测处理器
**职责**:检测本次读取的值与上一次的值是否相同。如果相同,则标记 `IsProcessed``true`,以短路后续处理。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Models;
using Microsoft.Extensions.Caching.Memory;
using System;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services.Processing;
public class ChangeDetectionProcessor : IVariableProcessor
{
private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值
public ChangeDetectionProcessor(IMemoryCache cache)
{
_cache = cache;
}
public Task ProcessAsync(VariableContext context)
{
if (context.IsProcessed || context.IsError) return Task.CompletedTask; // 短路
try
{
var lastValue = _cache.Get(context.Variable.Id);
if (lastValue != null && lastValue.Equals(context.RawValue))
{
context.IsProcessed = true; // 值未变化,标记为已处理,短路后续操作
}
else
{
_cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
}
}
catch (Exception ex)
{
context.IsError = true; // 标记错误
// 记录日志
}
return Task.CompletedTask;
}
}
```
### 4.2. `HistoryStorageProcessor` - 历史存储处理器
**职责**:将变化后的值存入数据库历史记录表。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using System;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services.Processing;
public class HistoryStorageProcessor : IVariableProcessor
{
private readonly IVariableHistoryRepository _historyRepository;
public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
{
_historyRepository = historyRepository;
}
public async Task ProcessAsync(VariableContext context)
{
if (context.IsProcessed || context.IsError) return; // 短路
try
{
// 只有当值发生变化时才存储历史这里需要根据ChangeDetectionProcessor的逻辑来判断
// 由于ChangeDetectionProcessor现在只标记IsProcessed我们需要重新判断是否需要存储历史
// 简单起见这里假设如果未被短路且RawValue与CurrentValue不同则认为值已变化
// 更严谨的做法是ChangeDetectionProcessor设置一个IsValueChanged标志
// 这里我们直接使用RawValue和CurrentValue的比较
if (!context.RawValue.Equals(context.CurrentValue))
{
var history = new VariableHistory
{
VariableId = context.Variable.Id,
Value = context.CurrentValue.ToString(),
Timestamp = context.Timestamp
};
await _historyRepository.AddAsync(history);
}
}
catch (Exception ex)
{
context.IsError = true; // 标记错误
// 记录日志
}
}
}
```
### 4.3. `MqttPublishProcessor` - MQTT发布处理器
**职责**如果变量关联了MQTT服务器则将值发布出去。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using DMS.Infrastructure.Services.Communication;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using NLog;
namespace DMS.Infrastructure.Services.Processing;
/// <summary>
/// MQTT发布处理器负责将变量值发布到关联的MQTT服务器并使用专属别名。
/// </summary>
public class MqttPublishProcessor : IVariableProcessor
{
private readonly IMqttPublishService _mqttService;
private readonly IRepositoryManager _repoManager; // 使用 RepositoryManager 来获取仓储
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
/// <summary>
/// 构造函数。
/// </summary>
public MqttPublishProcessor(IMqttPublishService mqttService, IRepositoryManager repoManager)
{
_mqttService = mqttService;
_repoManager = repoManager;
}
public async Task ProcessAsync(VariableContext context)
{
if (context.IsProcessed || context.IsError) return; // 短路
try
{
// 1. 从仓储获取变量及其完整的别名关联列表
var variableWithAliases = await _repoManager.Variables.GetVariableWithMqttAliasesAsync(context.Variable.Id);
if (variableWithAliases?.MqttAliases == null || !variableWithAliases.MqttAliases.Any())
{
return; // 没有关联的MQTT服务器无需发布
}
foreach (var aliasInfo in variableWithAliases.MqttAliases)
{
// 确保 MqttServer 导航属性已加载且激活
var targetServer = aliasInfo.MqttServer;
if (targetServer == null || !targetServer.IsActive)
{
_logger.Warn($"MQTT发布失败变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。");
continue;
}
// 使用别名构建Topic
// 示例Topic格式DMS/DeviceName/VariableAlias
var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}";
var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });
await _mqttService.PublishAsync(targetServer, topic, payload);
}
}
catch (Exception ex)
{
context.IsError = true; // 标记错误
_logger.Error(ex, $"MQTT发布失败变量 {context.Variable.Name}");
}
}
}
```
## 5. 变量处理后台服务 (`VariableProcessingService`)
设计一个新的后台服务 `VariableProcessingService`,它将负责管理和执行所有注册的 `IVariableProcessor`。这个服务将从数据采集队列中读取 `VariableContext`,并按顺序调用所有处理器,实现短路机制。
```csharp
// 文件: DMS.Infrastructure/Services/VariableProcessingService.cs
using DMS.Application.Interfaces;
using DMS.Core.Models;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NLog;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services;
/// <summary>
/// 变量数据处理后台服务,负责协调所有 IVariableProcessor 的执行。
/// </summary>
public class VariableProcessingService : BackgroundService
{
private readonly ChannelReader<VariableContext> _queueReader;
private readonly IServiceProvider _serviceProvider;
private readonly IMessenger _messenger;
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
public VariableProcessingService(
IChannelBus channelBus,
IServiceProvider serviceProvider,
IMessenger messenger)
{
_queueReader = channelBus.GetReader<VariableContext>("DataProcessingQueue");
_serviceProvider = serviceProvider;
_messenger = messenger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var context in _queueReader.ReadAllAsync(stoppingToken))
{
// 获取所有注册的 IVariableProcessor 实例
// 注意:这里假设处理器是 Transient 或 Scoped每次处理都获取新的实例
var processors = _serviceProvider.GetServices<IVariableProcessor>();
context.IsSuccess = true; // 默认成功任何处理器失败则设为false
foreach (var processor in processors)
{
if (context.IsProcessed || context.IsError) // 短路机制
{
context.IsSuccess = false; // 如果短路,则不认为是完全成功
break;
}
try
{
await processor.ProcessAsync(context);
}
catch (Exception ex)
{
context.IsError = true; // 标记错误
context.IsSuccess = false; // 标记失败
_logger.Error(ex, $"变量处理器 {processor.GetType().Name} 执行错误变量ID: {context.Variable.Id}");
break; // 发生错误,中断后续处理
}
}
// 处理完成后发送消息通知UI更新变量值
// 只有当处理未被短路且没有错误时才发送成功消息
if (!context.IsProcessed && !context.IsError)
{
_messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue));
}
else if (context.IsError)
{
// 可以发送错误消息或进行其他错误处理
_messenger.Send(new VariableProcessingErrorMessage(context.Variable.Id, context.CurrentValue, "处理失败"));
}
}
}
}
```
## 6. 注册处理器和后台服务
在应用程序的启动配置中(例如 `Startup.cs``Program.cs`),需要注册所有的 `IVariableProcessor` 实现和 `VariableProcessingService`
```csharp
// 在 ConfigureServices 方法中
public void ConfigureServices(IServiceCollection services)
{
// ... 其他服务注册
// 注册所有 IVariableProcessor 实现
services.AddTransient<IVariableProcessor, ChangeDetectionProcessor>();
services.AddTransient<IVariableProcessor, HistoryStorageProcessor>();
services.AddTransient<IVariableProcessor, MqttPublishProcessor>();
// ... 注册其他处理器
// 注册后台服务
services.AddHostedService<VariableProcessingService>();
// 注册 IMemoryCache (如果尚未注册)
services.AddMemoryCache();
}
```
## 7. 数据采集服务中的调用
数据采集服务(例如 `S7BackgroundService`)现在只需将采集到的 `VariableContext` 写入 `DataProcessingQueue` 即可,无需关心处理链的构建和执行。
```csharp
// 在 S7BackgroundService.cs 或类似服务中
public class S7BackgroundService : IHostedService
{
private readonly ChannelWriter<VariableContext> _processingQueueWriter;
// ... 其他依赖
public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg)
{
// ...
_processingQueueWriter = bus.GetWriter<VariableContext>("DataProcessingQueue");
}
// ... StartAsync 方法中
private async Task PollingLoopAsync(List<Variable> varsToRead, int interval, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
// ... 读取变量
foreach (var variable in varsToRead)
{
var context = new VariableContext(variable, variable.DataValue);
await _processingQueueWriter.WriteAsync(context, token);
}
// ...
}
}
}
```