439 lines
18 KiB
Markdown
439 lines
18 KiB
Markdown
# 05. 基础设施层 - 后台服务与通信
|
||
|
||
本文档详细设计了 `DMS.Infrastructure` 层中负责与外部世界(如PLC、MQTT Broker)进行通信的后台服务。
|
||
|
||
## 1. 目录结构
|
||
|
||
```
|
||
DMS.Infrastructure/
|
||
├── Services/
|
||
│ ├── Communication/
|
||
│ │ ├── S7DeviceAgent.cs
|
||
│ │ └── MqttPublishService.cs
|
||
│ ├── Processing/
|
||
│ │ ├── ChangeDetectionProcessor.cs
|
||
│ │ ├── HistoryStorageProcessor.cs
|
||
│ │ └── MqttPublishProcessor.cs
|
||
│ ├── S7BackgroundService.cs
|
||
│ └── DataProcessingService.cs
|
||
└── ...
|
||
```
|
||
|
||
## 2. S7通信架构 (“编排者-代理”模式)
|
||
|
||
### 2.1. 设计思路与考量
|
||
|
||
* **模式**:采用“编排者-代理”(Orchestrator-Agent)模式。`S7BackgroundService` 作为编排者,负责管理所有S7设备的生命周期;每个 `S7DeviceAgent` 作为代理,专门负责与**一个**S7 PLC进行所有交互。
|
||
* **职责分离**:将设备管理(启动、停止、配置更新)与具体设备通信(连接、轮询、读写)的职责分离。
|
||
* **并发性**:每个 `S7DeviceAgent` 独立运行,可以并行处理多个设备的通信,提高系统吞吐量。
|
||
* **热重载**:通过消息机制,允许在运行时动态更新设备的变量配置,而无需重启整个服务。
|
||
|
||
### 2.2. 设计优势
|
||
|
||
* **高可靠性**:单个设备的通信故障不会影响其他设备的正常运行。
|
||
* **可扩展性**:易于添加新的设备类型或通信协议,只需实现新的 `DeviceAgent`。
|
||
* **动态配置**:支持运行时修改设备和变量配置,提高了系统的灵活性和可用性。
|
||
* **资源隔离**:每个Agent管理自己的连接和资源,避免资源争用。
|
||
|
||
### 2.3. 设计劣势/权衡
|
||
|
||
* **复杂性增加**:引入了Agent的概念和Agent与Orchestrator之间的通信机制,增加了初期设计和实现复杂性。
|
||
* **资源消耗**:每个Agent可能需要维护独立的连接和线程,当设备数量非常庞大时,可能会增加资源消耗。
|
||
|
||
### 2.4. `S7BackgroundService.cs` (编排者)
|
||
|
||
作为 `IHostedService` 运行,负责从数据库加载激活的S7设备,并为每个设备创建和管理 `S7DeviceAgent` 实例。它还监听配置变更消息,以触发Agent的热重载。
|
||
|
||
```csharp
|
||
// 文件: DMS.Infrastructure/Services/S7BackgroundService.cs
|
||
using Microsoft.Extensions.Hosting;
|
||
using DMS.Core.Interfaces;
|
||
using DMS.WPF.Services;
|
||
using CommunityToolkit.Mvvm.Messaging;
|
||
using System.Collections.Concurrent;
|
||
using System.Threading;
|
||
using System.Threading.Tasks;
|
||
using DMS.Core.Enums;
|
||
using DMS.WPF.Messages;
|
||
|
||
namespace DMS.Infrastructure.Services;
|
||
|
||
/// <summary>
|
||
/// S7后台服务编排者,作为IHostedService运行,管理所有S7设备的通信代理。
|
||
/// </summary>
|
||
public class S7BackgroundService : IHostedService
|
||
{
|
||
private readonly IRepositoryManager _repoManager;
|
||
private readonly IChannelBus _channelBus;
|
||
private readonly IMessenger _messenger;
|
||
private readonly ConcurrentDictionary<int, S7DeviceAgent> _activeAgents = new();
|
||
|
||
/// <summary>
|
||
/// 构造函数,通过依赖注入获取所需服务。
|
||
/// </summary>
|
||
public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg)
|
||
{
|
||
_repoManager = repo;
|
||
_channelBus = bus;
|
||
_messenger = msg;
|
||
// 注册配置变更消息,以便在设备或变量配置更新时通知Agent
|
||
_messenger.Register<ConfigChangedMessage>(this, async (r, m) => await HandleConfigChange(m));
|
||
}
|
||
|
||
/// <summary>
|
||
/// 服务启动时调用,加载所有激活的S7设备并启动其代理。
|
||
/// </summary>
|
||
public async Task StartAsync(CancellationToken cancellationToken)
|
||
{
|
||
// 获取所有激活的S7设备及其详细信息
|
||
var s7Devices = await _repoManager.Devices.GetActiveDevicesWithDetailsAsync(ProtocolType.S7);
|
||
foreach (var device in s7Devices)
|
||
{
|
||
// 为每个设备创建一个S7DeviceAgent实例
|
||
var agent = new S7DeviceAgent(device, _channelBus, _messenger);
|
||
if (_activeAgents.TryAdd(device.Id, agent))
|
||
{
|
||
// 启动Agent的通信循环
|
||
await agent.StartAsync(cancellationToken);
|
||
}
|
||
}
|
||
|
||
// 启动数据处理消费者服务,它将从ChannelBus中读取数据
|
||
var dataProcessor = new DataProcessingService(_channelBus, _messenger, _repoManager);
|
||
_ = dataProcessor.StartProcessingAsync(cancellationToken); // 在后台运行,不阻塞启动
|
||
}
|
||
|
||
/// <summary>
|
||
/// 处理配置变更消息,通知相关Agent更新其变量列表。
|
||
/// </summary>
|
||
private async Task HandleConfigChange(ConfigChangedMessage message)
|
||
{
|
||
// 从数据库重新加载受影响的设备及其最新配置
|
||
var updatedDevice = await _repoManager.Devices.GetDeviceWithDetailsAsync(message.DeviceId);
|
||
if (updatedDevice != null && _activeAgents.TryGetValue(message.DeviceId, out var agent))
|
||
{
|
||
// 指示Agent使用新的变量列表进行热重载
|
||
agent.UpdateVariableLists(updatedDevice.VariableTables.SelectMany(vt => vt.Variables).ToList());
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 服务停止时调用,停止所有活动的Agent并释放资源。
|
||
/// </summary>
|
||
public async Task StopAsync(CancellationToken cancellationToken)
|
||
{
|
||
foreach (var agent in _activeAgents.Values)
|
||
{
|
||
await agent.DisposeAsync();
|
||
}
|
||
_activeAgents.Clear();
|
||
}
|
||
}
|
||
```
|
||
|
||
### 2.5. `S7DeviceAgent.cs` (代理)
|
||
|
||
负责与单个PLC建立连接、维护连接、按不同频率并行轮询变量,并将读取到的数据通过 `IChannelBus` 写入数据处理队列。
|
||
|
||
```csharp
|
||
// 文件: DMS.Infrastructure/Services/Communication/S7DeviceAgent.cs
|
||
using S7.Net;
|
||
using DMS.Core.Models;
|
||
using DMS.Core.Enums;
|
||
using DMS.WPF.Services;
|
||
using CommunityToolkit.Mvvm.Messaging;
|
||
using System.Collections.Generic;
|
||
using System.Linq;
|
||
using System.Threading;
|
||
using System.Threading.Channels;
|
||
using System.Threading.Tasks;
|
||
using System;
|
||
|
||
namespace DMS.Infrastructure.Services.Communication;
|
||
|
||
/// <summary>
|
||
/// 单个S7 PLC的通信代理,负责连接、轮询和数据发送。
|
||
/// </summary>
|
||
public class S7DeviceAgent : IAsyncDisposable
|
||
{
|
||
private readonly Device _deviceConfig;
|
||
private readonly ChannelWriter<VariableContext> _processingQueueWriter;
|
||
private readonly IMessenger _messenger;
|
||
private readonly Plc _plcClient;
|
||
private CancellationTokenSource _cts; // 用于控制Agent内部的轮询任务
|
||
|
||
// 存储按轮询级别分组的变量列表
|
||
private List<Variable> _highFreqVars = new();
|
||
private List<Variable> _mediumFreqVars = new();
|
||
private List<Variable> _lowFreqVars = new();
|
||
|
||
/// <summary>
|
||
/// 构造函数。
|
||
/// </summary>
|
||
/// <param name="device">设备的配置信息。</param>
|
||
/// <param name="channelBus">中央通道总线服务。</param>
|
||
/// <param name="messenger">消息总线服务。</param>
|
||
public S7DeviceAgent(Device device, IChannelBus channelBus, IMessenger messenger)
|
||
{
|
||
_deviceConfig = device;
|
||
_messenger = messenger;
|
||
// 从中央总线获取指定名称的通道的写入端
|
||
_processingQueueWriter = channelBus.GetWriter<VariableContext>("DataProcessingQueue");
|
||
|
||
// 初始化S7.Net PLC客户端
|
||
_plcClient = new Plc(
|
||
(CpuType)Enum.Parse(typeof(CpuType), _deviceConfig.Protocol.ToString()), // 根据协议类型解析CPU类型
|
||
_deviceConfig.IpAddress,
|
||
(short)_deviceConfig.Rack,
|
||
(short)_deviceConfig.Slot
|
||
);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 启动Agent的通信循环。
|
||
/// </summary>
|
||
public async Task StartAsync(CancellationToken cancellationToken = default)
|
||
{
|
||
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||
await _plcClient.OpenAsync(); // 建立PLC连接
|
||
|
||
// 初始加载变量列表并分组
|
||
UpdateVariableLists(_deviceConfig.VariableTables.SelectMany(vt => vt.Variables).ToList());
|
||
|
||
// 启动并行的轮询任务,每个轮询级别一个任务
|
||
_ = Task.Run(() => PollingLoopAsync(_highFreqVars, 200, _cts.Token)); // 高频:200ms
|
||
_ = Task.Run(() => PollingLoopAsync(_mediumFreqVars, 1000, _cts.Token)); // 中频:1000ms
|
||
_ = Task.Run(() => PollingLoopAsync(_lowFreqVars, 5000, _cts.Token)); // 低频:5000ms
|
||
}
|
||
|
||
/// <summary>
|
||
/// 热重载方法,用于响应配置变更,更新Agent内部的变量列表。
|
||
/// </summary>
|
||
/// <param name="allActiveVariables">最新的所有激活变量列表。</param>
|
||
public void UpdateVariableLists(List<Variable> allActiveVariables)
|
||
{
|
||
// 重新分组变量
|
||
_highFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.High).ToList();
|
||
_mediumFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Medium).ToList();
|
||
_lowFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Low).ToList();
|
||
// 可以考虑在这里重新启动轮询任务,或者让现有任务检测到列表变化
|
||
}
|
||
|
||
/// <summary>
|
||
/// 核心轮询循环,负责批量读取变量并将数据写入处理队列。
|
||
/// </summary>
|
||
private async Task PollingLoopAsync(List<Variable> varsToRead, int interval, CancellationToken token)
|
||
{
|
||
while (!token.IsCancellationRequested)
|
||
{
|
||
if (!varsToRead.Any()) // 如果没有变量需要轮询,则等待
|
||
{
|
||
await Task.Delay(interval, token);
|
||
continue;
|
||
}
|
||
|
||
try
|
||
{
|
||
// 使用 S7.Net Plus 的 ReadMultipleVarsAsync 批量读取变量
|
||
// 注意:S7.Net Plus 会将读取到的值直接更新到 Variable 对象的 DataValue 属性中
|
||
await _plcClient.ReadMultipleVarsAsync(varsToRead);
|
||
|
||
foreach (var variable in varsToRead)
|
||
{
|
||
// 将读取到的值(包含在VariableContext中)放入数据处理队列
|
||
var context = new VariableContext(variable, variable.DataValue);
|
||
await _processingQueueWriter.WriteAsync(context, token);
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
// 记录通信错误,但不中断整个Agent
|
||
_messenger.Send(new LogMessage(LogLevel.Error, ex, $"S7DeviceAgent: 设备 {_deviceConfig.Name} ({_deviceConfig.IpAddress}) 轮询错误。"));
|
||
}
|
||
await Task.Delay(interval, token); // 等待下一个轮询周期
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 异步释放Agent资源,包括停止轮询任务和关闭PLC连接。
|
||
/// </summary>
|
||
public async ValueTask DisposeAsync()
|
||
{
|
||
_cts?.Cancel(); // 取消所有内部轮询任务
|
||
if (_plcClient.IsConnected)
|
||
{
|
||
_plcClient.Close(); // 关闭PLC连接
|
||
}
|
||
_plcClient?.Dispose();
|
||
}
|
||
}
|
||
```
|
||
|
||
## 3. 数据处理链
|
||
|
||
### 3.1. 设计思路与考量
|
||
|
||
* **模式**:采用责任链模式(Chain of Responsibility)来处理采集到的变量数据。每个处理器(Processor)负责一个单一的数据处理步骤(如变化检测、历史存储、MQTT发布)。
|
||
* **解耦**:每个处理器都是独立的,只关心自己的处理逻辑,不关心前一个处理器如何生成数据,也不关心后一个处理器如何消费数据。
|
||
* **可扩展性**:可以轻松地添加、移除或重新排序处理步骤,而无需修改现有代码。
|
||
|
||
### 3.2. 设计优势
|
||
|
||
* **灵活性**:可以根据业务需求动态构建不同的处理链。
|
||
* **可维护性**:每个处理步骤独立,易于理解、测试和维护。
|
||
* **可重用性**:单个处理器可以在不同的处理链中复用。
|
||
|
||
### 3.3. 设计劣势/权衡
|
||
|
||
* **性能开销**:每个处理器都需要额外的对象创建和方法调用开销,对于极度性能敏感的场景可能需要优化。
|
||
* **调试复杂性**:当处理链较长时,跟踪数据流向可能变得复杂。
|
||
|
||
### 3.4. `DataProcessingService.cs` (消费者)
|
||
|
||
从 `IChannelBus` 读取数据,并启动数据处理链。它作为数据处理链的入口点。
|
||
|
||
```csharp
|
||
// 文件: DMS.Infrastructure/Services/DataProcessingService.cs
|
||
using DMS.Core.Models;
|
||
using DMS.WPF.Services;
|
||
using CommunityToolkit.Mvvm.Messaging;
|
||
using System.Threading.Channels;
|
||
using System.Threading.Tasks;
|
||
using DMS.Infrastructure.Services.Processing;
|
||
using DMS.Core.Interfaces;
|
||
using DMS.WPF.Messages;
|
||
using NLog;
|
||
|
||
namespace DMS.Infrastructure.Services;
|
||
|
||
/// <summary>
|
||
/// 数据处理消费者服务,从ChannelBus中读取变量数据并启动处理链。
|
||
/// </summary>
|
||
public class DataProcessingService
|
||
{
|
||
private readonly ChannelReader<VariableContext> _queueReader;
|
||
private readonly IMessenger _messenger;
|
||
private readonly IRepositoryManager _repoManager;
|
||
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
|
||
|
||
/// <summary>
|
||
/// 构造函数,通过依赖注入获取所需服务。
|
||
/// </summary>
|
||
public DataProcessingService(IChannelBus channelBus, IMessenger messenger, IRepositoryManager repo)
|
||
{
|
||
// 从中央总线获取数据处理队列的读取端
|
||
_queueReader = channelBus.GetReader<VariableContext>("DataProcessingQueue");
|
||
_messenger = messenger;
|
||
_repoManager = repo;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 启动数据处理循环,持续从队列中读取数据并进行处理。
|
||
/// </summary>
|
||
public async Task StartProcessingAsync(CancellationToken token)
|
||
{
|
||
await foreach (var context in _queueReader.ReadAllAsync(token))
|
||
{
|
||
try
|
||
{
|
||
// 构建并执行数据处理链
|
||
var changeDetector = new ChangeDetectionProcessor(); // 假设这些处理器是无状态的,可以直接创建
|
||
var historyStorage = new HistoryStorageProcessor(_repoManager.VariableHistories); // 注入仓储
|
||
var mqttPublisher = new MqttPublishProcessor(_repoManager.VariableMqttAliases, _repoManager.MqttServers, _messenger); // 注入所需服务
|
||
|
||
// 链式连接处理器
|
||
changeDetector.SetNext(historyStorage).SetNext(mqttPublisher);
|
||
|
||
// 启动处理
|
||
await changeDetector.ProcessAsync(context);
|
||
|
||
// 处理完成后,发送消息通知UI更新变量值
|
||
_messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue));
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.Error(ex, $"数据处理链执行错误,变量ID: {context.Variable.Id}");
|
||
}
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.5. `MqttPublishProcessor.cs`
|
||
|
||
处理器从 `Variable` 的 `MqttAliases` 集合中获取别名和目标服务器,并使用别名构建MQTT Topic。
|
||
|
||
```csharp
|
||
// 文件: DMS.Infrastructure/Services/Processing/MqttPublishProcessor.cs
|
||
using DMS.Core.Interfaces;
|
||
using DMS.Core.Models;
|
||
using DMS.Infrastructure.Services.Communication;
|
||
using CommunityToolkit.Mvvm.Messaging;
|
||
using System.Linq;
|
||
using System.Threading.Tasks;
|
||
using System.Text.Json;
|
||
using NLog;
|
||
|
||
namespace DMS.Infrastructure.Services.Processing;
|
||
|
||
/// <summary>
|
||
/// MQTT发布处理器,负责将变量值发布到关联的MQTT服务器,并使用专属别名。
|
||
/// </summary>
|
||
public class MqttPublishProcessor : VariableProcessorBase
|
||
{
|
||
private readonly IMqttPublishService _mqttService;
|
||
private readonly IVariableMqttAliasRepository _aliasRepository;
|
||
private readonly IMqttServerRepository _mqttServerRepository;
|
||
private readonly IMessenger _messenger;
|
||
private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();
|
||
|
||
/// <summary>
|
||
/// 构造函数。
|
||
/// </summary>
|
||
public MqttPublishProcessor(IMqttPublishService mqttService, IVariableMqttAliasRepository aliasRepository, IMqttServerRepository mqttServerRepository, IMessenger messenger)
|
||
{
|
||
_mqttService = mqttService;
|
||
_aliasRepository = aliasRepository;
|
||
_mqttServerRepository = mqttServerRepository;
|
||
_messenger = messenger;
|
||
}
|
||
|
||
protected override async Task HandleAsync(VariableContext context)
|
||
{
|
||
if (!context.IsValueChanged) return; // 如果值未变化,则不发布
|
||
|
||
// 获取与当前变量关联的所有MQTT别名信息
|
||
var aliases = await _aliasRepository.GetAliasesForVariableAsync(context.Variable.Id);
|
||
|
||
if (aliases == null || !aliases.Any())
|
||
{
|
||
return; // 没有关联的MQTT服务器,无需发布
|
||
}
|
||
|
||
foreach (var aliasInfo in aliases)
|
||
{
|
||
try
|
||
{
|
||
// 获取关联的MQTT服务器详细信息
|
||
var targetServer = await _mqttServerRepository.GetByIdAsync(aliasInfo.MqttServerId);
|
||
if (targetServer == null || !targetServer.IsActive)
|
||
{
|
||
_logger.Warn($"MQTT发布失败:变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。");
|
||
continue;
|
||
}
|
||
|
||
// 使用别名构建Topic
|
||
// 示例Topic格式:DMS/DeviceName/VariableAlias
|
||
var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}";
|
||
var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });
|
||
|
||
await _mqttService.PublishAsync(targetServer, topic, payload);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.Error(ex, $"MQTT发布失败:变量 {context.Variable.Name} 到服务器 {aliasInfo.MqttServerId},别名 {aliasInfo.Alias}");
|
||
}
|
||
}
|
||
}
|
||
}
|
||
``` |