RedisEventSourceStorer.cs 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. using NewLife.Caching;
  2. using System.Threading.Channels;
  3. namespace Admin.NET.Core;
  4. /// <summary>
  5. /// Redis自定义事件源存储器
  6. /// </summary>
  7. public class RedisEventSourceStorer : IEventSourceStorer
  8. {
  9. private readonly Channel<IEventSource> _channel; // 内存通道事件源存储器
  10. private readonly FullRedis _redis;
  11. // private readonly string _topic = "eventbus";
  12. public RedisEventSourceStorer(ICache redis, int capacity = 10000)
  13. {
  14. _redis = (FullRedis)redis;
  15. // 配置通道(超出默认容量后进入等待)
  16. var boundedChannelOptions = new BoundedChannelOptions(capacity)
  17. {
  18. FullMode = BoundedChannelFullMode.Wait
  19. };
  20. // 创建有限容量通道
  21. _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
  22. //RedisHelper.Subscribe((_topic, msg =>
  23. //{
  24. // var eventSource = JSON.Deserialize<ChannelEventSource>(msg.Body);
  25. // // 写入内存管道存储器
  26. // _channel!.Writer.TryWrite(eventSource);
  27. //}
  28. //));
  29. }
  30. /// <summary>
  31. /// 往 Redis 中写入一条
  32. /// </summary>
  33. /// <param name="eventSource"></param>
  34. /// <param name="cancellationToken"></param>
  35. public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
  36. {
  37. if (eventSource is ChannelEventSource es)
  38. {
  39. //await RedisHelper.PublishAsync(_topic, JSON.Serialize(es));
  40. }
  41. else
  42. {
  43. await _channel.Writer.WriteAsync(eventSource, cancellationToken);
  44. }
  45. }
  46. /// <summary>
  47. /// 从 Redis 中读取一条
  48. /// </summary>
  49. /// <param name="cancellationToken"></param>
  50. /// <returns></returns>
  51. public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
  52. {
  53. return await _channel.Reader.ReadAsync(cancellationToken);
  54. }
  55. }