RedisEventSourceStorer.cs 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. using System.Threading.Channels;
  2. namespace Admin.NET.Core;
  /// <summary>
  /// Custom event source storer that is intended to be backed by Redis.
  /// </summary>
  /// <remarks>
  /// The Redis publish/subscribe path is not implemented yet, so every event source is
  /// currently buffered in a bounded in-memory channel and handed out again by
  /// <see cref="ReadAsync"/>.
  /// </remarks>
  public class RedisEventSourceStorer : IEventSourceStorer
  {
      // Bounded in-memory channel that carries event sources from WriteAsync to ReadAsync.
      private readonly Channel<IEventSource> _channel;

      // Redis client kept for the planned publish/subscribe integration; no member uses it yet.
      private readonly FullRedis _redis;

      /// <summary>
      /// Creates the storer and its bounded in-memory channel.
      /// </summary>
      /// <param name="redis">Cache instance; it is cast to <see cref="FullRedis"/>.</param>
      /// <param name="capacity">Maximum number of event sources the channel buffers (default 10000).</param>
      /// <exception cref="InvalidCastException">
      /// Thrown when <paramref name="redis"/> is non-null and is not a <see cref="FullRedis"/> instance.
      /// </exception>
      public RedisEventSourceStorer(ICache redis, int capacity = 10000)
      {
          _redis = (FullRedis)redis;

          // Once the channel holds `capacity` items, writers wait instead of dropping items.
          var boundedChannelOptions = new BoundedChannelOptions(capacity)
          {
              FullMode = BoundedChannelFullMode.Wait
          };

          // Create the bounded channel.
          _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);

          // TODO: subscribe to a Redis topic (e.g. "eventbus"), deserialize each message body
          // into a ChannelEventSource and push it into _channel via Writer.TryWrite.
      }

      /// <summary>
      /// Writes one event source into the storer.
      /// </summary>
      /// <param name="eventSource">The event source to enqueue.</param>
      /// <param name="cancellationToken">Token that cancels a pending write.</param>
      public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
      {
          // Every event source goes through the in-memory channel. ChannelEventSource instances
          // used to be routed to a Redis publish that is not implemented, which silently
          // discarded them.
          // TODO: publish ChannelEventSource instances to Redis once the subscriber exists.
          await _channel.Writer.WriteAsync(eventSource, cancellationToken);
      }

      /// <summary>
      /// Reads one event source from the storer, waiting until one is available.
      /// </summary>
      /// <param name="cancellationToken">Token that cancels a pending read.</param>
      /// <returns>The next buffered event source.</returns>
      public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
      {
          return await _channel.Reader.ReadAsync(cancellationToken);
      }
  }