using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using DMS.Infrastructure.Interfaces;
namespace DMS.Infrastructure.Services
{
///
/// 通道总线实现,用于在不同组件之间传递数据
///
public class ChannelBus : IChannelBus
{
private readonly ConcurrentDictionary _channels = new ConcurrentDictionary();
///
/// 获取指定名称的通道写入器
///
/// 通道中传递的数据类型
/// 通道名称
/// 通道写入器
public ChannelWriter GetWriter(string channelName)
{
var channel = GetOrCreateChannel(channelName);
return channel.Writer;
}
///
/// 获取指定名称的通道读取器
///
/// 通道中传递的数据类型
/// 通道名称
/// 通道读取器
public ChannelReader GetReader(string channelName)
{
var channel = GetOrCreateChannel(channelName);
return channel.Reader;
}
///
/// 创建指定名称的通道
///
/// 通道中传递的数据类型
/// 通道名称
/// 通道容量
public void CreateChannel(string channelName, int capacity = 100)
{
_channels.GetOrAdd(channelName, _ => Channel.CreateBounded(capacity));
}
///
/// 关闭指定名称的通道
///
/// 通道中传递的数据类型
/// 通道名称
public void CloseChannel(string channelName)
{
if (_channels.TryRemove(channelName, out var channel))
{
if (channel is Channel typedChannel)
{
typedChannel.Writer.Complete();
}
}
}
///
/// 获取或创建指定名称的通道
///
/// 通道中传递的数据类型
/// 通道名称
/// 通道
private Channel GetOrCreateChannel(string channelName)
{
return (Channel)_channels.GetOrAdd(channelName, _ => Channel.CreateBounded(100));
}
}
}