Bladeren bron

组EventBus加上广播消息的功能

yzp 1 jaar geleden
bovenliggende
commit
db8056e1fb

+ 1 - 1
Admin.NET/Admin.NET.Application/Configuration/Database.json

@@ -1,4 +1,4 @@
-{
+{
   "$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json",
   "$schema": "https://gitee.com/dotnetchina/Furion/raw/v4/schemas/v4/furion-schema.json",
 
 
   // 详细数据库配置见SqlSugar官网(第一个为默认库),极力推荐 PostgreSQL 数据库
   // 详细数据库配置见SqlSugar官网(第一个为默认库),极力推荐 PostgreSQL 数据库

+ 58 - 25
Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs

@@ -4,6 +4,8 @@
 //
 //
 // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
 // 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
 
 
+using NewLife.Caching.Queues;
+using Newtonsoft.Json;
 using System.Threading.Channels;
 using System.Threading.Channels;
 
 
 namespace Admin.NET.Core;
 namespace Admin.NET.Core;
@@ -11,6 +13,12 @@ namespace Admin.NET.Core;
 /// <summary>
 /// <summary>
 /// Redis自定义事件源存储器
 /// Redis自定义事件源存储器
 /// </summary>
 /// </summary>
+/// <remarks>
+/// 在集群部署时,一般每一个消息只由一个服务节点消费一次。
+/// 有些特殊情情要通知到服务器群中的每一个节点(比如需要强制加载某些配置、重点服务等),
+/// 在这种情况下就要以“broadcast:”开头来定义EventId,
+/// 本系统会把“broadcast:”开头的事件视为“广播消息”保证集群中的每一个服务节点都能消费得到这个消息
+/// </remarks>
 public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
 public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
 {
 {
     /// <summary>
     /// <summary>
@@ -23,12 +31,11 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
     /// </summary>
     /// </summary>
     private readonly Channel<IEventSource> _channel;
     private readonly Channel<IEventSource> _channel;
 
 
-    ///// <summary>
-    ///// Redis 连接对象
-    ///// </summary>
-    //private readonly FullRedis _redis;
-    private IProducerConsumer<ChannelEventSource> _queue;
 
 
+    private IProducerConsumer<ChannelEventSource> _queueSingle;
+
+    private RedisStream<string> _queueBroadcast;
+
     /// <summary>
     /// <summary>
     /// 路由键
     /// 路由键
     /// </summary>
     /// </summary>
@@ -54,24 +61,48 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
         //_redis = redis as FullRedis;
         //_redis = redis as FullRedis;
         _routeKey = routeKey;
         _routeKey = routeKey;
 
 
-        // 创建消息订阅者
-        _queue = cacheProvider.GetQueue<ChannelEventSource>(routeKey);
-        _eventConsumer = new EventConsumer<ChannelEventSource>(_queue);
-
-        // 订阅消息写入 Channel
+        // 创建广播消息订阅者,即所有服务器节点都能收到消息(用来发布重启、Reload配置等消息)
+        FullRedis redis = (FullRedis)cacheProvider.Cache;
+        var clusterOpt = App.GetConfig<ClusterOptions>("Cluster", true);
+        _queueBroadcast = redis.GetStream<string>(routeKey + ":broadcast");
+        _queueBroadcast.Group = clusterOpt.ServerId;//根据服务器标识分配到不同的分组里
+        _queueBroadcast.Expire = TimeSpan.FromSeconds(10);//消息10秒过期()
+        _queueBroadcast.ConsumeAsync(OnConsumeBroadcast);
+
+        // 创建队列消息订阅者,只要有一个服务节点消费了消息即可
+        _queueSingle = redis.GetQueue<ChannelEventSource>(routeKey + ":single");
+        _eventConsumer = new EventConsumer<ChannelEventSource>(_queueSingle);
+
+        // 订阅消息写入 Channel
         _eventConsumer.Received += (send, cr) =>
         _eventConsumer.Received += (send, cr) =>
         {
         {
-            // 反序列化消息
-            //var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr);
-
-            // 写入内存管道存储器
-            _channel.Writer.WriteAsync(cr);
+            var oriColor = Console.ForegroundColor;
+            ChannelEventSource ces = (ChannelEventSource)cr;
+            ConsumeChannelEventSource(ces);
         };
         };
-
-        // 启动消费者
         _eventConsumer.Start();
         _eventConsumer.Start();
     }
     }
 
 
+    private Task OnConsumeBroadcast(string source, Message message, CancellationToken token)
+    {
+        ChannelEventSource ces = JsonConvert.DeserializeObject<ChannelEventSource>(source);
+        ConsumeChannelEventSource(ces);
+        return Task.CompletedTask;
+    }
+
+    private void ConsumeChannelEventSource(ChannelEventSource ces)
+    {
+        //一些测试的事件就输出一下
+        if (ces.EventId != null && ces.EventId.IndexOf(":Test") > 0)
+        {
+            var oriColor = Console.ForegroundColor;
+            Console.ForegroundColor = ConsoleColor.Green;
+            Console.WriteLine($"有消息要处理{ces.EventId},{ces.Payload}");
+            Console.ForegroundColor = oriColor;
+        }
+        _channel.Writer.WriteAsync(ces);
+    }
+
     /// <summary>
     /// <summary>
     /// 将事件源写入存储器
     /// 将事件源写入存储器
     /// </summary>
     /// </summary>
@@ -89,18 +120,20 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
         // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
         // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
         if (eventSource is ChannelEventSource source)
         if (eventSource is ChannelEventSource source)
         {
         {
-            // 序列化消息
-            //var data = JsonSerializer.Serialize(source);
-
-            //// 获取一个订阅对象
-            //var queue = _redis.GetQueue<ChannelEventSource>(_routeKey);
-
             // 异步发布
             // 异步发布
             await Task.Factory.StartNew(() =>
             await Task.Factory.StartNew(() =>
             {
             {
-                //queue.Add(source);
-                _queue.Add(source);
+                if (source.EventId != null && source.EventId.StartsWith("broadcast:"))
+                {
+                    string str = JsonConvert.SerializeObject(source);
+                    _queueBroadcast.Add(str);
+                }
+                else
+                {
+                    _queueSingle.Add(source);
+                }
             }, cancellationToken, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);
             }, cancellationToken, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);
+
         }
         }
         else
         else
         {
         {