using NewLife.Caching;
using System.Threading.Channels;
namespace Admin.NET.Core;
/// <summary>
/// Custom Redis event source storer.
/// </summary>
public class RedisEventSourceStorer : IEventSourceStorer
{
    /// <summary>
    /// In-memory bounded channel that buffers event sources between writers and the reader.
    /// </summary>
    private readonly Channel<IEventSource> _channel;

    /// <summary>
    /// Redis client kept for the Redis publish/subscribe transport, which is not wired up yet.
    /// </summary>
    private readonly FullRedis _redis;

    /// <summary>
    /// Topic name reserved for the Redis publish/subscribe transport.
    /// </summary>
    private readonly string _topic = "eventbus";

    /// <summary>
    /// Creates the storer and its bounded in-memory channel.
    /// </summary>
    /// <param name="redis">Cache instance; it is cast to <see cref="FullRedis"/>.</param>
    /// <param name="capacity">Maximum number of event sources buffered in the channel.</param>
    public RedisEventSourceStorer(ICache redis, int capacity = 10000)
    {
        _redis = (FullRedis)redis;

        // Once the channel holds `capacity` items, writers wait instead of dropping events.
        var boundedChannelOptions = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };

        // Create the bounded channel that carries the event sources.
        _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);

        // TODO: subscribe to _topic on Redis, deserialize each received message into a
        // ChannelEventSource and push it into _channel with Writer.TryWrite, so that events
        // published through Redis are delivered to ReadAsync.
    }

    /// <summary>
    /// Writes one event source to the storer.
    /// </summary>
    /// <param name="eventSource">The event source to enqueue; must not be null.</param>
    /// <param name="cancellationToken">Token that cancels a pending write.</param>
    /// <returns>A task that completes once the event source has been accepted by the channel.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="eventSource"/> is null.</exception>
    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(eventSource);

        // The Redis publish path for ChannelEventSource is not implemented yet (see the TODO in
        // the constructor). Until it is, every event source goes through the local channel;
        // otherwise ChannelEventSource instances would be silently discarded.
        await _channel.Writer.WriteAsync(eventSource, cancellationToken);
    }

    /// <summary>
    /// Reads one event source from the storer, waiting until one is available.
    /// </summary>
    /// <param name="cancellationToken">Token that cancels a pending read.</param>
    /// <returns>The next event source taken from the channel.</returns>
    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
    {
        return await _channel.Reader.ReadAsync(cancellationToken);
    }
}