Просмотр исходного кода

!1349 修改RedisEventSourceStorer
Merge pull request !1349 from capad1946/fix/ConsumeChannelEventSource

zuohuaijun 1 год назад
Родитель
Сommit
dec7de61e3
1 измененных файлов с 6 добавлено и 7 удалено
  1. 6 7
      Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs

+ 6 - 7
Admin.NET/Admin.NET.Core/EventBus/RedisEventSourceStorer.cs

@@ -1,4 +1,4 @@
-// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
+// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
 //
 // 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
 //
@@ -77,19 +77,18 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
         {
             var oriColor = Console.ForegroundColor;
             ChannelEventSource ces = (ChannelEventSource)cr;
-            ConsumeChannelEventSource(ces);
+            ConsumeChannelEventSourceAsync(ces,ces.CancellationToken).GetAwaiter().GetResult();
         };
         _eventConsumer.Start();
     }
 
-    private Task OnConsumeBroadcast(string source, Message message, CancellationToken token)
+    private async Task OnConsumeBroadcast(string source, Message message, CancellationToken token)
     {
         ChannelEventSource ces = JsonConvert.DeserializeObject<ChannelEventSource>(source);
-        ConsumeChannelEventSource(ces);
-        return Task.CompletedTask;
+        await ConsumeChannelEventSourceAsync(ces,token);
     }
 
-    private void ConsumeChannelEventSource(ChannelEventSource ces)
+    private async Task ConsumeChannelEventSourceAsync(ChannelEventSource ces,CancellationToken cancel = default)
     {
         // 打印测试事件
         if (ces.EventId != null && ces.EventId.IndexOf(":Test") > 0)
@@ -99,7 +98,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
             Console.WriteLine($"有消息要处理{ces.EventId},{ces.Payload}");
             Console.ForegroundColor = oriColor;
         }
-        _channel.Writer.WriteAsync(ces);
+        await _channel.Writer.WriteAsync(ces,cancel);
     }
 
     /// <summary>