Files
DMS/软件设计文档/原始文档/05-基础设施层-后台服务与通信.md

18 KiB
Raw Blame History

05. 基础设施层 - 后台服务与通信

本文档详细设计了 DMS.Infrastructure 层中负责与外部世界如PLC、MQTT Broker进行通信的后台服务。

1. 目录结构

DMS.Infrastructure/
├── Services/
│   ├── Communication/
│   │   ├── S7DeviceAgent.cs
│   │   └── MqttPublishService.cs
│   ├── Processing/
│   │   ├── ChangeDetectionProcessor.cs
│   │   ├── HistoryStorageProcessor.cs
│   │   └── MqttPublishProcessor.cs
│   ├── S7BackgroundService.cs
│   └── DataProcessingService.cs
└── ...

2. S7通信架构 (“编排者-代理”模式)

2.1. 设计思路与考量

  • 模式:采用“编排者-代理”Orchestrator-Agent模式。S7BackgroundService 作为编排者负责管理所有S7设备的生命周期每个 S7DeviceAgent 作为代理,专门负责与一个S7 PLC进行所有交互。
  • 职责分离:将设备管理(启动、停止、配置更新)与具体设备通信(连接、轮询、读写)的职责分离。
  • 并发性:每个 S7DeviceAgent 独立运行,可以并行处理多个设备的通信,提高系统吞吐量。
  • 热重载:通过消息机制,允许在运行时动态更新设备的变量配置,而无需重启整个服务。

2.2. 设计优势

  • 高可靠性:单个设备的通信故障不会影响其他设备的正常运行。
  • 可扩展性:易于添加新的设备类型或通信协议,只需实现新的 DeviceAgent
  • 动态配置:支持运行时修改设备和变量配置,提高了系统的灵活性和可用性。
  • 资源隔离每个Agent管理自己的连接和资源避免资源争用。

2.3. 设计劣势/权衡

  • 复杂性增加引入了Agent的概念和Agent与Orchestrator之间的通信机制增加了初期设计和实现复杂性。
  • 资源消耗每个Agent可能需要维护独立的连接和线程当设备数量非常庞大时可能会增加资源消耗。

2.4. S7BackgroundService.cs (编排者)

作为 IHostedService 运行负责从数据库加载激活的S7设备并为每个设备创建和管理 S7DeviceAgent 实例。它还监听配置变更消息以触发Agent的热重载。

// 文件: DMS.Infrastructure/Services/S7BackgroundService.cs
using Microsoft.Extensions.Hosting;
using DMS.Core.Interfaces;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using DMS.Core.Enums;
using DMS.WPF.Messages;

namespace DMS.Infrastructure.Services;

/// <summary>
/// S7后台服务编排者作为IHostedService运行管理所有S7设备的通信代理。
/// </summary>
public class S7BackgroundService : IHostedService
{
    private readonly IRepositoryManager _repoManager;
    private readonly IChannelBus _channelBus;
    private readonly IMessenger _messenger;
    private readonly ConcurrentDictionary<int, S7DeviceAgent> _activeAgents = new();

    /// <summary>
    /// 构造函数,通过依赖注入获取所需服务。
    /// </summary>
    public S7BackgroundService(IRepositoryManager repo, IChannelBus bus, IMessenger msg)
    {
        _repoManager = repo;
        _channelBus = bus;
        _messenger = msg;
        // 注册配置变更消息以便在设备或变量配置更新时通知Agent
        _messenger.Register<ConfigChangedMessage>(this, async (r, m) => await HandleConfigChange(m));
    }

    /// <summary>
    /// 服务启动时调用加载所有激活的S7设备并启动其代理。
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // 获取所有激活的S7设备及其详细信息
        var s7Devices = await _repoManager.Devices.GetActiveDevicesWithDetailsAsync(ProtocolType.S7);
        foreach (var device in s7Devices)
        {
            // 为每个设备创建一个S7DeviceAgent实例
            var agent = new S7DeviceAgent(device, _channelBus, _messenger);
            if (_activeAgents.TryAdd(device.Id, agent))
            {
                // 启动Agent的通信循环
                await agent.StartAsync(cancellationToken);
            }
        }

        // 启动数据处理消费者服务它将从ChannelBus中读取数据
        var dataProcessor = new DataProcessingService(_channelBus, _messenger, _repoManager);
        _ = dataProcessor.StartProcessingAsync(cancellationToken); // 在后台运行,不阻塞启动
    }

    /// <summary>
    /// 处理配置变更消息通知相关Agent更新其变量列表。
    /// </summary>
    private async Task HandleConfigChange(ConfigChangedMessage message)
    {
        // 从数据库重新加载受影响的设备及其最新配置
        var updatedDevice = await _repoManager.Devices.GetDeviceWithDetailsAsync(message.DeviceId);
        if (updatedDevice != null && _activeAgents.TryGetValue(message.DeviceId, out var agent))
        {
            // 指示Agent使用新的变量列表进行热重载
            agent.UpdateVariableLists(updatedDevice.VariableTables.SelectMany(vt => vt.Variables).ToList());
        }
    }

    /// <summary>
    /// 服务停止时调用停止所有活动的Agent并释放资源。
    /// </summary>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        foreach (var agent in _activeAgents.Values)
        {
            await agent.DisposeAsync();
        }
        _activeAgents.Clear();
    }
}

2.5. S7DeviceAgent.cs (代理)

负责与单个PLC建立连接、维护连接、按不同频率并行轮询变量并将读取到的数据通过 IChannelBus 写入数据处理队列。

// 文件: DMS.Infrastructure/Services/Communication/S7DeviceAgent.cs
using S7.Net;
using DMS.Core.Models;
using DMS.Core.Enums;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System;

namespace DMS.Infrastructure.Services.Communication;

/// <summary>
/// 单个S7 PLC的通信代理负责连接、轮询和数据发送。
/// </summary>
public class S7DeviceAgent : IAsyncDisposable
{
    private readonly Device _deviceConfig;
    private readonly ChannelWriter<VariableContext> _processingQueueWriter;
    private readonly IMessenger _messenger;
    private readonly Plc _plcClient;
    private CancellationTokenSource _cts; // 用于控制Agent内部的轮询任务

    // 存储按轮询级别分组的变量列表
    private List<Variable> _highFreqVars = new();
    private List<Variable> _mediumFreqVars = new();
    private List<Variable> _lowFreqVars = new();

    /// <summary>
    /// 构造函数。
    /// </summary>
    /// <param name="device">设备的配置信息。</param>
    /// <param name="channelBus">中央通道总线服务。</param>
    /// <param name="messenger">消息总线服务。</param>
    public S7DeviceAgent(Device device, IChannelBus channelBus, IMessenger messenger)
    {
        _deviceConfig = device;
        _messenger = messenger;
        // 从中央总线获取指定名称的通道的写入端
        _processingQueueWriter = channelBus.GetWriter<VariableContext>("DataProcessingQueue");

        // 初始化S7.Net PLC客户端
        _plcClient = new Plc(
            (CpuType)Enum.Parse(typeof(CpuType), _deviceConfig.Protocol.ToString()), // 根据协议类型解析CPU类型
            _deviceConfig.IpAddress,
            (short)_deviceConfig.Rack,
            (short)_deviceConfig.Slot
        );
    }

    /// <summary>
    /// 启动Agent的通信循环。
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken = default)
    {
        _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        await _plcClient.OpenAsync(); // 建立PLC连接

        // 初始加载变量列表并分组
        UpdateVariableLists(_deviceConfig.VariableTables.SelectMany(vt => vt.Variables).ToList());

        // 启动并行的轮询任务,每个轮询级别一个任务
        _ = Task.Run(() => PollingLoopAsync(_highFreqVars, 200, _cts.Token)); // 高频200ms
        _ = Task.Run(() => PollingLoopAsync(_mediumFreqVars, 1000, _cts.Token)); // 中频1000ms
        _ = Task.Run(() => PollingLoopAsync(_lowFreqVars, 5000, _cts.Token)); // 低频5000ms
    }

    /// <summary>
    /// 热重载方法用于响应配置变更更新Agent内部的变量列表。
    /// </summary>
    /// <param name="allActiveVariables">最新的所有激活变量列表。</param>
    public void UpdateVariableLists(List<Variable> allActiveVariables)
    {
        // 重新分组变量
        _highFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.High).ToList();
        _mediumFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Medium).ToList();
        _lowFreqVars = allActiveVariables.Where(v => v.PollLevel == PollLevelType.Low).ToList();
        // 可以考虑在这里重新启动轮询任务,或者让现有任务检测到列表变化
    }

    /// <summary>
    /// 核心轮询循环,负责批量读取变量并将数据写入处理队列。
    /// </summary>
    private async Task PollingLoopAsync(List<Variable> varsToRead, int interval, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (!varsToRead.Any()) // 如果没有变量需要轮询,则等待
            {
                await Task.Delay(interval, token);
                continue;
            }

            try
            {
                // 使用 S7.Net Plus 的 ReadMultipleVarsAsync 批量读取变量
                // 注意S7.Net Plus 会将读取到的值直接更新到 Variable 对象的 DataValue 属性中
                await _plcClient.ReadMultipleVarsAsync(varsToRead);

                foreach (var variable in varsToRead)
                {
                    // 将读取到的值包含在VariableContext中放入数据处理队列
                    var context = new VariableContext(variable, variable.DataValue);
                    await _processingQueueWriter.WriteAsync(context, token);
                }
            }
            catch (Exception ex)
            {
                // 记录通信错误但不中断整个Agent
                _messenger.Send(new LogMessage(LogLevel.Error, ex, $"S7DeviceAgent: 设备 {_deviceConfig.Name} ({_deviceConfig.IpAddress}) 轮询错误。"));
            }
            await Task.Delay(interval, token); // 等待下一个轮询周期
        }
    }

    /// <summary>
    /// 异步释放Agent资源包括停止轮询任务和关闭PLC连接。
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        _cts?.Cancel(); // 取消所有内部轮询任务
        if (_plcClient.IsConnected)
        {
            _plcClient.Close(); // 关闭PLC连接
        }
        _plcClient?.Dispose();
    }
}

3. 数据处理链

3.1. 设计思路与考量

  • 模式采用责任链模式Chain of Responsibility来处理采集到的变量数据。每个处理器Processor负责一个单一的数据处理步骤如变化检测、历史存储、MQTT发布
  • 解耦:每个处理器都是独立的,只关心自己的处理逻辑,不关心前一个处理器如何生成数据,也不关心后一个处理器如何消费数据。
  • 可扩展性:可以轻松地添加、移除或重新排序处理步骤,而无需修改现有代码。

3.2. 设计优势

  • 灵活性:可以根据业务需求动态构建不同的处理链。
  • 可维护性:每个处理步骤独立,易于理解、测试和维护。
  • 可重用性:单个处理器可以在不同的处理链中复用。

3.3. 设计劣势/权衡

  • 性能开销:每个处理器都需要额外的对象创建和方法调用开销,对于极度性能敏感的场景可能需要优化。
  • 调试复杂性:当处理链较长时,跟踪数据流向可能变得复杂。

3.4. DataProcessingService.cs (消费者)

IChannelBus 读取数据,并启动数据处理链。它作为数据处理链的入口点。

// 文件: DMS.Infrastructure/Services/DataProcessingService.cs
using DMS.Core.Models;
using DMS.WPF.Services;
using CommunityToolkit.Mvvm.Messaging;
using System.Threading.Channels;
using System.Threading.Tasks;
using DMS.Infrastructure.Services.Processing;
using DMS.Core.Interfaces;
using DMS.WPF.Messages;
using NLog;

namespace DMS.Infrastructure.Services;

/// <summary>
/// 数据处理消费者服务从ChannelBus中读取变量数据并启动处理链。
/// </summary>
public class DataProcessingService
{
    private readonly ChannelReader<VariableContext> _queueReader;
    private readonly IMessenger _messenger;
    private readonly IRepositoryManager _repoManager;
    private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// 构造函数,通过依赖注入获取所需服务。
    /// </summary>
    public DataProcessingService(IChannelBus channelBus, IMessenger messenger, IRepositoryManager repo)
    {
        // 从中央总线获取数据处理队列的读取端
        _queueReader = channelBus.GetReader<VariableContext>("DataProcessingQueue");
        _messenger = messenger;
        _repoManager = repo;
    }

    /// <summary>
    /// 启动数据处理循环,持续从队列中读取数据并进行处理。
    /// </summary>
    public async Task StartProcessingAsync(CancellationToken token)
    {
        await foreach (var context in _queueReader.ReadAllAsync(token))
        {
            try
            {
                // 构建并执行数据处理链
                var changeDetector = new ChangeDetectionProcessor(); // 假设这些处理器是无状态的,可以直接创建
                var historyStorage = new HistoryStorageProcessor(_repoManager.VariableHistories); // 注入仓储
                var mqttPublisher = new MqttPublishProcessor(_repoManager.VariableMqttAliases, _repoManager.MqttServers, _messenger); // 注入所需服务

                // 链式连接处理器
                changeDetector.SetNext(historyStorage).SetNext(mqttPublisher);

                // 启动处理
                await changeDetector.ProcessAsync(context);

                // 处理完成后发送消息通知UI更新变量值
                _messenger.Send(new VariableValueUpdatedMessage(context.Variable.Id, context.CurrentValue));
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"数据处理链执行错误变量ID: {context.Variable.Id}");
            }
        }
    }
}

3.5. MqttPublishProcessor.cs

处理器从 VariableMqttAliases 集合中获取别名和目标服务器并使用别名构建MQTT Topic。

// 文件: DMS.Infrastructure/Services/Processing/MqttPublishProcessor.cs
using DMS.Core.Interfaces;
using DMS.Core.Models;
using DMS.Infrastructure.Services.Communication;
using CommunityToolkit.Mvvm.Messaging;
using System.Linq;
using System.Threading.Tasks;
using System.Text.Json;
using NLog;

namespace DMS.Infrastructure.Services.Processing;

/// <summary>
/// MQTT发布处理器负责将变量值发布到关联的MQTT服务器并使用专属别名。
/// </summary>
public class MqttPublishProcessor : VariableProcessorBase
{
    private readonly IMqttPublishService _mqttService;
    private readonly IVariableMqttAliasRepository _aliasRepository;
    private readonly IMqttServerRepository _mqttServerRepository;
    private readonly IMessenger _messenger;
    private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();

    /// <summary>
    /// 构造函数。
    /// </summary>
    public MqttPublishProcessor(IMqttPublishService mqttService, IVariableMqttAliasRepository aliasRepository, IMqttServerRepository mqttServerRepository, IMessenger messenger)
    {
        _mqttService = mqttService;
        _aliasRepository = aliasRepository;
        _mqttServerRepository = mqttServerRepository;
        _messenger = messenger;
    }

    protected override async Task HandleAsync(VariableContext context)
    {
        if (!context.IsValueChanged) return; // 如果值未变化,则不发布

        // 获取与当前变量关联的所有MQTT别名信息
        var aliases = await _aliasRepository.GetAliasesForVariableAsync(context.Variable.Id);

        if (aliases == null || !aliases.Any())
        {
            return; // 没有关联的MQTT服务器无需发布
        }

        foreach (var aliasInfo in aliases)
        {
            try
            {
                // 获取关联的MQTT服务器详细信息
                var targetServer = await _mqttServerRepository.GetByIdAsync(aliasInfo.MqttServerId);
                if (targetServer == null || !targetServer.IsActive)
                {
                    _logger.Warn($"MQTT发布失败变量 {context.Variable.Name} 关联的MQTT服务器 {aliasInfo.MqttServerId} 不存在或未激活。");
                    continue;
                }

                // 使用别名构建Topic
                // 示例Topic格式DMS/DeviceName/VariableAlias
                var topic = $"DMS/{context.Variable.VariableTable.Device.Name}/{aliasInfo.Alias}";
                var payload = JsonSerializer.Serialize(new { value = context.CurrentValue, timestamp = context.Timestamp });

                await _mqttService.PublishAsync(targetServer, topic, payload);
            }
            catch (Exception ex)
            {
                _logger.Error(ex, $"MQTT发布失败变量 {context.Variable.Name} 到服务器 {aliasInfo.MqttServerId},别名 {aliasInfo.Alias}");
            }
        }
    }
}