390 lines
14 KiB
Markdown
390 lines
14 KiB
Markdown
|
|
# 软件开发文档 - 数据处理链设计
|
|||
|
|
|
|||
|
|
本文档专门阐述当系统从S7 PLC等设备读取到变量值后,如何通过一个可扩展的、链式的处理流程,对数据进行加工、存储和转发。
|
|||
|
|
|
|||
|
|
## 1. 设计目标
|
|||
|
|
|
|||
|
|
* **解耦**:每个处理步骤应是独立的、可复用的。
|
|||
|
|
* **可扩展**:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。
|
|||
|
|
* **高性能**:处理流程应高效,避免阻塞数据采集线程。
|
|||
|
|
* **可配置**:能够根据不同变量的需求,动态构建处理链。
|
|||
|
|
|
|||
|
|
## 2. 核心概念:`VariableContext`
|
|||
|
|
|
|||
|
|
为了在处理链中传递数据,我们定义一个上下文对象 `VariableContext`。它将携带变量的原始值、当前值以及处理状态,在整个处理流程中流动。
|
|||
|
|
|
|||
|
|
```csharp
|
|||
|
|
// 文件: DMS.Core/Models/VariableContext.cs
|
|||
|
|
using System;
|
|||
|
|
|
|||
|
|
namespace DMS.Core.Models;
|
|||
|
|
|
|||
|
|
public class VariableContext
|
|||
|
|
{
|
|||
|
|
public Variable Variable { get; init; }
|
|||
|
|
public object RawValue { get; init; } // 从设备读取的原始值
|
|||
|
|
public object CurrentValue { get; set; } // 当前处理后的值
|
|||
|
|
public DateTime Timestamp { get; init; }
|
|||
|
|
public bool IsProcessed { get; set; } // 标记是否已处理(短路机制)
|
|||
|
|
public bool IsSuccess { get; set; } // 标记处理是否成功
|
|||
|
|
public bool IsError { get; set; } // 标记处理是否发生错误
|
|||
|
|
|
|||
|
|
public VariableContext(Variable variable, object rawValue)
|
|||
|
|
{
|
|||
|
|
Variable = variable;
|
|||
|
|
RawValue = rawValue;
|
|||
|
|
CurrentValue = rawValue;
|
|||
|
|
Timestamp = DateTime.UtcNow;
|
|||
|
|
IsProcessed = false;
|
|||
|
|
IsSuccess = false;
|
|||
|
|
IsError = false;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 3. 处理器接口
|
|||
|
|
|
|||
|
|
我们定义一个统一的处理器接口 `IVariableProcessor`。
|
|||
|
|
|
|||
|
|
### 3.1. `IVariableProcessor` 接口
|
|||
|
|
|
|||
|
|
```csharp
|
|||
|
|
// 文件: DMS.Application/Interfaces/IVariableProcessor.cs
|
|||
|
|
using System.Threading.Tasks;
|
|||
|
|
|
|||
|
|
namespace DMS.Application.Interfaces;
|
|||
|
|
|
|||
|
|
public interface IVariableProcessor
|
|||
|
|
{
|
|||
|
|
Task ProcessAsync(VariableContext context);
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 4. 具体处理器实现
|
|||
|
|
|
|||
|
|
以下是处理链中几个核心处理器的设计。
|
|||
|
|
|
|||
|
|
### 4.1. `ChangeDetectionProcessor` - 变化检测处理器
|
|||
|
|
|
|||
|
|
**职责**:检测本次读取的值与上一次的值是否相同。如果相同,则标记 `IsProcessed` 为 `true`,以短路后续处理。
|
|||
|
|
|
|||
|
|
```csharp
|
|||
|
|
// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
|
|||
|
|
using DMS.Application.Interfaces;
|
|||
|
|
using DMS.Core.Models;
|
|||
|
|
using Microsoft.Extensions.Caching.Memory;
|
|||
|
|
using System;
|
|||
|
|
using System.Threading.Tasks;
|
|||
|
|
|
|||
|
|
namespace DMS.Infrastructure.Services.Processing;
|
|||
|
|
|
|||
|
|
public class ChangeDetectionProcessor : IVariableProcessor
|
|||
|
|
{
|
|||
|
|
private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值
|
|||
|
|
|
|||
|
|
public ChangeDetectionProcessor(IMemoryCache cache)
|
|||
|
|
{
|
|||
|
|
_cache = cache;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public Task ProcessAsync(VariableContext context)
|
|||
|
|
{
|
|||
|
|
if (context.IsProcessed || context.IsError) return Task.CompletedTask; // 短路
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
var lastValue = _cache.Get(context.Variable.Id);
|
|||
|
|
if (lastValue != null && lastValue.Equals(context.RawValue))
|
|||
|
|
{
|
|||
|
|
context.IsProcessed = true; // 值未变化,标记为已处理,短路后续操作
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
{
|
|||
|
|
_cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
context.IsError = true; // 标记错误
|
|||
|
|
// 记录日志
|
|||
|
|
}
|
|||
|
|
return Task.CompletedTask;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 4.2. `HistoryStorageProcessor` - 历史存储处理器
|
|||
|
|
|
|||
|
|
**职责**:将变化后的值存入数据库历史记录表。
|
|||
|
|
|
|||
|
|
```csharp
|
|||
|
|
// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
|
|||
|
|
using DMS.Application.Interfaces;
|
|||
|
|
using DMS.Core.Interfaces;
|
|||
|
|
using DMS.Core.Models;
|
|||
|
|
using System;
|
|||
|
|
using System.Threading.Tasks;
|
|||
|
|
|
|||
|
|
namespace DMS.Infrastructure.Services.Processing;
|
|||
|
|
|
|||
|
|
public class HistoryStorageProcessor : IVariableProcessor
|
|||
|
|
{
|
|||
|
|
private readonly IVariableHistoryRepository _historyRepository;
|
|||
|
|
|
|||
|
|
public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
|
|||
|
|
{
|
|||
|
|
_historyRepository = historyRepository;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public async Task ProcessAsync(VariableContext context)
|
|||
|
|
{
|
|||
|
|
if (context.IsProcessed || context.IsError) return; // 短路
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 只有当值发生变化时才存储历史,这里需要根据ChangeDetectionProcessor的逻辑来判断
|
|||
|
|
// 由于ChangeDetectionProcessor现在只标记IsProcessed,我们需要重新判断是否需要存储历史
|
|||
|
|
// 简单起见,这里假设如果未被短路,且RawValue与CurrentValue不同,则认为值已变化
|
|||
|
|
// 更严谨的做法是ChangeDetectionProcessor设置一个IsValueChanged标志
|
|||
|
|
// 这里我们直接使用RawValue和CurrentValue的比较
|
|||
|
|
if (!context.RawValue.Equals(context.CurrentValue))
|
|||
|
|
{
|
|||
|
|
var history = new VariableHistory
|
|||
|
|
{
|
|||
|
|
VariableId = context.Variable.Id,
|
|||
|
|
Value = context.CurrentValue.ToString(),
|
|||
|
|
Timestamp = context.Timestamp
|
|||
|
|
};
|
|||
|
|
await _historyRepository.AddAsync(history);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
context.IsError = true; // 标记错误
|
|||
|
|
// 记录日志
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
### 4.3. `MqttPublishProcessor` - MQTT发布处理器
|
|||
|
|
|
|||
|
|
**职责**:如果变量关联了MQTT服务器,则将值发布出去。
|
|||
|
|
|
|||
|
|
```csharp
|
|||
|
|
// 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs
|
|||
|
|
using DMS.Application.Interfaces;
|
|||
|
|
using DMS.Core.Interfaces;
|
|||
|
|
using DMS.Core.Models;
|
|||
|
|
using DMS.Infrastructure.Services.Communication;
|
|||
|
|
using System.Linq;
|
|||
|
|
using System.Text.Json;
|
|||
|
|
using System.Threading.Tasks;
|
|||
|
|
using NLog;
|
|||
|
|
|
|||
|
|
namespace DMS.Infrastructure.Services.Processing;
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// MQTT发布处理器,负责将变量值发布到关联的MQTT服务器,并使用专属别名。
|
|||
|
|
/// </summary>
|
|||
|
|
public class MqttPublishProcessor : IVariableProcessor
|
|||
|
|
{
|
|||
|
|
private readonly IMqttPublishService _mqttService;
|
|||
|
|
private readonly IRepositoryManager _repoManager; // 使用 RepositoryManager 来获取仓储
|
|||
|
|
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 构造函数。
|
|||
|
|
/// </summary>
|
|||
|
|
public MqttPublishProcessor(IMqttPublishService mqttService, IRepositoryManager repoManager)
|
|||
|
|
{
|
|||
|
|
_mqttService = mqttService;
|
|||
|
|
_repoManager = repoManager;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
public async Task ProcessAsync(VariableContext context)
|
|||
|
|
{
|
|||
|
|
if (context.IsProcessed || context.IsError) return; // 短路
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
// 1. 从仓储获取变量及其完整的别名关联列表
|
|||
|
|
var variableWithAliases = await _repoManager.Variables.GetVariableWithMqttAliasesAsync(context.Variable.Id);
|
|||
|
|
|
|||
|
|
if (variableWithAliases?.MqttAliases == null || !variableWithAliases.MqttAliases.Any())
|
|||
|
|
{
|
|||
|
|
return; // 没有关联的MQTT服务器,无需发布
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
foreach (var aliasInfo in variableWithAliases.MqttAliases)
|
|||
|
|
{
|
|||
|
|
// 确保 MqttServer 导航属性已加载且激活
|
|||
|
|
var targetServer = aliasInfo.MqttServer;
|
|||
|
|
if (targetServer == null || !targetServer.IsActive)
|
|||
|
|
{
|
|||
|
|
_logger.Warn($"MQTT发布失败:变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。");
|
|||
|
|
continue;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 使用别名构建Topic
|
|||
|
|
// 示例Topic格式:DMS/DeviceName/VariableAlias
|
|||
|
|
var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}";
|
|||
|
|
var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });
|
|||
|
|
|
|||
|
|
await _mqttService.PublishAsync(targetServer, topic, payload);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
context.IsError = true; // 标记错误
|
|||
|
|
_logger.Error(ex, $"MQTT发布失败:变量 {context.Variable.Name}");
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 5. 变量处理后台服务 (`VariableProcessingService`)
|
|||
|
|
|
|||
|
|
设计一个新的后台服务 `VariableProcessingService`,它将负责管理和执行所有注册的 `IVariableProcessor`。这个服务将从数据采集队列中读取 `VariableContext`,并按顺序调用所有处理器,实现短路机制。
|
|||
|
|
|
|||
|
|
```csharp
|
|||
|
|
// 文件: DMS.Infrastructure/Services/VariableProcessingService.cs
|
|||
|
|
using DMS.Application.Interfaces;
|
|||
|
|
using DMS.Core.Models;
|
|||
|
|
using DMS.WPF.Services;
|
|||
|
|
using CommunityToolkit.Mvvm.Messaging;
|
|||
|
|
using Microsoft.Extensions.DependencyInjection;
|
|||
|
|
using Microsoft.Extensions.Hosting;
|
|||
|
|
using NLog;
|
|||
|
|
using System;
|
|||
|
|
using System.Collections.Generic;
|
|||
|
|
using System.Threading;
|
|||
|
|
using System.Threading.Channels;
|
|||
|
|
using System.Threading.Tasks;
|
|||
|
|
|
|||
|
|
namespace DMS.Infrastructure.Services;
|
|||
|
|
|
|||
|
|
/// <summary>
|
|||
|
|
/// 变量数据处理后台服务,负责协调所有 IVariableProcessor 的执行。
|
|||
|
|
/// </summary>
|
|||
|
|
public class VariableProcessingService : BackgroundService
|
|||
|
|
{
|
|||
|
|
private readonly ChannelReader<VariableContext> _queueReader;
|
|||
|
|
private readonly IServiceProvider _serviceProvider;
|
|||
|
|
private readonly IMessenger _messenger;
|
|||
|
|
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
|
|||
|
|
|
|||
|
|
public VariableProcessingService(
|
|||
|
|
IChannelBus channelBus,
|
|||
|
|
IServiceProvider serviceProvider,
|
|||
|
|
IMessenger messenger)
|
|||
|
|
{
|
|||
|
|
_queueReader = channelBus.GetReader<VariableContext>("DataProcessingQueue");
|
|||
|
|
_serviceProvider = serviceProvider;
|
|||
|
|
_messenger = messenger;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|||
|
|
{
|
|||
|
|
await foreach (var context in _queueReader.ReadAllAsync(stoppingToken))
|
|||
|
|
{
|
|||
|
|
// 获取所有注册的 IVariableProcessor 实例
|
|||
|
|
// 注意:这里假设处理器是 Transient 或 Scoped,每次处理都获取新的实例
|
|||
|
|
var processors = _serviceProvider.GetServices<IVariableProcessor>();
|
|||
|
|
|
|||
|
|
context.IsSuccess = true; // 默认成功,任何处理器失败则设为false
|
|||
|
|
|
|||
|
|
foreach (var processor in processors)
|
|||
|
|
{
|
|||
|
|
if (context.IsProcessed || context.IsError) // 短路机制
|
|||
|
|
{
|
|||
|
|
context.IsSuccess = false; // 如果短路,则不认为是完全成功
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
try
|
|||
|
|
{
|
|||
|
|
await processor.ProcessAsync(context);
|
|||
|
|
}
|
|||
|
|
catch (Exception ex)
|
|||
|
|
{
|
|||
|
|
context.IsError = true; // 标记错误
|
|||
|
|
context.IsSuccess = false; // 标记失败
|
|||
|
|
_logger.Error(ex, $"变量处理器 {processor.GetType().Name} 执行错误,变量ID: {context.Variable.Id}");
|
|||
|
|
break; // 发生错误,中断后续处理
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// 处理完成后,发送消息通知UI更新变量值
|
|||
|
|
// 只有当处理未被短路且没有错误时才发送成功消息
|
|||
|
|
if (!context.IsProcessed && !context.IsError)
|
|||
|
|
{
|
|||
|
|
_messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue));
|
|||
|
|
}
|
|||
|
|
else if (context.IsError)
|
|||
|
|
{
|
|||
|
|
// 可以发送错误消息或进行其他错误处理
|
|||
|
|
_messenger.Send(new VariableProcessingErrorMessage(context.Variable.Id, context.CurrentValue, "处理失败"));
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 6. 注册处理器和后台服务
|
|||
|
|
|
|||
|
|
在应用程序的启动配置中(例如 `Startup.cs` 或 `Program.cs`),需要注册所有的 `IVariableProcessor` 实现和 `VariableProcessingService`。
|
|||
|
|
|
|||
|
|
```csharp
|
|||
|
|
// 在 ConfigureServices 方法中
|
|||
|
|
public void ConfigureServices(IServiceCollection services)
|
|||
|
|
{
|
|||
|
|
// ... 其他服务注册
|
|||
|
|
|
|||
|
|
// 注册所有 IVariableProcessor 实现
|
|||
|
|
services.AddTransient<IVariableProcessor, ChangeDetectionProcessor>();
|
|||
|
|
services.AddTransient<IVariableProcessor, HistoryStorageProcessor>();
|
|||
|
|
services.AddTransient<IVariableProcessor, MqttPublishProcessor>();
|
|||
|
|
// ... 注册其他处理器
|
|||
|
|
|
|||
|
|
// 注册后台服务
|
|||
|
|
services.AddHostedService<VariableProcessingService>();
|
|||
|
|
|
|||
|
|
// 注册 IMemoryCache (如果尚未注册)
|
|||
|
|
services.AddMemoryCache();
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 7. 数据采集服务中的调用
|
|||
|
|
|
|||
|
|
数据采集服务(例如 `S7BackgroundService`)现在只需将采集到的 `VariableContext` 写入 `DataProcessingQueue` 即可,无需关心处理链的构建和执行。
|
|||
|
|
|
|||
|
|
```csharp
|
|||
|
|
// 在 S7BackgroundService.cs 或类似服务中
|
|||
|
|
public class S7BackgroundService : IHostedService
|
|||
|
|
{
|
|||
|
|
private readonly ChannelWriter<VariableContext> _processingQueueWriter;
|
|||
|
|
// ... 其他依赖
|
|||
|
|
|
|||
|
|
public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg)
|
|||
|
|
{
|
|||
|
|
// ...
|
|||
|
|
_processingQueueWriter = bus.GetWriter<VariableContext>("DataProcessingQueue");
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// ... StartAsync 方法中
|
|||
|
|
private async Task PollingLoopAsync(List<Variable> varsToRead, int interval, CancellationToken token)
|
|||
|
|
{
|
|||
|
|
while (!token.IsCancellationRequested)
|
|||
|
|
{
|
|||
|
|
// ... 读取变量
|
|||
|
|
foreach (var variable in varsToRead)
|
|||
|
|
{
|
|||
|
|
var context = new VariableContext(variable, variable.DataValue);
|
|||
|
|
await _processingQueueWriter.WriteAsync(context, token);
|
|||
|
|
}
|
|||
|
|
// ...
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|