EventConsumer.cs 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. // 此源代码遵循位于源代码树根目录中的 LICENSE 文件的许可证。
  2. //
  3. // 必须在法律法规允许的范围内正确使用,严禁将其用于非法、欺诈、恶意或侵犯他人合法权益的目的。
  4. namespace Admin.NET.Core;
  /// <summary>
  /// Pulls items of type <typeparamref name="T"/> from a Redis-backed queue on a
  /// background task and raises <see cref="Received"/> for each item taken.
  /// </summary>
  /// <remarks>
  /// The queue is resolved once, in the constructor, through <c>GetQueue</c> on the
  /// supplied <see cref="FullRedis"/> instance. Consumption begins with
  /// <see cref="Start"/> and ends with <see cref="Stop"/> or <see cref="Dispose()"/>.
  /// </remarks>
  /// <typeparam name="T">Type of the items carried by the queue.</typeparam>
  public class EventConsumer<T> : IDisposable
  {
      // Background polling task; null while the consumer is not running.
      private Task _consumerTask;

      // Cancellation source paired with _consumerTask; null while not running.
      private CancellationTokenSource _consumerCts;

      // Makes Dispose safe to call more than once.
      private bool _disposed;

      /// <summary>
      /// The queue that items are taken from.
      /// </summary>
      public IProducerConsumer<T> Consumer { get; }

      /// <summary>
      /// The Redis client the queue was obtained from. It is disposed together with this instance.
      /// </summary>
      public FullRedis Builder { get; set; }

      /// <summary>
      /// Raised on the background polling thread for every non-null item taken from the queue.
      /// </summary>
      public event EventHandler<T> Received;

      /// <summary>
      /// Creates a consumer bound to the queue identified by <paramref name="routeKey"/>.
      /// </summary>
      /// <param name="redis">Redis client used to resolve the queue.</param>
      /// <param name="routeKey">Key passed to <c>GetQueue</c> to select the queue.</param>
      /// <exception cref="ArgumentNullException"><paramref name="redis"/> is null.</exception>
      public EventConsumer(FullRedis redis, string routeKey)
      {
          // Fail with a descriptive exception instead of a NullReferenceException on the next line.
          Builder = redis ?? throw new ArgumentNullException(nameof(redis));
          Consumer = Builder.GetQueue<T>(routeKey);
      }

      /// <summary>
      /// Starts the background polling loop. Does nothing when the loop is already running.
      /// </summary>
      /// <remarks>
      /// An exception thrown by <c>TakeOne</c> or by a <see cref="Received"/> handler ends the
      /// loop and faults the task; that exception is rethrown when <see cref="Stop"/> awaits it.
      /// </remarks>
      /// <exception cref="InvalidOperationException"><see cref="Consumer"/> is null.</exception>
      public void Start()
      {
          if (Consumer is null)
          {
              throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
          }

          if (_consumerTask is not null)
          {
              return;
          }

          _consumerCts = new CancellationTokenSource();
          var ct = _consumerCts.Token;
          _consumerTask = Task.Factory.StartNew(() =>
          {
              while (!ct.IsCancellationRequested)
              {
                  // NOTE(review): 10 is presumably a blocking timeout (seconds?) - confirm against
                  // the IProducerConsumer<T>.TakeOne contract. Cancellation is only observed
                  // between calls, so Stop may take up to one such call to complete.
                  var item = Consumer.TakeOne(10);
                  if (item == null)
                  {
                      continue;
                  }

                  Received?.Invoke(this, item);
              }
          }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
      }

      /// <summary>
      /// Requests cancellation of the polling loop and waits for it to finish.
      /// Does nothing when the consumer is not running.
      /// </summary>
      /// <returns>A task that completes once the polling loop has exited.</returns>
      public async Task Stop()
      {
          var cts = _consumerCts;
          var task = _consumerTask;
          if (cts == null || task == null)
          {
              return;
          }

          cts.Cancel();
          try
          {
              // ConfigureAwait(false): Dispose blocks on this method, so the continuation must
              // not need to resume on the (possibly blocked) calling context.
              await task.ConfigureAwait(false);
          }
          catch (OperationCanceledException) when (cts.IsCancellationRequested)
          {
              // The task was cancelled before its body began running. That is the outcome
              // Stop asked for, not an error.
          }
          finally
          {
              _consumerTask = null;
              _consumerCts = null;
              cts.Dispose();
          }
      }

      /// <summary>
      /// Stops the consumer if it is running and disposes <see cref="Builder"/>.
      /// </summary>
      public void Dispose()
      {
          Dispose(true);
          GC.SuppressFinalize(this);
      }

      /// <summary>
      /// Releases the resources held by this instance. Calls after the first one are ignored.
      /// </summary>
      /// <param name="disposing">
      /// True when called from <see cref="Dispose()"/>; only then is the polling loop stopped
      /// and <see cref="Builder"/> disposed.
      /// </param>
      protected virtual void Dispose(bool disposing)
      {
          if (_disposed)
          {
              return;
          }

          _disposed = true;
          if (!disposing)
          {
              return;
          }

          try
          {
              if (_consumerTask != null)
              {
                  Stop().Wait();
              }
          }
          finally
          {
              // Builder has a public setter and may have been cleared; it must also be
              // released when stopping the loop throws.
              // NOTE(review): this disposes a FullRedis instance that was passed in rather
              // than created here - confirm that this class is meant to own it.
              Builder?.Dispose();
          }
      }
  }