RedisQueue.cs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. // 麻省理工学院许可证
  2. //
  3. // 版权所有 (c) 2021-2023 zuohuaijun,大名科技(天津)有限公司 联系电话/微信:18020030720 QQ:515096995
  4. //
  5. // 特此免费授予获得本软件的任何人以处理本软件的权利,但须遵守以下条件:在所有副本或重要部分的软件中必须包括上述版权声明和本许可声明。
  6. //
  7. // 软件按“原样”提供,不提供任何形式的明示或暗示的保证,包括但不限于对适销性、适用性和非侵权的保证。
  8. // 在任何情况下,作者或版权持有人均不对任何索赔、损害或其他责任负责,无论是因合同、侵权或其他方式引起的,与软件或其使用或其他交易有关。
  9. using NewLife.Caching.Queues;
  10. namespace Admin.NET.Core;
  11. /// <summary>
  12. /// Redis 消息队列
  13. /// </summary>
  14. public static class RedisQueue
  15. {
  16. private static ICache _cache = App.GetService<ICache>();
  17. /// <summary>
  18. /// 获取可信队列,需要确认
  19. /// </summary>
  20. /// <typeparam name="T"></typeparam>
  21. /// <param name="topic"></param>
  22. /// <returns></returns>
  23. public static RedisReliableQueue<T> GetRedisReliableQueue<T>(string topic)
  24. {
  25. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  26. return queue;
  27. }
  28. /// <summary>
  29. /// 可信队列回滚
  30. /// </summary>
  31. /// <param name="topic"></param>
  32. /// <param name="retryInterval"></param>
  33. /// <returns></returns>
  34. public static int RollbackAllAck(string topic, int retryInterval = 60)
  35. {
  36. var queue = GetRedisReliableQueue<string>(topic);
  37. queue.RetryInterval = retryInterval;
  38. return queue.RollbackAllAck();
  39. }
  40. /// <summary>
  41. /// 发送一个数据列表到可信队列
  42. /// </summary>
  43. /// <param name="topic"></param>
  44. /// <param name="value"></param>
  45. /// <typeparam name="T"></typeparam>
  46. /// <returns></returns>
  47. public static int AddReliableQueueList<T>(string topic, List<T> value)
  48. {
  49. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  50. var count = queue.Count;
  51. var result = queue.Add(value.ToArray());
  52. return result - count;
  53. }
  54. /// <summary>
  55. /// 发送一条数据到可信队列
  56. /// </summary>
  57. /// <param name="topic"></param>
  58. /// <param name="value"></param>
  59. /// <typeparam name="T"></typeparam>
  60. /// <returns></returns>
  61. public static int AddReliableQueue<T>(string topic, T value)
  62. {
  63. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  64. var count = queue.Count;
  65. var result = queue.Add(value);
  66. return result - count;
  67. }
  68. /// <summary>
  69. /// 获取延迟队列
  70. /// </summary>
  71. /// <param name="topic"></param>
  72. /// <typeparam name="T"></typeparam>
  73. /// <returns></returns>
  74. public static RedisDelayQueue<T> GetDelayQueue<T>(string topic)
  75. {
  76. var queue = (_cache as FullRedis).GetDelayQueue<T>(topic);
  77. return queue;
  78. }
  79. /// <summary>
  80. /// 发送一条数据到延迟队列
  81. /// </summary>
  82. /// <param name="topic"></param>
  83. /// <param name="value"></param>
  84. /// <param name="delay">延迟时间。单位秒</param>
  85. /// <typeparam name="T"></typeparam>
  86. /// <returns></returns>
  87. public static int AddDelayQueue<T>(string topic, T value, int delay)
  88. {
  89. var queue = GetDelayQueue<T>(topic);
  90. return queue.Add(value, delay);
  91. }
  92. /// <summary>
  93. /// 发送数据列表到延迟队列
  94. /// </summary>
  95. /// <param name="topic"></param>
  96. /// <param name="value"></param>
  97. /// <param name="delay"></param>
  98. /// <typeparam name="T">延迟时间。单位秒</typeparam>
  99. /// <returns></returns>
  100. public static int AddDelayQueue<T>(string topic, List<T> value, int delay)
  101. {
  102. var queue = GetDelayQueue<T>(topic);
  103. queue.Delay = delay;
  104. return queue.Add(value.ToArray());
  105. }
  106. /// <summary>
  107. /// 在可信队列获取一条数据
  108. /// </summary>
  109. /// <param name="topic"></param>
  110. /// <typeparam name="T"></typeparam>
  111. /// <returns></returns>
  112. public static T ReliableTakeOne<T>(string topic)
  113. {
  114. var queue = GetRedisReliableQueue<T>(topic);
  115. return queue.TakeOne(1);
  116. }
  117. /// <summary>
  118. /// 异步在可信队列获取一条数据
  119. /// </summary>
  120. /// <param name="topic"></param>
  121. /// <typeparam name="T"></typeparam>
  122. /// <returns></returns>
  123. public static async Task<T> ReliableTakeOneAsync<T>(string topic)
  124. {
  125. var queue = GetRedisReliableQueue<T>(topic);
  126. return await queue.TakeOneAsync(1);
  127. }
  128. /// <summary>
  129. /// 在可信队列获取多条数据
  130. /// </summary>
  131. /// <param name="topic"></param>
  132. /// <param name="count"></param>
  133. /// <typeparam name="T"></typeparam>
  134. /// <returns></returns>
  135. public static List<T> ReliableTake<T>(string topic, int count)
  136. {
  137. var queue = GetRedisReliableQueue<T>(topic);
  138. return queue.Take(count).ToList();
  139. }
  140. }