Files
DMS/软件设计文档/原始文档/06-核心服务-中央通道总线设计.md

6.7 KiB
Raw Blame History

06. 核心服务 - 中央通道总线设计

本文档详细阐述了 IChannelBus 核心服务的设计与实现。该服务是整个应用程序高性能、异步通信的骨架。

1. 设计理念

1.1. 设计思路与考量

  • 解耦通信:在复杂的后台系统中,不同的组件(如数据采集器、数据处理器、日志记录器)之间需要进行高性能的异步通信。直接传递 Channel<T> 实例或使用全局静态变量会导致紧耦合和难以管理。
  • 统一管理ChannelBus 旨在提供一个统一的、可注入的中央服务,用于创建、注册和分发命名通道(System.Threading.Channels.Channel<T>)。
  • 生产者/消费者模式:通过 ChannelWriter<T>ChannelReader<T>,实现生产者和消费者之间的完全解耦,它们只通过约定的通道名称进行通信,无需知道对方的存在。

1.2. 设计优势

  • 高度解耦:生产者和消费者之间没有直接引用,它们只依赖于 IChannelBus 接口和通道名称。这极大地提高了模块的独立性和可维护性。
  • 高性能异步通信System.Threading.Channels 是.NET中专门为高性能异步生产者/消费者场景设计的,提供了优秀的吞吐量和低延迟。
  • 可扩展性:可以轻松添加新的通信通道,只需定义新的通道名称和数据类型,而无需修改现有代码。
  • 集中管理:所有通道的生命周期和实例都由 ChannelBusService 统一管理,避免了通道实例的混乱和泄露。
  • 易于测试在单元测试中可以轻松地Mock IChannelBus 接口,隔离测试组件。

1.3. 设计劣势/权衡

  • 约定依赖:生产者和消费者必须约定好通道的名称和数据类型,如果约定不一致,可能导致运行时错误。
  • 调试复杂性:由于高度解耦,在调试数据流时,可能需要跟踪多个组件和通道。
  • 错误处理:通道内部的错误处理需要谨慎设计,以防止数据丢失或死锁。

2. 接口与实现 (DMS.WPF)

该服务被定义在WPF项目中因为它是一个应用级的、协调性的核心服务可以被所有层通过DI访问。

IChannelBus.cs

// 文件: DMS.WPF/Services/IChannelBus.cs
using System.Threading.Channels;

namespace DMS.WPF.Services;

/// <summary>
/// 定义了一个中央通道总线,用于在应用程序的不同部分之间创建和分发高性能的、解耦的内存消息通道。
/// </summary>
public interface IChannelBus
{
    /// <summary>
    /// 获取指定名称和类型的通道的写入器。
    /// 如果具有该名称的通道不存在,则会自动创建。
    /// </summary>
    /// <typeparam name="T">通道中流动的数据类型。</typeparam>
    /// <param name="channelName">通道的唯一标识名称,例如 "DataProcessingQueue"。</param>
    /// <returns>一个用于向通道写入数据的 ChannelWriter<T>。</returns>
    ChannelWriter<T> GetWriter<T>(string channelName);

    /// <summary>
    /// 获取指定名称和类型的通道的读取器。
    /// 如果具有该名称的通道不存在,则会自动创建。
    /// </summary>
    /// <typeparam name="T">通道中流动的数据类型。</typeparam>
    /// <param name="channelName">通道的唯一标识名称,例如 "DataProcessingQueue"。</param>
    /// <returns>一个用于从通道读取数据的 ChannelReader<T>。</returns>
    ChannelReader<T> GetReader<T>(string channelName);
}

ChannelBusService.cs

// 文件: DMS.WPF/Services/ChannelBusService.cs
using System.Collections.Concurrent;
using System.Threading.Channels;

namespace DMS.WPF.Services;

/// <summary>
/// IChannelBus的单例实现管理应用程序中所有命名的通道。
/// </summary>
public class ChannelBusService : IChannelBus
{
    private readonly ConcurrentDictionary<string, object> _channels;

    /// <summary>
    /// 构造函数,初始化通道字典。
    /// </summary>
    public ChannelBusService()
    {
        _channels = new ConcurrentDictionary<string, object>();
    }

    /// <summary>
    /// 获取指定名称和类型的通道的写入器。
    /// </summary>
    public ChannelWriter<T> GetWriter<T>(string channelName)
    {
        // GetOrAdd 是一个原子操作,能防止多个线程同时创建同一个通道的竞态条件。
        // 如果通道不存在,则创建新的无界通道。
        var channel = (Channel<T>)_channels.GetOrAdd(
            channelName,
            _ => Channel.CreateUnbounded<T>()
        );
        return channel.Writer;
    }

    /// <summary>
    /// 获取指定名称和类型的通道的读取器。
    /// </summary>
    public ChannelReader<T> GetReader<T>(string channelName)
    {
        // 同样使用 GetOrAdd 来确保获取到的是同一个通道实例。
        var channel = (Channel<T>)_channels.GetOrAdd(
            channelName,
            _ => Channel.CreateUnbounded<T>()
        );
        return channel.Reader;
    }
}

3. 依赖注入 (App.xaml.cs)

3.1. 设计思路与考量

  • 单例注册IChannelBus 必须作为单例注册在DI容器中。这是因为 ChannelBusService 内部维护着所有通道的集合,如果不是单例,每次注入都会得到一个新的 ChannelBusService 实例,导致通道无法共享。

3.2. 设计优势

  • 全局可访问一旦注册为单例应用程序的任何部分都可以通过DI获取 IChannelBus 实例,并访问共享的通信通道。
  • 资源效率:确保只有一个 ChannelBusService 实例和其内部的通道集合,避免了不必要的资源创建和管理开销。

3.3. 示例代码

// 文件: DMS.WPF/App.xaml.cs
private void ConfigureServices(IServiceCollection services)
{
    // ... 其他服务注册

    // 注册中央通道总线为单例
    services.AddSingleton<IChannelBus, ChannelBusService>();

    // ...
}

4. 应用场景

4.1. 数据处理队列

  • 生产者S7DeviceAgent (在 DMS.Infrastructure 中) 通过 channelBus.GetWriter<VariableContext>("DataProcessingQueue") 写入采集到的变量数据。
  • 消费者DataProcessingService (在 DMS.Infrastructure 中) 通过 channelBus.GetReader<VariableContext>("DataProcessingQueue") 读取数据,并启动数据处理链。

4.2. 异步日志队列 (扩展)

  • 生产者:任何需要记录日志的组件都可以通过 channelBus.GetWriter<LogEntry>("LoggingQueue") 写入日志事件。
  • 消费者:一个专门的后台日志服务(例如,一个 LogConsumerService)则负责从该通道读取日志事件,并批量写入数据库,从而进一步提升日志写入性能,并避免阻塞业务线程。