|
|
@@ -21,8 +21,6 @@ namespace Admin.NET.Core;
|
|
|
/// </remarks>
|
|
|
public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
{
|
|
|
- private ILogger<RedisEventSourceStorer> _logger;
|
|
|
-
|
|
|
/// <summary>
|
|
|
/// 消费者
|
|
|
/// </summary>
|
|
|
@@ -37,10 +35,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
|
|
|
private RedisStream<string> _queueBroadcast;
|
|
|
|
|
|
- /// <summary>
|
|
|
- /// 路由键
|
|
|
- /// </summary>
|
|
|
- private readonly string _routeKey;
|
|
|
+ private ILogger<RedisEventSourceStorer> _logger;
|
|
|
|
|
|
/// <summary>
|
|
|
/// 构造函数
|
|
|
@@ -51,7 +46,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
public RedisEventSourceStorer(ICacheProvider cacheProvider, string routeKey, int capacity)
|
|
|
{
|
|
|
_logger = App.GetRequiredService<ILogger<RedisEventSourceStorer>>();
|
|
|
-
|
|
|
+
|
|
|
// 配置通道,设置超出默认容量后进入等待
|
|
|
var boundedChannelOptions = new BoundedChannelOptions(capacity)
|
|
|
{
|
|
|
@@ -62,7 +57,6 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
_channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);
|
|
|
|
|
|
//_redis = redis as FullRedis;
|
|
|
- _routeKey = routeKey;
|
|
|
|
|
|
// 创建广播消息订阅者,即所有服务器节点都能收到消息(用来发布重启、Reload配置等消息)
|
|
|
FullRedis redis = (FullRedis)cacheProvider.Cache;
|
|
|
@@ -87,7 +81,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
}
|
|
|
catch (Exception e)
|
|
|
{
|
|
|
- _logger.LogError(e,"处理Received中的消息产生错误!");
|
|
|
+ _logger.LogError(e, "处理Received中的消息产生错误!");
|
|
|
}
|
|
|
};
|
|
|
_eventConsumer.Start();
|
|
|
@@ -96,10 +90,10 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
private async Task OnConsumeBroadcast(string source, Message message, CancellationToken token)
|
|
|
{
|
|
|
ChannelEventSource ces = JsonConvert.DeserializeObject<ChannelEventSource>(source);
|
|
|
- await ConsumeChannelEventSourceAsync(ces,token);
|
|
|
+ await ConsumeChannelEventSourceAsync(ces, token);
|
|
|
}
|
|
|
|
|
|
- private async Task ConsumeChannelEventSourceAsync(ChannelEventSource ces,CancellationToken cancel = default)
|
|
|
+ private async Task ConsumeChannelEventSourceAsync(ChannelEventSource ces, CancellationToken cancel = default)
|
|
|
{
|
|
|
// 打印测试事件
|
|
|
if (ces.EventId != null && ces.EventId.IndexOf(":Test") > 0)
|
|
|
@@ -109,7 +103,7 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
Console.WriteLine($"有消息要处理{ces.EventId},{ces.Payload}");
|
|
|
Console.ForegroundColor = oriColor;
|
|
|
}
|
|
|
- await _channel.Writer.WriteAsync(ces,cancel);
|
|
|
+ await _channel.Writer.WriteAsync(ces, cancel);
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
@@ -127,7 +121,6 @@ public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable
|
|
|
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
|
|
|
if (eventSource is ChannelEventSource source)
|
|
|
{
|
|
|
-
|
|
|
// 异步发布
|
|
|
await Task.Factory.StartNew(() =>
|
|
|
{
|