using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace DMS.WPF.Services
{
///
/// 标记接口,用于标识可以通过ChannelBusService发送的消息。
///
public interface IChannelMessage { }
///
/// 提供基于System.Threading.Channels的消息发布/订阅机制,实现组件解耦。
///
public class ChannelBusService
{
// 使用ConcurrentDictionary存储不同消息类型的Channel
private readonly ConcurrentDictionary _channels = new ConcurrentDictionary();
///
/// 异步发布一条消息到对应的Channel。
///
/// 消息的类型,必须实现IChannelMessage接口。
/// 要发布的消息实例。
/// 取消令牌。
/// 表示异步操作的Task。
public async ValueTask PublishAsync(TMessage message, CancellationToken cancellationToken = default)
where TMessage : IChannelMessage
{
var channel = GetOrCreateChannel();
await channel.Writer.WriteAsync(message, cancellationToken);
}
///
/// 获取指定消息类型的ChannelReader,用于订阅消息。
///
/// 要订阅的消息类型,必须实现IChannelMessage接口。
/// 指定消息类型的ChannelReader。
public ChannelReader Subscribe()
where TMessage : IChannelMessage
{
var channel = GetOrCreateChannel();
return channel.Reader;
}
///
/// 获取或创建指定消息类型的Channel。
///
/// 消息类型。
/// 指定消息类型的Channel。
private Channel GetOrCreateChannel()
where TMessage : IChannelMessage
{
// 使用GetOrAdd方法确保线程安全地获取或创建Channel
return (Channel)_channels.GetOrAdd(typeof(TMessage), _ => Channel.CreateUnbounded());
}
}
}