Files
DMS/DMS.WPF/Services/ChannelBusService.cs

61 lines
2.5 KiB
C#
Raw Permalink Normal View History

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace DMS.WPF.Services
{
/// <summary>
/// 标记接口用于标识可以通过ChannelBusService发送的消息。
/// </summary>
public interface IChannelMessage { }
/// <summary>
/// 提供基于System.Threading.Channels的消息发布/订阅机制,实现组件解耦。
/// </summary>
public class ChannelBusService
{
// 使用ConcurrentDictionary存储不同消息类型的Channel
private readonly ConcurrentDictionary<Type, object> _channels = new ConcurrentDictionary<Type, object>();
/// <summary>
/// 异步发布一条消息到对应的Channel。
/// </summary>
/// <typeparam name="TMessage">消息的类型必须实现IChannelMessage接口。</typeparam>
/// <param name="message">要发布的消息实例。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>表示异步操作的Task。</returns>
public async ValueTask PublishAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
where TMessage : IChannelMessage
{
var channel = GetOrCreateChannel<TMessage>();
await channel.Writer.WriteAsync(message, cancellationToken);
}
/// <summary>
/// 获取指定消息类型的ChannelReader用于订阅消息。
/// </summary>
/// <typeparam name="TMessage">要订阅的消息类型必须实现IChannelMessage接口。</typeparam>
/// <returns>指定消息类型的ChannelReader。</returns>
public ChannelReader<TMessage> Subscribe<TMessage>()
where TMessage : IChannelMessage
{
var channel = GetOrCreateChannel<TMessage>();
return channel.Reader;
}
/// <summary>
/// 获取或创建指定消息类型的Channel。
/// </summary>
/// <typeparam name="TMessage">消息类型。</typeparam>
/// <returns>指定消息类型的Channel。</returns>
private Channel<TMessage> GetOrCreateChannel<TMessage>()
where TMessage : IChannelMessage
{
// 使用GetOrAdd方法确保线程安全地获取或创建Channel
return (Channel<TMessage>)_channels.GetOrAdd(typeof(TMessage), _ => Channel.CreateUnbounded<TMessage>());
}
}
}