|
|
@@ -21,6 +21,8 @@ namespace Admin.NET.Core;
|
|
|
/// </remarks>
|
|
|
public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
{
|
|
|
+ private ILogger<RedisEventSourceStorer> _logger;
|
|
|
+
|
|
|
/// <summary>
|
|
|
/// 消费者
|
|
|
/// </summary>
|
|
|
@@ -48,6 +50,8 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
/// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>
|
|
|
public RedisEventSourceStorer(ICacheProvider cacheProvider, string routeKey, int capacity)
|
|
|
{
|
|
|
+ _logger = App.GetRequiredService<ILogger<RedisEventSourceStorer>>();
|
|
|
+
|
|
|
// 配置通道,设置超出默认容量后进入等待
|
|
|
var boundedChannelOptions = new BoundedChannelOptions(capacity)
|
|
|
{
|
|
|
@@ -73,11 +77,18 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
_eventConsumer = new EventConsumer<ChannelEventSource>(_queueSingle);
|
|
|
|
|
|
// 订阅消息写入 Channel
|
|
|
- _eventConsumer.Received += (send, cr) =>
|
|
|
+ _eventConsumer.Received += async (send, cr) =>
|
|
|
{
|
|
|
- var oriColor = Console.ForegroundColor;
|
|
|
- ChannelEventSource ces = (ChannelEventSource)cr;
|
|
|
- ConsumeChannelEventSourceAsync(ces,ces.CancellationToken).GetAwaiter().GetResult();
|
|
|
+ // var oriColor = Console.ForegroundColor;
|
|
|
+ try
|
|
|
+ {
|
|
|
+ ChannelEventSource ces = (ChannelEventSource)cr;
|
|
|
+ await ConsumeChannelEventSourceAsync(ces, ces.CancellationToken);
|
|
|
+ }
|
|
|
+ catch (Exception e)
|
|
|
+ {
|
|
|
+ _logger.LogError(e,"处理Received中的消息产生错误!");
|
|
|
+ }
|
|
|
};
|
|
|
_eventConsumer.Start();
|
|
|
}
|
|
|
@@ -116,6 +127,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
|
|
|
if (eventSource is ChannelEventSource source)
|
|
|
{
|
|
|
+
|
|
|
// 异步发布
|
|
|
await Task.Factory.StartNew(() =>
|
|
|
{
|