# 05. 基础设施层 - 后台服务与通信 本文档详细设计了 `DMS.Infrastructure` 层中负责与外部世界(如PLC、MQTT Broker)进行通信的后台服务。 ## 1. 目录结构 ``` DMS.Infrastructure/ ├── Services/ │ ├── Communication/ │ │ ├── S7DeviceAgent.cs │ │ └── MqttPublishService.cs │ ├── Processing/ │ │ ├── ChangeDetectionProcessor.cs │ │ ├── HistoryStorageProcessor.cs │ │ └── MqttPublishProcessor.cs │ ├── S7BackgroundService.cs │ └── DataProcessingService.cs └── ... ``` ## 2. S7通信架构 (“编排者-代理”模式) ### 2.1. 设计思路与考量 * **模式**:采用“编排者-代理”(Orchestrator-Agent)模式。`S7BackgroundService` 作为编排者,负责管理所有S7设备的生命周期;每个 `S7DeviceAgent` 作为代理,专门负责与**一个**S7 PLC进行所有交互。 * **职责分离**:将设备管理(启动、停止、配置更新)与具体设备通信(连接、轮询、读写)的职责分离。 * **并发性**:每个 `S7DeviceAgent` 独立运行,可以并行处理多个设备的通信,提高系统吞吐量。 * **热重载**:通过消息机制,允许在运行时动态更新设备的变量配置,而无需重启整个服务。 ### 2.2. 设计优势 * **高可靠性**:单个设备的通信故障不会影响其他设备的正常运行。 * **可扩展性**:易于添加新的设备类型或通信协议,只需实现新的 `DeviceAgent`。 * **动态配置**:支持运行时修改设备和变量配置,提高了系统的灵活性和可用性。 * **资源隔离**:每个Agent管理自己的连接和资源,避免资源争用。 ### 2.3. 设计劣势/权衡 * **复杂性增加**:引入了Agent的概念和Agent与Orchestrator之间的通信机制,增加了初期设计和实现复杂性。 * **资源消耗**:每个Agent可能需要维护独立的连接和线程,当设备数量非常庞大时,可能会增加资源消耗。 ### 2.4. `S7BackgroundService.cs` (编排者) 作为 `IHostedService` 运行,负责从数据库加载激活的S7设备,并为每个设备创建和管理 `S7DeviceAgent` 实例。它还监听配置变更消息,以触发Agent的热重载。 ```csharp // 文件: DMS.Infrastructure/Services/S7BackgroundService.cs using Microsoft.Extensions.Hosting; using DMS.Core.Interfaces; using DMS.WPF.Services; using CommunityToolkit.Mvvm.Messaging; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using DMS.Core.Enums; using DMS.WPF.Messages; namespace DMS.Infrastructure.Services; /// /// S7后台服务编排者,作为IHostedService运行,管理所有S7设备的通信代理。 /// public class S7BackgroundService : IHostedService { private readonly IRepositoryManager _repoManager; private readonly IChannelBus _channelBus; private readonly IMessenger _messenger; private readonly ConcurrentDictionary _activeAgents = new(); /// /// 构造函数,通过依赖注入获取所需服务。 /// public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg) { _repoManager = repo; _channelBus = bus; _messenger = msg; // 注册配置变更消息,以便在设备或变量配置更新时通知Agent _messenger.Register(this, async (r, m) => await HandleConfigChange(m)); } /// /// 服务启动时调用,加载所有激活的S7设备并启动其代理。 /// public async Task StartAsync(CancellationToken cancellationToken) { // 获取所有激活的S7设备及其详细信息 var s7Devices = await _repoManager.Devices.GetActiveDevicesWithDetailsAsync(ProtocolType.S7); foreach (var device in s7Devices) { // 为每个设备创建一个S7DeviceAgent实例 var agent = new S7DeviceAgent(device, _channelBus, _messenger); if (_activeAgents.TryAdd(device.Id, agent)) { // 启动Agent的通信循环 await agent.StartAsync(cancellationToken); } } // 启动数据处理消费者服务,它将从ChannelBus中读取数据 var dataProcessor = new DataProcessingService(_channelBus, _messenger, _repoManager); _ = dataProcessor.StartProcessingAsync(cancellationToken); // 在后台运行,不阻塞启动 } /// /// 处理配置变更消息,通知相关Agent更新其变量列表。 /// private async Task HandleConfigChange(ConfigChangedMessage message) { // 从数据库重新加载受影响的设备及其最新配置 var updatedDevice = await _repoManager.Devices.GetDeviceWithDetailsAsync(message.DeviceId); if (updatedDevice != null && _activeAgents.TryGetValue(message.DeviceId, out var agent)) { // 指示Agent使用新的变量列表进行热重载 agent.UpdateVariableLists(updatedDevice.VariableTables.SelectMany(vt => vt.Variables).ToList()); } } /// /// 服务停止时调用,停止所有活动的Agent并释放资源。 /// public async Task StopAsync(CancellationToken cancellationToken) { foreach (var agent in _activeAgents.Values) { await agent.DisposeAsync(); } _activeAgents.Clear(); } } ``` ### 2.5. `S7DeviceAgent.cs` (代理) 负责与单个PLC建立连接、维护连接、按不同频率并行轮询变量,并将读取到的数据通过 `IChannelBus` 写入数据处理队列。 ```csharp // 文件: DMS.Infrastructure/Services/Communication/S7DeviceAgent.cs using S7.Net; using DMS.Core.Models; using DMS.Core.Enums; using DMS.WPF.Services; using CommunityToolkit.Mvvm.Messaging; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using System; namespace DMS.Infrastructure.Services.Communication; /// /// 单个S7 PLC的通信代理,负责连接、轮询和数据发送。 /// public class S7DeviceAgent : IAsyncDisposable { private readonly Device _deviceConfig; private readonly ChannelWriter _processingQueueWriter; private readonly IMessenger _messenger; private readonly Plc _plcClient; private CancellationTokenSource _cts; // 用于控制Agent内部的轮询任务 // 存储按轮询级别分组的变量列表 private List _highFreqVars = new(); private List _mediumFreqVars = new(); private List _lowFreqVars = new(); /// /// 构造函数。 /// /// 设备的配置信息。 /// 中央通道总线服务。 /// 消息总线服务。 public S7DeviceAgent(Device device, IChannelBus channelBus, IMessenger messenger) { _deviceConfig = device; _messenger = messenger; // 从中央总线获取指定名称的通道的写入端 _processingQueueWriter = channelBus.GetWriter("DataProcessingQueue"); // 初始化S7.Net PLC客户端 _plcClient = new Plc( (CpuType)Enum.Parse(typeof(CpuType), _deviceConfig.Protocol.ToString()), // 根据协议类型解析CPU类型 _deviceConfig.IpAddress, (short)_deviceConfig.Rack, (short)_deviceConfig.Slot ); } /// /// 启动Agent的通信循环。 /// public async Task StartAsync(CancellationToken cancellationToken = default) { _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); await _plcClient.OpenAsync(); // 建立PLC连接 // 初始加载变量列表并分组 UpdateVariableLists(_deviceConfig.VariableTables.SelectMany(vt => vt.Variables).ToList()); // 启动并行的轮询任务,每个轮询级别一个任务 _ = Task.Run(() => PollingLoopAsync(_highFreqVars, 200, _cts.Token)); // 高频:200ms _ = Task.Run(() => PollingLoopAsync(_mediumFreqVars, 1000, _cts.Token)); // 中频:1000ms _ = Task.Run(() => PollingLoopAsync(_lowFreqVars, 5000, _cts.Token)); // 低频:5000ms } /// /// 热重载方法,用于响应配置变更,更新Agent内部的变量列表。 /// /// 最新的所有激活变量列表。 public void UpdateVariableLists(List allActiveVariables) { // 重新分组变量 _highFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.High).ToList(); _mediumFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Medium).ToList(); _lowFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Low).ToList(); // 可以考虑在这里重新启动轮询任务,或者让现有任务检测到列表变化 } /// /// 核心轮询循环,负责批量读取变量并将数据写入处理队列。 /// private async Task PollingLoopAsync(List varsToRead, int interval, CancellationToken token) { while (!token.IsCancellationRequested) { if (!varsToRead.Any()) // 如果没有变量需要轮询,则等待 { await Task.Delay(interval, token); continue; } try { // 使用 S7.Net Plus 的 ReadMultipleVarsAsync 批量读取变量 // 注意:S7.Net Plus 会将读取到的值直接更新到 Variable 对象的 DataValue 属性中 await _plcClient.ReadMultipleVarsAsync(varsToRead); foreach (var variable in varsToRead) { // 将读取到的值(包含在VariableContext中)放入数据处理队列 var context = new VariableContext(variable, variable.DataValue); await _processingQueueWriter.WriteAsync(context, token); } } catch (Exception ex) { // 记录通信错误,但不中断整个Agent _messenger.Send(new LogMessage(LogLevel.Error, ex, $"S7DeviceAgent: 设备 {_deviceConfig.Name} ({_deviceConfig.IpAddress}) 轮询错误。")); } await Task.Delay(interval, token); // 等待下一个轮询周期 } } /// /// 异步释放Agent资源,包括停止轮询任务和关闭PLC连接。 /// public async ValueTask DisposeAsync() { _cts?.Cancel(); // 取消所有内部轮询任务 if (_plcClient.IsConnected) { _plcClient.Close(); // 关闭PLC连接 } _plcClient?.Dispose(); } } ``` ## 3. 数据处理链 ### 3.1. 设计思路与考量 * **模式**:采用责任链模式(Chain of Responsibility)来处理采集到的变量数据。每个处理器(Processor)负责一个单一的数据处理步骤(如变化检测、历史存储、MQTT发布)。 * **解耦**:每个处理器都是独立的,只关心自己的处理逻辑,不关心前一个处理器如何生成数据,也不关心后一个处理器如何消费数据。 * **可扩展性**:可以轻松地添加、移除或重新排序处理步骤,而无需修改现有代码。 ### 3.2. 设计优势 * **灵活性**:可以根据业务需求动态构建不同的处理链。 * **可维护性**:每个处理步骤独立,易于理解、测试和维护。 * **可重用性**:单个处理器可以在不同的处理链中复用。 ### 3.3. 设计劣势/权衡 * **性能开销**:每个处理器都需要额外的对象创建和方法调用开销,对于极度性能敏感的场景可能需要优化。 * **调试复杂性**:当处理链较长时,跟踪数据流向可能变得复杂。 ### 3.4. `DataProcessingService.cs` (消费者) 从 `IChannelBus` 读取数据,并启动数据处理链。它作为数据处理链的入口点。 ```csharp // 文件: DMS.Infrastructure/Services/DataProcessingService.cs using DMS.Core.Models; using DMS.WPF.Services; using CommunityToolkit.Mvvm.Messaging; using System.Threading.Channels; using System.Threading.Tasks; using DMS.Infrastructure.Services.Processing; using DMS.Core.Interfaces; using DMS.WPF.Messages; using NLog; namespace DMS.Infrastructure.Services; /// /// 数据处理消费者服务,从ChannelBus中读取变量数据并启动处理链。 /// public class DataProcessingService { private readonly ChannelReader _queueReader; private readonly IMessenger _messenger; private readonly IRepositoryManager _repoManager; private static readonly ILogger _logger = LogManager.GetCurrentClassLogger(); /// /// 构造函数,通过依赖注入获取所需服务。 /// public DataProcessingService(IChannelBus channelBus, IMessenger messenger, IRepositoryManager repo) { // 从中央总线获取数据处理队列的读取端 _queueReader = channelBus.GetReader("DataProcessingQueue"); _messenger = messenger; _repoManager = repo; } /// /// 启动数据处理循环,持续从队列中读取数据并进行处理。 /// public async Task StartProcessingAsync(CancellationToken token) { await foreach (var context in _queueReader.ReadAllAsync(token)) { try { // 构建并执行数据处理链 var changeDetector = new ChangeDetectionProcessor(); // 假设这些处理器是无状态的,可以直接创建 var historyStorage = new HistoryStorageProcessor(_repoManager.VariableHistories); // 注入仓储 var mqttPublisher = new MqttPublishProcessor(_repoManager.VariableMqttAliases, _repoManager.MqttServers, _messenger); // 注入所需服务 // 链式连接处理器 changeDetector.SetNext(historyStorage).SetNext(mqttPublisher); // 启动处理 await changeDetector.ProcessAsync(context); // 处理完成后,发送消息通知UI更新变量值 _messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue)); } catch (Exception ex) { _logger.Error(ex, $"数据处理链执行错误,变量ID: {context.Variable.Id}"); } } } } ``` ### 3.5. `MqttPublishProcessor.cs` 处理器从 `Variable` 的 `MqttAliases` 集合中获取别名和目标服务器,并使用别名构建MQTT Topic。 ```csharp // 文件: DMS.Infrastructure/Services/Processing/MqttPublishProcessor.cs using DMS.Core.Interfaces; using DMS.Core.Models; using DMS.Infrastructure.Services.Communication; using CommunityToolkit.Mvvm.Messaging; using System.Linq; using System.Threading.Tasks; using System.Text.Json; using NLog; namespace DMS.Infrastructure.Services.Processing; /// /// MQTT发布处理器,负责将变量值发布到关联的MQTT服务器,并使用专属别名。 /// public class MqttPublishProcessor : VariableProcessorBase { private readonly IMqttPublishService _mqttService; private readonly IVariableMqttAliasRepository _aliasRepository; private readonly IMqttServerRepository _mqttServerRepository; private readonly IMessenger _messenger; private static readonly ILogger _logger = LogManager.GetCurrentClassLogger(); /// /// 构造函数。 /// public MqttPublishProcessor(IMqttPublishService mqttService, IVariableMqttAliasRepository aliasRepository, IMqttServerRepository mqttServerRepository, IMessenger messenger) { _mqttService = mqttService; _aliasRepository = aliasRepository; _mqttServerRepository = mqttServerRepository; _messenger = messenger; } protected override async Task HandleAsync(VariableContext context) { if (!context.IsValueChanged) return; // 如果值未变化,则不发布 // 获取与当前变量关联的所有MQTT别名信息 var aliases = await _aliasRepository.GetAliasesForVariableAsync(context.Variable.Id); if (aliases == null || !aliases.Any()) { return; // 没有关联的MQTT服务器,无需发布 } foreach (var aliasInfo in aliases) { try { // 获取关联的MQTT服务器详细信息 var targetServer = await _mqttServerRepository.GetByIdAsync(aliasInfo.MqttServerId); if (targetServer == null || !targetServer.IsActive) { _logger.Warn($"MQTT发布失败:变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。"); continue; } // 使用别名构建Topic // 示例Topic格式:DMS/DeviceName/VariableAlias var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}"; var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp }); await _mqttService.PublishAsync(targetServer, topic, payload); } catch (Exception ex) { _logger.Error(ex, $"MQTT发布失败:变量 {context.Variable.Name} 到服务器 {aliasInfo.MqttServerId},别名 {aliasInfo.Alias}"); } } } } ```