瀏覽代碼

!707 自定义事件源存储器适配NewLifeRedis
Merge pull request !707 from 一克猫/dev

zuohuaijun 2 年之前
父節點
當前提交
cb4a856003

+ 2 - 1
Admin.NET/Admin.NET.Core/Cache/CacheSetup.cs

@@ -6,7 +6,7 @@ public static class CacheSetup
     /// 缓存注册(新生命Redis组件)
     /// 缓存注册(新生命Redis组件)
     /// </summary>
     /// </summary>
     /// <param name="services"></param>
     /// <param name="services"></param>
-    public static void AddCache(this IServiceCollection services)
+    public static ICache AddCache(this IServiceCollection services)
     {
     {
         ICache cache = Cache.Default;
         ICache cache = Cache.Default;
 
 
@@ -19,5 +19,6 @@ public static class CacheSetup
         }
         }
 
 
         services.AddSingleton(cache);
         services.AddSingleton(cache);
+        return cache;
     }
     }
 }
 }

+ 106 - 0
Admin.NET/Admin.NET.Core/EventBus/EventConsumer.cs

@@ -0,0 +1,106 @@
+namespace Admin.NET.Core;
+
+/// <summary>
+/// Redis 消息扩展
+/// </summary>
+/// <typeparam name="T"></typeparam>
+public class EventConsumer<T> : IDisposable
+{
+    private Task _consumerTask;
+    private CancellationTokenSource _consumerCts;
+
+    /// <summary>
+    /// 消费者
+    /// </summary>
+    public IProducerConsumer<T> Consumer { get; }
+
+    /// <summary>
+    /// ConsumerBuilder
+    /// </summary>
+    public FullRedis Builder { get; set; }
+
+    /// <summary>
+    /// 消息回调
+    /// </summary>
+    public event EventHandler<T> Received;
+
+    /// <summary>
+    /// 构造函数
+    /// </summary>
+    public EventConsumer(FullRedis redis, string routeKey)
+    {
+        Builder = redis;
+        Consumer = Builder.GetQueue<T>(routeKey);
+    }
+
+    /// <summary>
+    /// 启动
+    /// </summary>
+    /// <exception cref="InvalidOperationException"></exception>
+    public void Start()
+    {
+        if (Consumer is null)
+        {
+            throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
+        }
+        if (_consumerTask != null)
+        {
+            return;
+        }
+        _consumerCts = new CancellationTokenSource();
+        var ct = _consumerCts.Token;
+        _consumerTask = Task.Factory.StartNew(() =>
+        {
+            while (!ct.IsCancellationRequested)
+            {
+                var cr = Consumer.TakeOne(10);
+                if (cr == null) continue;
+                Received?.Invoke(this, cr);
+            }
+        }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
+    }
+
+    /// <summary>
+    /// 停止
+    /// </summary>
+    /// <returns></returns>
+    public async Task Stop()
+    {
+        if (_consumerCts == null || _consumerTask == null) return;
+        _consumerCts.Cancel();
+        try
+        {
+            await _consumerTask;
+        }
+        finally
+        {
+            _consumerTask = null;
+            _consumerCts = null;
+        }
+    }
+
+    /// <summary>
+    /// 释放
+    /// </summary>
+    public void Dispose()
+    {
+        Dispose(true);
+        GC.SuppressFinalize(this);
+    }
+
+    /// <summary>
+    /// 释放
+    /// </summary>
+    /// <param name="disposing"></param>
+    protected virtual void Dispose(bool disposing)
+    {
+        if (disposing)
+        {
+            if (_consumerTask != null)
+            {
+                Stop().Wait();
+            }
+            Builder.Dispose();
+        }
+    }
+}

+ 87 - 23
Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs

@@ -5,58 +5,122 @@ namespace Admin.NET.Core;
 /// <summary>
 /// <summary>
 /// Redis自定义事件源存储器
 /// Redis自定义事件源存储器
 /// </summary>
 /// </summary>
-public class RedisEventSourceStorer : IEventSourceStorer
+public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
 {
 {
-    private readonly Channel<IEventSource> _channel; // 内存通道事件源存储器
+    /// <summary>
+    /// 消费者
+    /// </summary>
+    private readonly EventConsumer<ChannelEventSource> _eventConsumer;
+
+    /// <summary>
+    /// 内存通道事件源存储器
+    /// </summary>
+    private readonly Channel<IEventSource> _channel;
 
 
+    /// <summary>
+    /// Redis 连接对象
+    /// </summary>
     private readonly FullRedis _redis;
     private readonly FullRedis _redis;
 
 
-    // private readonly string _topic = "eventbus";
+    /// <summary>
+    /// 路由键
+    /// </summary>
+    private readonly string _routeKey;
 
 
-    public RedisEventSourceStorer(ICache redis, int capacity = 10000)
+    /// <summary>
+    /// 构造函数
+    /// </summary>
+    /// <param name="redis">Redis 连接对象</param>
+    /// <param name="routeKey">路由键</param>
+    /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
+    public RedisEventSourceStorer(ICache redis, string routeKey, int capacity)
     {
     {
-        _redis = (FullRedis)redis;
-
-        // 配置通道(超出默认容量后进入等待)
+        // 配置通道,设置超出默认容量后进入等待
         var boundedChannelOptions = new BoundedChannelOptions(capacity)
         var boundedChannelOptions = new BoundedChannelOptions(capacity)
         {
         {
             FullMode = BoundedChannelFullMode.Wait
             FullMode = BoundedChannelFullMode.Wait
         };
         };
+
         // 创建有限容量通道
         // 创建有限容量通道
         _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
         _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
-        //RedisHelper.Subscribe((_topic, msg =>
-        //{
-        //    var eventSource = JSON.Deserialize<ChannelEventSource>(msg.Body);
-        //    // 写入内存管道存储器
-        //    _channel!.Writer.TryWrite(eventSource);
-        //}
-        //));
+
+        _redis = redis as FullRedis;
+        _routeKey = routeKey;
+
+        // 创建消息订阅者
+        _eventConsumer = new EventConsumer<ChannelEventSource>(_redis, _routeKey);
+
+
+        // 订阅消息写入 Channel
+        _eventConsumer.Received += (send, cr) =>
+        {
+            // 反序列化消息
+            //var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr);
+
+            // 写入内存管道存储器
+            _channel.Writer.WriteAsync(cr);
+        };
+
+        // 启动消费者
+        _eventConsumer.Start();
     }
     }
 
 
     /// <summary>
     /// <summary>
-    /// 往 Redis 中写入一条
+    /// 将事件源写入存储器
     /// </summary>
     /// </summary>
-    /// <param name="eventSource"></param>
-    /// <param name="cancellationToken"></param>
+    /// <param name="eventSource">事件源对象</param>
+    /// <param name="cancellationToken">取消任务 Token</param>
+    /// <returns><see cref="ValueTask"/></returns>
     public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
     public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
     {
     {
-        if (eventSource is ChannelEventSource es)
+        // 空检查
+        if (eventSource == default)
         {
         {
-            //await RedisHelper.PublishAsync(_topic, JSON.Serialize(es));
+            throw new ArgumentNullException(nameof(eventSource));
+        }
+
+        // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
+        if (eventSource is ChannelEventSource source)
+        {
+            // 序列化消息
+            //var data = JsonSerializer.Serialize(source);
+
+            // 获取一个订阅对象
+            var queue = _redis.GetQueue<ChannelEventSource>(_routeKey);
+
+            // 异步发布
+            await Task.Factory.StartNew(() =>
+            {
+
+                queue.Add(source);
+            }, cancellationToken, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);
         }
         }
         else
         else
         {
         {
+            // 这里处理动态订阅问题
             await _channel.Writer.WriteAsync(eventSource, cancellationToken);
             await _channel.Writer.WriteAsync(eventSource, cancellationToken);
         }
         }
     }
     }
 
 
     /// <summary>
     /// <summary>
-    /// 从 Redis 中读取一条
+    /// 从存储器中读取一条事件源
     /// </summary>
     /// </summary>
-    /// <param name="cancellationToken"></param>
-    /// <returns></returns>
+    /// <param name="cancellationToken">取消任务 Token</param>
+    /// <returns>事件源对象</returns>
     public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
     public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
     {
     {
-        return await _channel.Reader.ReadAsync(cancellationToken);
+        // 读取一条事件源
+        var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
+        return eventSource;
+    }
+
+
+    /// <summary>
+    /// 释放非托管资源
+    /// </summary>
+    public async void Dispose()
+    {
+        await _eventConsumer.Stop();
+        GC.SuppressFinalize(this);
     }
     }
 }
 }

+ 9 - 1
Admin.NET/Admin.NET.Web.Core/Startup.cs

@@ -11,6 +11,7 @@ using Microsoft.AspNetCore.HttpOverrides;
 using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.DependencyInjection;
 using Microsoft.Extensions.Hosting;
 using Microsoft.Extensions.Hosting;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Logging;
+using NewLife.Caching;
 using Newtonsoft.Json;
 using Newtonsoft.Json;
 using Newtonsoft.Json.Serialization;
 using Newtonsoft.Json.Serialization;
 using OnceMi.AspNetCore.OSS;
 using OnceMi.AspNetCore.OSS;
@@ -29,7 +30,7 @@ public class Startup : AppStartup
         // 配置选项
         // 配置选项
         services.AddProjectOptions();
         services.AddProjectOptions();
         // 缓存注册
         // 缓存注册
-        services.AddCache();
+        var cache = services.AddCache();
         // SqlSugar
         // SqlSugar
         services.AddSqlSugar();
         services.AddSqlSugar();
         // JWT
         // JWT
@@ -117,6 +118,13 @@ public class Startup : AppStartup
             //    var redisClient = serviceProvider.GetService<ICache>();
             //    var redisClient = serviceProvider.GetService<ICache>();
             //    return new RedisEventSourceStorer(redisClient);
             //    return new RedisEventSourceStorer(redisClient);
             //});
             //});
+            options.AddSubscriber<AppEventSubscriber>();
+
+            // 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是 eventbus
+            var redisEventSourceStorer = new RedisEventSourceStorer(cache, "eventbus", 3000);
+            
+            // 替换默认事件总线存储器
+            options.ReplaceStorerOrFallback(() => redisEventSourceStorer);
         });
         });
 
 
         // OSS对象存储(必须一个个赋值)
         // OSS对象存储(必须一个个赋值)