# 06. 核心服务 - 中央通道总线设计 本文档详细阐述了 `IChannelBus` 核心服务的设计与实现。该服务是整个应用程序高性能、异步通信的骨架。 ## 1. 设计理念 ### 1.1. 设计思路与考量 * **解耦通信**:在复杂的后台系统中,不同的组件(如数据采集器、数据处理器、日志记录器)之间需要进行高性能的异步通信。直接传递 `Channel` 实例或使用全局静态变量会导致紧耦合和难以管理。 * **统一管理**:`ChannelBus` 旨在提供一个统一的、可注入的中央服务,用于创建、注册和分发命名通道(`System.Threading.Channels.Channel`)。 * **生产者/消费者模式**:通过 `ChannelWriter` 和 `ChannelReader`,实现生产者和消费者之间的完全解耦,它们只通过约定的通道名称进行通信,无需知道对方的存在。 ### 1.2. 设计优势 * **高度解耦**:生产者和消费者之间没有直接引用,它们只依赖于 `IChannelBus` 接口和通道名称。这极大地提高了模块的独立性和可维护性。 * **高性能异步通信**:`System.Threading.Channels` 是.NET中专门为高性能异步生产者/消费者场景设计的,提供了优秀的吞吐量和低延迟。 * **可扩展性**:可以轻松添加新的通信通道,只需定义新的通道名称和数据类型,而无需修改现有代码。 * **集中管理**:所有通道的生命周期和实例都由 `ChannelBusService` 统一管理,避免了通道实例的混乱和泄露。 * **易于测试**:在单元测试中,可以轻松地Mock `IChannelBus` 接口,隔离测试组件。 ### 1.3. 设计劣势/权衡 * **约定依赖**:生产者和消费者必须约定好通道的名称和数据类型,如果约定不一致,可能导致运行时错误。 * **调试复杂性**:由于高度解耦,在调试数据流时,可能需要跟踪多个组件和通道。 * **错误处理**:通道内部的错误处理需要谨慎设计,以防止数据丢失或死锁。 ## 2. 接口与实现 (`DMS.WPF`) 该服务被定义在WPF项目中,因为它是一个应用级的、协调性的核心服务,可以被所有层(通过DI)访问。 ### `IChannelBus.cs` ```csharp // 文件: DMS.WPF/Services/IChannelBus.cs using System.Threading.Channels; namespace DMS.WPF.Services; /// /// 定义了一个中央通道总线,用于在应用程序的不同部分之间创建和分发高性能的、解耦的内存消息通道。 /// public interface IChannelBus { /// /// 获取指定名称和类型的通道的写入器。 /// 如果具有该名称的通道不存在,则会自动创建。 /// /// 通道中流动的数据类型。 /// 通道的唯一标识名称,例如 "DataProcessingQueue"。 /// 一个用于向通道写入数据的 ChannelWriter ChannelWriter GetWriter(string channelName); /// /// 获取指定名称和类型的通道的读取器。 /// 如果具有该名称的通道不存在,则会自动创建。 /// /// 通道中流动的数据类型。 /// 通道的唯一标识名称,例如 "DataProcessingQueue"。 /// 一个用于从通道读取数据的 ChannelReader ChannelReader GetReader(string channelName); } ``` ### `ChannelBusService.cs` ```csharp // 文件: DMS.WPF/Services/ChannelBusService.cs using System.Collections.Concurrent; using System.Threading.Channels; namespace DMS.WPF.Services; /// /// IChannelBus的单例实现,管理应用程序中所有命名的通道。 /// public class ChannelBusService : IChannelBus { private readonly ConcurrentDictionary _channels; /// /// 构造函数,初始化通道字典。 /// public ChannelBusService() { _channels = new ConcurrentDictionary(); } /// /// 获取指定名称和类型的通道的写入器。 /// public ChannelWriter GetWriter(string channelName) { // GetOrAdd 是一个原子操作,能防止多个线程同时创建同一个通道的竞态条件。 // 如果通道不存在,则创建新的无界通道。 var channel = (Channel)_channels.GetOrAdd( channelName, _ => Channel.CreateUnbounded() ); return channel.Writer; } /// /// 获取指定名称和类型的通道的读取器。 /// public ChannelReader GetReader(string channelName) { // 同样使用 GetOrAdd 来确保获取到的是同一个通道实例。 var channel = (Channel)_channels.GetOrAdd( channelName, _ => Channel.CreateUnbounded() ); return channel.Reader; } } ``` ## 3. 依赖注入 (`App.xaml.cs`) ### 3.1. 设计思路与考量 * **单例注册**:`IChannelBus` 必须作为**单例**注册在DI容器中。这是因为 `ChannelBusService` 内部维护着所有通道的集合,如果不是单例,每次注入都会得到一个新的 `ChannelBusService` 实例,导致通道无法共享。 ### 3.2. 设计优势 * **全局可访问**:一旦注册为单例,应用程序的任何部分都可以通过DI获取 `IChannelBus` 实例,并访问共享的通信通道。 * **资源效率**:确保只有一个 `ChannelBusService` 实例和其内部的通道集合,避免了不必要的资源创建和管理开销。 ### 3.3. 示例代码 ```csharp // 文件: DMS.WPF/App.xaml.cs private void ConfigureServices(IServiceCollection services) { // ... 其他服务注册 // 注册中央通道总线为单例 services.AddSingleton(); // ... } ``` ## 4. 应用场景 ### 4.1. 数据处理队列 * **生产者**:`S7DeviceAgent` (在 `DMS.Infrastructure` 中) 通过 `channelBus.GetWriter("DataProcessingQueue")` 写入采集到的变量数据。 * **消费者**:`DataProcessingService` (在 `DMS.Infrastructure` 中) 通过 `channelBus.GetReader("DataProcessingQueue")` 读取数据,并启动数据处理链。 ### 4.2. 异步日志队列 (扩展) * **生产者**:任何需要记录日志的组件都可以通过 `channelBus.GetWriter("LoggingQueue")` 写入日志事件。 * **消费者**:一个专门的后台日志服务(例如,一个 `LogConsumerService`)则负责从该通道读取日志事件,并批量写入数据库,从而进一步提升日志写入性能,并避免阻塞业务线程。