RedisQueue.cs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. // 大名科技(天津)有限公司 版权所有
  2. //
  3. // 此源代码遵循位于源代码树根目录中的 LICENSE 文件的许可证
  4. //
  5. // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动
  6. //
  7. // 任何基于本项目二次开发而产生的一切法律纠纷和责任,均与作者无关
  8. using NewLife.Caching.Queues;
  9. namespace Admin.NET.Core;
  10. /// <summary>
  11. /// Redis 消息队列
  12. /// </summary>
  13. public static class RedisQueue
  14. {
  15. private static ICache _cache = App.GetService<ICache>();
  16. /// <summary>
  17. /// 获取可信队列,需要确认
  18. /// </summary>
  19. /// <typeparam name="T"></typeparam>
  20. /// <param name="topic"></param>
  21. /// <returns></returns>
  22. public static RedisReliableQueue<T> GetRedisReliableQueue<T>(string topic)
  23. {
  24. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  25. return queue;
  26. }
  27. /// <summary>
  28. /// 可信队列回滚
  29. /// </summary>
  30. /// <param name="topic"></param>
  31. /// <param name="retryInterval"></param>
  32. /// <returns></returns>
  33. public static int RollbackAllAck(string topic, int retryInterval = 60)
  34. {
  35. var queue = GetRedisReliableQueue<string>(topic);
  36. queue.RetryInterval = retryInterval;
  37. return queue.RollbackAllAck();
  38. }
  39. /// <summary>
  40. /// 发送一个数据列表到可信队列
  41. /// </summary>
  42. /// <param name="topic"></param>
  43. /// <param name="value"></param>
  44. /// <typeparam name="T"></typeparam>
  45. /// <returns></returns>
  46. public static int AddReliableQueueList<T>(string topic, List<T> value)
  47. {
  48. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  49. var count = queue.Count;
  50. var result = queue.Add(value.ToArray());
  51. return result - count;
  52. }
  53. /// <summary>
  54. /// 发送一条数据到可信队列
  55. /// </summary>
  56. /// <param name="topic"></param>
  57. /// <param name="value"></param>
  58. /// <typeparam name="T"></typeparam>
  59. /// <returns></returns>
  60. public static int AddReliableQueue<T>(string topic, T value)
  61. {
  62. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  63. var count = queue.Count;
  64. var result = queue.Add(value);
  65. return result - count;
  66. }
  67. /// <summary>
  68. /// 获取延迟队列
  69. /// </summary>
  70. /// <param name="topic"></param>
  71. /// <typeparam name="T"></typeparam>
  72. /// <returns></returns>
  73. public static RedisDelayQueue<T> GetDelayQueue<T>(string topic)
  74. {
  75. var queue = (_cache as FullRedis).GetDelayQueue<T>(topic);
  76. return queue;
  77. }
  78. /// <summary>
  79. /// 发送一条数据到延迟队列
  80. /// </summary>
  81. /// <param name="topic"></param>
  82. /// <param name="value"></param>
  83. /// <param name="delay">延迟时间。单位秒</param>
  84. /// <typeparam name="T"></typeparam>
  85. /// <returns></returns>
  86. public static int AddDelayQueue<T>(string topic, T value, int delay)
  87. {
  88. var queue = GetDelayQueue<T>(topic);
  89. return queue.Add(value, delay);
  90. }
  91. /// <summary>
  92. /// 发送数据列表到延迟队列
  93. /// </summary>
  94. /// <param name="topic"></param>
  95. /// <param name="value"></param>
  96. /// <param name="delay"></param>
  97. /// <typeparam name="T">延迟时间。单位秒</typeparam>
  98. /// <returns></returns>
  99. public static int AddDelayQueue<T>(string topic, List<T> value, int delay)
  100. {
  101. var queue = GetDelayQueue<T>(topic);
  102. queue.Delay = delay;
  103. return queue.Add(value.ToArray());
  104. }
  105. /// <summary>
  106. /// 在可信队列获取一条数据
  107. /// </summary>
  108. /// <param name="topic"></param>
  109. /// <typeparam name="T"></typeparam>
  110. /// <returns></returns>
  111. public static T ReliableTakeOne<T>(string topic)
  112. {
  113. var queue = GetRedisReliableQueue<T>(topic);
  114. return queue.TakeOne(1);
  115. }
  116. /// <summary>
  117. /// 异步在可信队列获取一条数据
  118. /// </summary>
  119. /// <param name="topic"></param>
  120. /// <typeparam name="T"></typeparam>
  121. /// <returns></returns>
  122. public static async Task<T> ReliableTakeOneAsync<T>(string topic)
  123. {
  124. var queue = GetRedisReliableQueue<T>(topic);
  125. return await queue.TakeOneAsync(1);
  126. }
  127. /// <summary>
  128. /// 在可信队列获取多条数据
  129. /// </summary>
  130. /// <param name="topic"></param>
  131. /// <param name="count"></param>
  132. /// <typeparam name="T"></typeparam>
  133. /// <returns></returns>
  134. public static List<T> ReliableTake<T>(string topic, int count)
  135. {
  136. var queue = GetRedisReliableQueue<T>(topic);
  137. return queue.Take(count).ToList();
  138. }
  139. }