按照软件设计文档开始重构代码01

This commit is contained in:
2025-07-21 14:35:17 +08:00
parent a7b0a5d108
commit 29a2d44319
127 changed files with 12265 additions and 1586 deletions

View File

@@ -0,0 +1,390 @@
# 软件开发文档 - 数据处理链设计
本文档专门阐述当系统从S7 PLC等设备读取到变量值后如何通过一个可扩展的、链式的处理流程对数据进行加工、存储和转发。
## 1. 设计目标
* **解耦**:每个处理步骤应是独立的、可复用的。
* **可扩展**:可以轻松地添加新的处理环节(例如:数据校验、报警判断、线性转换等)。
* **高性能**:处理流程应高效,避免阻塞数据采集线程。
* **可配置**:能够根据不同变量的需求,动态构建处理链。
## 2. 核心概念:`VariableContext`
为了在处理链中传递数据,我们定义一个上下文对象 `VariableContext`。它将携带变量的原始值、当前值以及处理状态,在整个处理流程中流动。
```csharp
// 文件: DMS.Core/Models/VariableContext.cs
using System;
namespace DMS.Core.Models;
public class VariableContext
{
public Variable Variable { get; init; }
public object RawValue { get; init; } // 从设备读取的原始值
public object CurrentValue { get; set; } // 当前处理后的值
public DateTime Timestamp { get; init; }
public bool IsProcessed { get; set; } // 标记是否已处理(短路机制)
public bool IsSuccess { get; set; } // 标记处理是否成功
public bool IsError { get; set; } // 标记处理是否发生错误
public VariableContext(Variable variable, object rawValue)
{
Variable = variable;
RawValue = rawValue;
CurrentValue = rawValue;
Timestamp = DateTime.UtcNow;
IsProcessed = false;
IsSuccess = false;
IsError = false;
}
}
```
## 3. 处理器接口
我们定义一个统一的处理器接口 `IVariableProcessor`
### 3.1. `IVariableProcessor` 接口
```csharp
// 文件: DMS.Application/Interfaces/IVariableProcessor.cs
using System.Threading.Tasks;
namespace DMS.Application.Interfaces;
public interface IVariableProcessor
{
Task ProcessAsync(VariableContext context);
}
```
## 4. 具体处理器实现
以下是处理链中几个核心处理器的设计。
### 4.1. `ChangeDetectionProcessor` - 变化检测处理器
**职责**:检测本次读取的值与上一次的值是否相同。如果相同,则标记 `IsProcessed``true`,以短路后续处理。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/ChangeDetectionProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Models;
using Microsoft.Extensions.Caching.Memory;
using System;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services.Processing;
public class ChangeDetectionProcessor : IVariableProcessor
{
private readonly IMemoryCache _cache; // 使用内存缓存来存储上一次的值
public ChangeDetectionProcessor(IMemoryCache cache)
{
_cache = cache;
}
public Task ProcessAsync(VariableContext context)
{
if (context.IsProcessed || context.IsError) return Task.CompletedTask; // 短路
try
{
var lastValue = _cache.Get(context.Variable.Id);
if (lastValue != null && lastValue.Equals(context.RawValue))
{
context.IsProcessed = true; // 值未变化,标记为已处理,短路后续操作
}
else
{
_cache.Set(context.Variable.Id, context.RawValue, TimeSpan.FromDays(1));
}
}
catch (Exception ex)
{
context.IsError = true; // 标记错误
// 记录日志
}
return Task.CompletedTask;
}
}
```
### 4.2. `HistoryStorageProcessor` - 历史存储处理器
**职责**:将变化后的值存入数据库历史记录表。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/HistoryStorageProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using System;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services.Processing;
public class HistoryStorageProcessor : IVariableProcessor
{
private readonly IVariableHistoryRepository _historyRepository;
public HistoryStorageProcessor(IVariableHistoryRepository historyRepository)
{
_historyRepository = historyRepository;
}
public async Task ProcessAsync(VariableContext context)
{
if (context.IsProcessed || context.IsError) return; // 短路
try
{
// 只有当值发生变化时才存储历史这里需要根据ChangeDetectionProcessor的逻辑来判断
// 由于ChangeDetectionProcessor现在只标记IsProcessed我们需要重新判断是否需要存储历史
// 简单起见这里假设如果未被短路且RawValue与CurrentValue不同则认为值已变化
// 更严谨的做法是ChangeDetectionProcessor设置一个IsValueChanged标志
// 这里我们直接使用RawValue和CurrentValue的比较
if (!context.RawValue.Equals(context.CurrentValue))
{
var history = new VariableHistory
{
VariableId = context.Variable.Id,
Value = context.CurrentValue.ToString(),
Timestamp = context.Timestamp
};
await _historyRepository.AddAsync(history);
}
}
catch (Exception ex)
{
context.IsError = true; // 标记错误
// 记录日志
}
}
}
```
### 4.3. `MqttPublishProcessor` - MQTT发布处理器
**职责**如果变量关联了MQTT服务器则将值发布出去。
```csharp
// 文件: DMS.Infrastructure/Services/Processors/MqttPublishProcessor.cs
using DMS.Application.Interfaces;
using DMS.Core.Interfaces;
using DMS.Core.Models;
using DMS.Infrastructure.Services.Communication;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using NLog;
namespace DMS.Infrastructure.Services.Processing;
/// <summary>
/// MQTT发布处理器负责将变量值发布到关联的MQTT服务器并使用专属别名。
/// </summary>
public class MqttPublishProcessor : IVariableProcessor
{
private readonly IMqttPublishService _mqttService;
private readonly IRepositoryManager _repoManager; // 使用 RepositoryManager 来获取仓储
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
/// <summary>
/// 构造函数。
/// </summary>
public MqttPublishProcessor(IMqttPublishService mqttService, IRepositoryManager repoManager)
{
_mqttService = mqttService;
_repoManager = repoManager;
}
public async Task ProcessAsync(VariableContext context)
{
if (context.IsProcessed || context.IsError) return; // 短路
try
{
// 1. 从仓储获取变量及其完整的别名关联列表
var variableWithAliases = await _repoManager.Variables.GetVariableWithMqttAliasesAsync(context.Variable.Id);
if (variableWithAliases?.MqttAliases == null || !variableWithAliases.MqttAliases.Any())
{
return; // 没有关联的MQTT服务器无需发布
}
foreach (var aliasInfo in variableWithAliases.MqttAliases)
{
// 确保 MqttServer 导航属性已加载且激活
var targetServer = aliasInfo.MqttServer;
if (targetServer == null || !targetServer.IsActive)
{
_logger.Warn($"MQTT发布失败变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。");
continue;
}
// 使用别名构建Topic
// 示例Topic格式DMS/DeviceName/VariableAlias
var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}";
var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });
await _mqttService.PublishAsync(targetServer, topic, payload);
}
}
catch (Exception ex)
{
context.IsError = true; // 标记错误
_logger.Error(ex, $"MQTT发布失败变量 {context.Variable.Name}");
}
}
}
```
## 5. 变量处理后台服务 (`VariableProcessingService`)
设计一个新的后台服务 `VariableProcessingService`,它将负责管理和执行所有注册的 `IVariableProcessor`。这个服务将从数据采集队列中读取 `VariableContext`,并按顺序调用所有处理器,实现短路机制。
```csharp
// 文件: DMS.Infrastructure/Services/VariableProcessingService.cs
using DMS.Application.Interfaces;
using DMS.Core.Models;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NLog;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace DMS.Infrastructure.Services;
/// <summary>
/// 变量数据处理后台服务,负责协调所有 IVariableProcessor 的执行。
/// </summary>
public class VariableProcessingService : BackgroundService
{
private readonly ChannelReader<VariableContext> _queueReader;
private readonly IServiceProvider _serviceProvider;
private readonly IMessenger _messenger;
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
public VariableProcessingService(
IChannelBus channelBus,
IServiceProvider serviceProvider,
IMessenger messenger)
{
_queueReader = channelBus.GetReader<VariableContext>("DataProcessingQueue");
_serviceProvider = serviceProvider;
_messenger = messenger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var context in _queueReader.ReadAllAsync(stoppingToken))
{
// 获取所有注册的 IVariableProcessor 实例
// 注意:这里假设处理器是 Transient 或 Scoped每次处理都获取新的实例
var processors = _serviceProvider.GetServices<IVariableProcessor>();
context.IsSuccess = true; // 默认成功任何处理器失败则设为false
foreach (var processor in processors)
{
if (context.IsProcessed || context.IsError) // 短路机制
{
context.IsSuccess = false; // 如果短路,则不认为是完全成功
break;
}
try
{
await processor.ProcessAsync(context);
}
catch (Exception ex)
{
context.IsError = true; // 标记错误
context.IsSuccess = false; // 标记失败
_logger.Error(ex, $"变量处理器 {processor.GetType().Name} 执行错误变量ID: {context.Variable.Id}");
break; // 发生错误,中断后续处理
}
}
// 处理完成后发送消息通知UI更新变量值
// 只有当处理未被短路且没有错误时才发送成功消息
if (!context.IsProcessed && !context.IsError)
{
_messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue));
}
else if (context.IsError)
{
// 可以发送错误消息或进行其他错误处理
_messenger.Send(new VariableProcessingErrorMessage(context.Variable.Id, context.CurrentValue, "处理失败"));
}
}
}
}
```
## 6. 注册处理器和后台服务
在应用程序的启动配置中(例如 `Startup.cs``Program.cs`),需要注册所有的 `IVariableProcessor` 实现和 `VariableProcessingService`
```csharp
// 在 ConfigureServices 方法中
public void ConfigureServices(IServiceCollection services)
{
// ... 其他服务注册
// 注册所有 IVariableProcessor 实现
services.AddTransient<IVariableProcessor, ChangeDetectionProcessor>();
services.AddTransient<IVariableProcessor, HistoryStorageProcessor>();
services.AddTransient<IVariableProcessor, MqttPublishProcessor>();
// ... 注册其他处理器
// 注册后台服务
services.AddHostedService<VariableProcessingService>();
// 注册 IMemoryCache (如果尚未注册)
services.AddMemoryCache();
}
```
## 7. 数据采集服务中的调用
数据采集服务(例如 `S7BackgroundService`)现在只需将采集到的 `VariableContext` 写入 `DataProcessingQueue` 即可,无需关心处理链的构建和执行。
```csharp
// 在 S7BackgroundService.cs 或类似服务中
public class S7BackgroundService : IHostedService
{
private readonly ChannelWriter<VariableContext> _processingQueueWriter;
// ... 其他依赖
public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg)
{
// ...
_processingQueueWriter = bus.GetWriter<VariableContext>("DataProcessingQueue");
}
// ... StartAsync 方法中
private async Task PollingLoopAsync(List<Variable> varsToRead, int interval, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
// ... 读取变量
foreach (var variable in varsToRead)
{
var context = new VariableContext(variable, variable.DataValue);
await _processingQueueWriter.WriteAsync(context, token);
}
// ...
}
}
}
```