|
@@ -11,7 +11,7 @@ using RabbitMQ.Client;
|
|
|
using RabbitMQ.Client.Events;
|
|
using RabbitMQ.Client.Events;
|
|
|
using System.Threading.Channels;
|
|
using System.Threading.Channels;
|
|
|
|
|
|
|
|
-namespace Admin.Core;
|
|
|
|
|
|
|
+namespace Admin.NET.Core;
|
|
|
|
|
|
|
|
/// <summary>
|
|
/// <summary>
|
|
|
/// RabbitMQ自定义事件源存储器
|
|
/// RabbitMQ自定义事件源存储器
|
|
@@ -74,7 +74,7 @@ public class RabbitMQEventSourceStore : IEventSourceStorer
|
|
|
// 读取原始消息
|
|
// 读取原始消息
|
|
|
var stringEventSource = Encoding.UTF8.GetString(ea.Body.ToArray());
|
|
var stringEventSource = Encoding.UTF8.GetString(ea.Body.ToArray());
|
|
|
|
|
|
|
|
- // 转换为 IEventSource,这里可以选择自己喜欢的序列化工具,如果自定义了 EventSource,注意属性是可读可写
|
|
|
|
|
|
|
+ // 转换为 IEventSource,如果自定义了 EventSource,注意属性是可读可写
|
|
|
var eventSource = JSON.Deserialize<ChannelEventSource>(stringEventSource);
|
|
var eventSource = JSON.Deserialize<ChannelEventSource>(stringEventSource);
|
|
|
|
|
|
|
|
// 写入内存管道存储器
|
|
// 写入内存管道存储器
|
|
@@ -84,7 +84,7 @@ public class RabbitMQEventSourceStore : IEventSourceStorer
|
|
|
_model.BasicAck(ea.DeliveryTag, false);
|
|
_model.BasicAck(ea.DeliveryTag, false);
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
- // 启动消费者 设置为手动应答消息
|
|
|
|
|
|
|
+ // 启动消费者且设置为手动应答消息
|
|
|
_model.BasicConsume(routeKey, false, consumer);
|
|
_model.BasicConsume(routeKey, false, consumer);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -96,24 +96,19 @@ public class RabbitMQEventSourceStore : IEventSourceStorer
|
|
|
/// <returns><see cref="ValueTask"/></returns>
|
|
/// <returns><see cref="ValueTask"/></returns>
|
|
|
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
|
|
public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
|
|
|
{
|
|
{
|
|
|
- // 空检查
|
|
|
|
|
if (eventSource == default)
|
|
if (eventSource == default)
|
|
|
- {
|
|
|
|
|
throw new ArgumentNullException(nameof(eventSource));
|
|
throw new ArgumentNullException(nameof(eventSource));
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
|
|
|
|
|
|
|
+ // 判断是否是 ChannelEventSource 或自定义的 EventSource
|
|
|
if (eventSource is ChannelEventSource source)
|
|
if (eventSource is ChannelEventSource source)
|
|
|
{
|
|
{
|
|
|
- // 序列化,这里可以选择自己喜欢的序列化工具
|
|
|
|
|
|
|
+ // 序列化及发布
|
|
|
var data = Encoding.UTF8.GetBytes(JSON.Serialize(source));
|
|
var data = Encoding.UTF8.GetBytes(JSON.Serialize(source));
|
|
|
-
|
|
|
|
|
- // 发布
|
|
|
|
|
_model.BasicPublish("", _routeKey, null, data);
|
|
_model.BasicPublish("", _routeKey, null, data);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
- // 这里处理动态订阅问题
|
|
|
|
|
|
|
+ // 处理动态订阅
|
|
|
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
|
|
await _channel.Writer.WriteAsync(eventSource, cancellationToken);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -125,7 +120,6 @@ public class RabbitMQEventSourceStore : IEventSourceStorer
|
|
|
/// <returns>事件源对象</returns>
|
|
/// <returns>事件源对象</returns>
|
|
|
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
|
|
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
|
|
|
{
|
|
{
|
|
|
- // 读取一条事件源
|
|
|
|
|
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
|
|
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
|
|
|
return eventSource;
|
|
return eventSource;
|
|
|
}
|
|
}
|