浏览代码

😎优化缓存服务和队列操作 感谢大石头大佬的支持👍 @nnhy

zuohuaijun 1 年之前
父节点
当前提交
1f13f77403

+ 10 - 5
Admin.NET/Admin.NET.Core/Cache/CacheSetup.cs

@@ -4,6 +4,9 @@
 //
 // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
 
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using NewLife.Caching.Services;
+
 namespace Admin.NET.Core;
 
 public static class CacheSetup
@@ -14,20 +17,22 @@ public static class CacheSetup
     /// <param name="services"></param>
     public static void AddCache(this IServiceCollection services)
     {
-        ICache cache = Cache.Default;
-
         var cacheOptions = App.GetConfig<CacheOptions>("Cache", true);
         if (cacheOptions.CacheType == CacheTypeEnum.Redis.ToString())
         {
-            cache = new FullRedis(new RedisOptions
+            var redis = new FullRedis(new RedisOptions
             {
                 Configuration = cacheOptions.Redis.Configuration,
                 Prefix = cacheOptions.Redis.Prefix
             });
             if (cacheOptions.Redis.MaxMessageSize > 0)
-                ((FullRedis)cache).MaxMessageSize = cacheOptions.Redis.MaxMessageSize;
+                redis.MaxMessageSize = cacheOptions.Redis.MaxMessageSize;
+
+            // 注入 Redis 缓存提供者
+            services.AddSingleton<ICacheProvider>(p => new RedisCacheProvider(p) { Cache = redis });
         }
 
-        services.AddSingleton(cache);
+        // 内存缓存兜底。在没有配置Redis时,使用内存缓存,逻辑代码无需修改
+        services.TryAddSingleton<ICacheProvider, CacheProvider>();
     }
 }

+ 9 - 11
Admin.NET/Admin.NET.Core/EventBus/EventConsumer.cs

@@ -12,18 +12,20 @@ namespace Admin.NET.Core;
 /// <typeparam name="T"></typeparam>
 public class EventConsumer<T> : IDisposable
 {
+    /// <summary>
+    ///
+    /// </summary>
     private Task _consumerTask;
-    private CancellationTokenSource _consumerCts;
 
     /// <summary>
-    /// 消费者
+    ///
     /// </summary>
-    public IProducerConsumer<T> Consumer { get; }
+    private CancellationTokenSource _consumerCts;
 
     /// <summary>
-    /// ConsumerBuilder
+    /// 消费者
     /// </summary>
-    public FullRedis Builder { get; set; }
+    public IProducerConsumer<T> Consumer { get; }
 
     /// <summary>
     /// 消息回调
@@ -33,11 +35,8 @@ public class EventConsumer<T> : IDisposable
     /// <summary>
     /// 构造函数
     /// </summary>
-    public EventConsumer(FullRedis redis, string routeKey)
-    {
-        Builder = redis;
-        Consumer = Builder.GetQueue<T>(routeKey);
-    }
+    /// <param name="consumer"></param>
+    public EventConsumer(IProducerConsumer<T> consumer) => Consumer = consumer;
 
     /// <summary>
     /// 启动
@@ -106,7 +105,6 @@ public class EventConsumer<T> : IDisposable
             {
                 Stop().Wait();
             }
-            Builder.Dispose();
         }
     }
 }

+ 15 - 15
Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs

@@ -23,10 +23,11 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
     /// </summary>
     private readonly Channel<IEventSource> _channel;
 
-    /// <summary>
-    /// Redis 连接对象
-    /// </summary>
-    private readonly FullRedis _redis;
+    ///// <summary>
+    ///// Redis 连接对象
+    ///// </summary>
+    //private readonly FullRedis _redis;
+    private IProducerConsumer<ChannelEventSource> _queue;
 
     /// <summary>
     /// 路由键
@@ -36,10 +37,10 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
     /// <summary>
     /// 构造函数
     /// </summary>
-    /// <param name="redis">Redis 连接对象</param>
+    /// <param name="cacheProvider">Redis 连接对象</param>
     /// <param name="routeKey">路由键</param>
     /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
-    public RedisEventSourceStorer(ICache redis, string routeKey, int capacity)
+    public RedisEventSourceStorer(ICacheProvider cacheProvider, string routeKey, int capacity)
     {
         // 配置通道,设置超出默认容量后进入等待
         var boundedChannelOptions = new BoundedChannelOptions(capacity)
@@ -50,11 +51,12 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
         // 创建有限容量通道
         _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
 
-        _redis = redis as FullRedis;
+        //_redis = redis as FullRedis;
         _routeKey = routeKey;
 
         // 创建消息订阅者
-        _eventConsumer = new EventConsumer<ChannelEventSource>(_redis, _routeKey);
+        _queue = cacheProvider.GetQueue<ChannelEventSource>(routeKey);
+        _eventConsumer = new EventConsumer<ChannelEventSource>(_queue);
 
         // 订阅消息写入 Channel
         _eventConsumer.Received += (send, cr) =>
@@ -63,10 +65,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
             //var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr);
 
             // 写入内存管道存储器
-            Task.Run(async () =>
-            {
-                await _channel.Writer.WriteAsync(cr);
-            });
+            _channel.Writer.WriteAsync(cr);
         };
 
         // 启动消费者
@@ -93,13 +92,14 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
             // 序列化消息
             //var data = JsonSerializer.Serialize(source);
 
-            // 获取一个订阅对象
-            var queue = _redis.GetQueue<ChannelEventSource>(_routeKey);
+            //// 获取一个订阅对象
+            //var queue = _redis.GetQueue<ChannelEventSource>(_routeKey);
 
             // 异步发布
             await Task.Factory.StartNew(() =>
             {
-                queue.Add(source);
+                //queue.Add(source);
+                _queue.Add(source);
             }, cancellationToken, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);
         }
         else

+ 48 - 107
Admin.NET/Admin.NET.Core/EventBus/RedisQueue.cs

@@ -13,72 +13,26 @@ namespace Admin.NET.Core;
 /// </summary>
 public static class RedisQueue
 {
-    private static readonly ICache _cache = App.GetRequiredService<ICache>();
+    private static ICacheProvider _cacheProvider = App.GetService<ICacheProvider>();
 
-    /// <summary>
-    /// 获取普通队列
-    /// </summary>
+    /// <summary>创建Redis消息队列。默认消费一次,指定消费者group时使用STREAM结构,支持多消费组共享消息</summary>
+    /// <remarks>
+    /// 使用队列时,可根据是否设置消费组来决定使用简单队列还是完整队列。 简单队列(如RedisQueue)可用作命令队列,Topic很多,但几乎没有消息。 完整队列(如RedisStream)可用作消息队列,Topic很少,但消息很多,并且支持多消费组。
+    /// </remarks>
     /// <typeparam name="T"></typeparam>
-    /// <param name="topic"></param>
+    /// <param name="topic">主题</param>
+    /// <param name="group">消费组。未指定消费组时使用简单队列(如RedisQueue),指定消费组时使用完整队列(如RedisStream)</param>
     /// <returns></returns>
-    public static IProducerConsumer<T> GetQueue<T>(string topic)
+    public static IProducerConsumer<T> GetQueue<T>(String topic, String group = null)
     {
-        var queue = (_cache as FullRedis).GetQueue<T>(topic);
-        return queue;
-    }
+        // 队列需要单列
+        var key = $"myStream:{topic}";
+        if (_cacheProvider.InnerCache.TryGetValue<IProducerConsumer<T>>(key, out var queue)) return queue;
 
-    /// <summary>
-    /// 发送一个数据到队列
-    /// </summary>
-    /// <typeparam name="T"></typeparam>
-    /// <param name="topic"></param>
-    /// <param name="value"></param>
-    /// <returns></returns>
-    public static int AddQueue<T>(string topic, T value)
-    {
-        var queue = GetQueue<T>(topic);
-        return queue.Add(value);
-    }
+        queue = _cacheProvider.GetQueue<T>(topic, group);
+        _cacheProvider.Cache.Set(key, queue);
 
-    /// <summary>
-    /// 发送一个数据列表到队列
-    /// </summary>
-    /// <param name="topic"></param>
-    /// <param name="value"></param>
-    /// <typeparam name="T"></typeparam>
-    /// <returns></returns>
-    public static int AddQueueList<T>(string topic, List<T> value)
-    {
-        var queue = GetQueue<T>(topic);
-        var count = queue.Count;
-        var result = queue.Add(value.ToArray());
-        return result - count;
-    }
-
-    /// <summary>
-    /// 获取一批队列消息
-    /// </summary>
-    /// <typeparam name="T"></typeparam>
-    /// <param name="topic"></param>
-    /// <param name="count"></param>
-    /// <returns></returns>
-    public static List<T> Take<T>(string topic, int count = 1)
-    {
-        var queue = GetQueue<T>(topic);
-        var result = queue.Take(count).ToList();
-        return result;
-    }
-
-    /// <summary>
-    /// 获取一个队列消息
-    /// </summary>
-    /// <typeparam name="T"></typeparam>
-    /// <param name="topic"></param>
-    /// <returns></returns>
-    public static async Task<T> TakeOneAsync<T>(string topic)
-    {
-        var queue = GetQueue<T>(topic);
-        return await queue.TakeOneAsync(1);
+        return queue;
     }
 
     /// <summary>
@@ -89,7 +43,13 @@ public static class RedisQueue
     /// <returns></returns>
     public static RedisReliableQueue<T> GetRedisReliableQueue<T>(string topic)
     {
-        var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
+        // 队列需要单列
+        var key = $"myQueue:{topic}";
+        if (_cacheProvider.InnerCache.TryGetValue<RedisReliableQueue<T>>(key, out var queue)) return queue;
+
+        queue = (_cacheProvider.Cache as FullRedis).GetReliableQueue<T>(topic);
+        _cacheProvider.Cache.Set(key, queue);
+
         return queue;
     }
 
@@ -115,7 +75,7 @@ public static class RedisQueue
     /// <returns></returns>
     public static int AddReliableQueueList<T>(string topic, List<T> value)
     {
-        var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
+        var queue = GetRedisReliableQueue<T>(topic);
         var count = queue.Count;
         var result = queue.Add(value.ToArray());
         return result - count;
@@ -130,49 +90,12 @@ public static class RedisQueue
     /// <returns></returns>
     public static int AddReliableQueue<T>(string topic, T value)
     {
-        var queue = (_cache as FullRedis).GetReliableQueue<T>(topic);
+        var queue = GetRedisReliableQueue<T>(topic);
         var count = queue.Count;
         var result = queue.Add(value);
         return result - count;
     }
 
-    /// <summary>
-    /// 在可信队列获取一条数据
-    /// </summary>
-    /// <param name="topic"></param>
-    /// <typeparam name="T"></typeparam>
-    /// <returns></returns>
-    public static T ReliableTakeOne<T>(string topic)
-    {
-        var queue = GetRedisReliableQueue<T>(topic);
-        return queue.TakeOne(1);
-    }
-
-    /// <summary>
-    /// 异步在可信队列获取一条数据
-    /// </summary>
-    /// <param name="topic"></param>
-    /// <typeparam name="T"></typeparam>
-    /// <returns></returns>
-    public static async Task<T> ReliableTakeOneAsync<T>(string topic)
-    {
-        var queue = GetRedisReliableQueue<T>(topic);
-        return await queue.TakeOneAsync(1);
-    }
-
-    /// <summary>
-    /// 在可信队列获取多条数据
-    /// </summary>
-    /// <param name="topic"></param>
-    /// <param name="count"></param>
-    /// <typeparam name="T"></typeparam>
-    /// <returns></returns>
-    public static List<T> ReliableTake<T>(string topic, int count)
-    {
-        var queue = GetRedisReliableQueue<T>(topic);
-        return queue.Take(count).ToList();
-    }
-
     /// <summary>
     /// 获取延迟队列
     /// </summary>
@@ -181,7 +104,13 @@ public static class RedisQueue
     /// <returns></returns>
     public static RedisDelayQueue<T> GetDelayQueue<T>(string topic)
     {
-        var queue = (_cache as FullRedis).GetDelayQueue<T>(topic);
+        // 队列需要单列
+        var key = $"myDelay:{topic}";
+        if (_cacheProvider.InnerCache.TryGetValue<RedisDelayQueue<T>>(key, out var queue)) return queue;
+
+        queue = (_cacheProvider.Cache as FullRedis).GetDelayQueue<T>(topic);
+        _cacheProvider.Cache.Set(key, queue);
+
         return queue;
     }
 
@@ -215,27 +144,39 @@ public static class RedisQueue
     }
 
     /// <summary>
-    /// 异步在延迟队列获取一条数据
+    /// 在可信队列获取一条数据
     /// </summary>
     /// <param name="topic"></param>
     /// <typeparam name="T"></typeparam>
     /// <returns></returns>
-    public static async Task<T> DelayTakeOne<T>(string topic)
+    public static T ReliableTakeOne<T>(string topic)
     {
-        var queue = GetDelayQueue<T>(topic);
+        var queue = GetRedisReliableQueue<T>(topic);
+        return queue.TakeOne(1);
+    }
+
+    /// <summary>
+    /// 异步在可信队列获取一条数据
+    /// </summary>
+    /// <param name="topic"></param>
+    /// <typeparam name="T"></typeparam>
+    /// <returns></returns>
+    public static async Task<T> ReliableTakeOneAsync<T>(string topic)
+    {
+        var queue = GetRedisReliableQueue<T>(topic);
         return await queue.TakeOneAsync(1);
     }
 
     /// <summary>
-    /// 在延迟队列获取多条数据
+    /// 在可信队列获取多条数据
     /// </summary>
     /// <param name="topic"></param>
     /// <param name="count"></param>
     /// <typeparam name="T"></typeparam>
     /// <returns></returns>
-    public static List<T> DelayTake<T>(string topic, int count = 1)
+    public static List<T> ReliableTake<T>(string topic, int count)
     {
-        var queue = GetDelayQueue<T>(topic);
+        var queue = GetRedisReliableQueue<T>(topic);
         return queue.Take(count).ToList();
     }
 }

+ 1 - 1
Admin.NET/Plugins/Admin.NET.Plugin.ReZero/Admin.NET.Plugin.ReZero.csproj

@@ -24,7 +24,7 @@
   </ItemGroup>
 
   <ItemGroup>
-    <PackageReference Include="Rezero.Api" Version="1.7.6" />
+    <PackageReference Include="Rezero.Api" Version="1.7.7" />
   </ItemGroup>
 
   <ItemGroup>