RedisQueue.cs 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. // 大名科技(天津)有限公司版权所有 电话:18020030720 QQ:515096995
  2. //
  3. // 此源代码遵循位于源代码树根目录中的 LICENSE 文件的许可证
  4. using NewLife.Caching.Queues;
  5. namespace Admin.NET.Core;
  6. /// <summary>
  7. /// Redis 消息队列
  8. /// </summary>
  9. public static class RedisQueue
  10. {
  11. private static ICache _cache = App.GetService<ICache>();
  12. /// <summary>
  13. /// 获取可信队列,需要确认
  14. /// </summary>
  15. /// <typeparam name="T"></typeparam>
  16. /// <param name="topic"></param>
  17. /// <returns></returns>
  18. public static RedisReliableQueue<T> GetRedisReliableQueue<T>(string topic)
  19. {
  20. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  21. return queue;
  22. }
  23. /// <summary>
  24. /// 可信队列回滚
  25. /// </summary>
  26. /// <param name="topic"></param>
  27. /// <param name="retryInterval"></param>
  28. /// <returns></returns>
  29. public static int RollbackAllAck(string topic, int retryInterval = 60)
  30. {
  31. var queue = GetRedisReliableQueue<string>(topic);
  32. queue.RetryInterval = retryInterval;
  33. return queue.RollbackAllAck();
  34. }
  35. /// <summary>
  36. /// 发送一个数据列表到可信队列
  37. /// </summary>
  38. /// <param name="topic"></param>
  39. /// <param name="value"></param>
  40. /// <typeparam name="T"></typeparam>
  41. /// <returns></returns>
  42. public static int AddReliableQueueList<T>(string topic, List<T> value)
  43. {
  44. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  45. var count = queue.Count;
  46. var result = queue.Add(value.ToArray());
  47. return result - count;
  48. }
  49. /// <summary>
  50. /// 发送一条数据到可信队列
  51. /// </summary>
  52. /// <param name="topic"></param>
  53. /// <param name="value"></param>
  54. /// <typeparam name="T"></typeparam>
  55. /// <returns></returns>
  56. public static int AddReliableQueue<T>(string topic, T value)
  57. {
  58. var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
  59. var count = queue.Count;
  60. var result = queue.Add(value);
  61. return result - count;
  62. }
  63. /// <summary>
  64. /// 获取延迟队列
  65. /// </summary>
  66. /// <param name="topic"></param>
  67. /// <typeparam name="T"></typeparam>
  68. /// <returns></returns>
  69. public static RedisDelayQueue<T> GetDelayQueue<T>(string topic)
  70. {
  71. var queue = (_cache as FullRedis).GetDelayQueue<T>(topic);
  72. return queue;
  73. }
  74. /// <summary>
  75. /// 发送一条数据到延迟队列
  76. /// </summary>
  77. /// <param name="topic"></param>
  78. /// <param name="value"></param>
  79. /// <param name="delay">延迟时间。单位秒</param>
  80. /// <typeparam name="T"></typeparam>
  81. /// <returns></returns>
  82. public static int AddDelayQueue<T>(string topic, T value, int delay)
  83. {
  84. var queue = GetDelayQueue<T>(topic);
  85. return queue.Add(value, delay);
  86. }
  87. /// <summary>
  88. /// 发送数据列表到延迟队列
  89. /// </summary>
  90. /// <param name="topic"></param>
  91. /// <param name="value"></param>
  92. /// <param name="delay"></param>
  93. /// <typeparam name="T">延迟时间。单位秒</typeparam>
  94. /// <returns></returns>
  95. public static int AddDelayQueue<T>(string topic, List<T> value, int delay)
  96. {
  97. var queue = GetDelayQueue<T>(topic);
  98. queue.Delay = delay;
  99. return queue.Add(value.ToArray());
  100. }
  101. /// <summary>
  102. /// 在可信队列获取一条数据
  103. /// </summary>
  104. /// <param name="topic"></param>
  105. /// <typeparam name="T"></typeparam>
  106. /// <returns></returns>
  107. public static T ReliableTakeOne<T>(string topic)
  108. {
  109. var queue = GetRedisReliableQueue<T>(topic);
  110. return queue.TakeOne(1);
  111. }
  112. /// <summary>
  113. /// 异步在可信队列获取一条数据
  114. /// </summary>
  115. /// <param name="topic"></param>
  116. /// <typeparam name="T"></typeparam>
  117. /// <returns></returns>
  118. public static async Task<T> ReliableTakeOneAsync<T>(string topic)
  119. {
  120. var queue = GetRedisReliableQueue<T>(topic);
  121. return await queue.TakeOneAsync(1);
  122. }
  123. /// <summary>
  124. /// 在可信队列获取多条数据
  125. /// </summary>
  126. /// <param name="topic"></param>
  127. /// <param name="count"></param>
  128. /// <typeparam name="T"></typeparam>
  129. /// <returns></returns>
  130. public static List<T> ReliableTake<T>(string topic, int count)
  131. {
  132. var queue = GetRedisReliableQueue<T>(topic);
  133. return queue.Take(count).ToList();
  134. }
  135. }