事件总线
事件总线常常作为软件开发中解耦各个组件通讯的工具。在领域驱动设计的开发过程中,由于系统存在多个模块,为了避免各个模块的函数直接进行调用,使用事件总线作为媒介进行模块间的通讯,解耦模块之间的依赖。
在本章中我们将会学习如何编写一个事件总线框架,本文参考了很多 MASA Framework 框架中对于事件总线的设计。
使用事件总线
在编写事件总线框架之前,首先了解 Maomi.EventBus 的使用,其示例代码参考 Demo8.Console 项目。
创建一个项目,然后通过 nuget 引入 Maomi.EventBus 包。
这里我们来模拟用户注册的流程,模拟用户注册流程。假设用户提交信息后,系统的处理过程是检查验证码、将用户信息写到 Users 表中、初始化用户数据、发送电子邮箱。
首先是定义一个事件模型类,模型类必须继承 Event 抽象类或 IEvent 接口。
public record class MyEvent : Event
{
public string Name { get; set; }
public string EMail { get; set; }
public override string ToString()
{
return $"用户名: {Name} ,邮箱: {EMail}";
}
}
接着要编写事件执行器,在 Maomi.EventBus 中,事件执行器是一个方法,而不是一个类型,事件执行器方法命名没有约束,只要求方法参数包含 MyEvent
事件即可,执行器方法所在的类型不需要继承任何接口,只需要使用 EventAttribute
特性标记即可。
首先是检查用户的验证码是否正确:
[Event] // 标记此类型里有事件处理器
public class CheckImageCodeEventHandler
{
// 标记该方法是事件处理器,并设置执行顺序
[EventHandler(Order = 0)]
public void Check(MyEvent @event)
{
Console.WriteLine(@event.ToString());
}
}
当框架扫描到 CheckImageCodeEventHandler 带有 Event
标识时,会被自动注册到容器中,其生命周期为 scope。由于 CheckImageCodeEventHandler 被注册到容器中,所以可以在其构造函数中注入其它服务。
[EventHandler(Order = 0)]
定义触发事件后该执行器的执行顺序。每个事件都有多个执行器,它们之间是有顺序的,通过 EventHandlerAttribute.Order
属性进行排序。一个类型中可以有多个执行器方法,可以订阅同一个事件,也可以订阅不同的事件。每个执行器方法都有一个对应顺序的的撤销器器,由于回滚撤销之前的操作。
在编写验证码执行器方法后,我们继续完善用户注册过程,其完整代码示例如下:
[Event]
public class UserRegisterEventHandler
{
[EventHandler(Order = 1)]
public void InsertDb(MyEvent @event)
{
var state = new Random().Next(0, 2);
if (state == 0)
Console.WriteLine("√ 用户信息已添加到数据库");
else throw new Exception("× 写入用户信息到数据库失败");
}
[EventHandler(Order = 1, IsCancel = true)]
public void CancelInsertDb(MyEvent @event)
{
Console.WriteLine("注册失败,刷新验证码");
}
[EventHandler(Order = 2)]
public void InitUser(MyEvent @event)
{
var state = new Random().Next(0, 2);
if (state == 0)
Console.WriteLine("√ 初始化用户数据,系统生成默认用户权限、数据");
else throw new Exception("× 初始化用户数据失败");
}
[EventHandler(Order = 2, IsCancel = true)]
public void CancelInitUser(MyEvent @event)
{
Console.WriteLine("撤销用户注册信息");
}
[EventHandler(Order = 3)]
public void SendEmail(MyEvent @event)
{
var state = new Random().Next(0, 2);
if (state == 0)
Console.WriteLine("√ 发送验证邮件成功");
else throw new Exception("× 发送验证邮件失败");
}
[EventHandler(Order = 3, IsCancel = true)]
public void CancelSendEmail(MyEvent @event)
{
Console.WriteLine("× 撤销初始化用户数据");
}
}
可以看到,每个执行器方法都有其执行顺序,而且都配有一个撤销器,只需要标记 IsCancel = true
即可,执行器和撤销器也可以可以分开放到不同的类型中。如果执行到 SendEmail
时出现错误,框架会逐个执行撤销器,执行顺序为 CancelSendEmail
、CancelInitUser
、CancelInsertDb
。
我们还可以设置中间件拦截事件,记录事件日志以及判断是否可以执行该事件。
public class LoggingMiddleware<TEvent> : IEventMiddleware<TEvent> where TEvent : IEvent
{
public async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
{
Console.WriteLine("----- Handling command {0} ({1})", @event.GetType().Name, @event.ToString());
await next();
}
}
可以通过中间件拦截器判断是否需要执行事件,如果继续执行事件,则使用 awat next();
。你也可以使用数据库事务包裹 await next();
,使得该事件的所有数据库操作都能在数据库回滚,减少手动编写撤销的代码量,以及增加可靠性,防止代码撤销过程中出现异常、程序崩溃导致的撤销失败。
使用事件总线是非常简单的,只需要注册 .AddEventBus()
服务即可,然后使用 IEventBus 服务发布事件。
static async Task Main()
{
var ioc = new ServiceCollection();
ioc.AddEventBus(middleware: typeof(LoggingMiddleware<>));
ioc.AddLogging(build => build.AddConsole());
var services = ioc.BuildServiceProvider();
var eventBus = services.GetRequiredService<IEventBus>();
await eventBus.PublishAsync(new MyEvent()
{
Name = "工良",
EMail = "工良@maomi.com"
});
}
事件总线的设计
Maomi.EventBus 项目文件组成如下:
// 事件模型接口
IEvent.cs
// 事件模型抽象实现
Event.cs
// 事件总线接口
IEventBus.cs
// 事件总线实现
EventBus.cs
// 扩展函数
EventBusExtensions.cs
// 特性标记
EventHandlerAttribute.cs
// 记录事件执行器信息
EventInfo.cs
// 事件中间件
IEventMiddleware.cs
// 表达式树构造器
InvokeBuilder.cs
接口抽象
首先是最简单的事件模型,每个事件都必须继承该接口,以便在日志中记录,可以在调试和排查问题时,快速定位。
// 事件接口,通过事件传递参数
public interface IEvent
{
// 事件唯一标识
Guid GetEventId();
void SetEventId(Guid eventId);
DateTime GetCreationTime();
void SetCreationTime(DateTime creationTime);
}
// 简化事件的实现,通过事件传递参数
public abstract record Event : IEvent
{
private Guid _eventId;
private DateTime _creationTime;
protected Event() : this(Guid.NewGuid(), DateTime.UtcNow) { }
protected Event(Guid eventId, DateTime creationTime)
{
_eventId = eventId;
_creationTime = creationTime;
}
public Guid GetEventId() => _eventId;
public void SetEventId(Guid eventId) => _eventId = eventId;
public DateTime GetCreationTime() => _creationTime;
public void SetCreationTime(DateTime creationTime) => _creationTime = creationTime;
}
通过事件模型定义了事件的基本格式,使用者可以通过继承 IEvent 或 Event 来扩展事件模型。
接着是两个执行器模型,分别标记类型和方法,EventAttribute 的作用仅仅是告诉程序,这个类里面有事件执行器。而 EventHandlerAttribute 标记了这个方法是事件执行器,需要绑定到哪个事件,以及其执行顺序。
// 标识类中有事件执行器
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class EventAttribute : Attribute { }
// 标识方法是一个事件执行器
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)]
public class EventHandlerAttribute : Attribute
{
// 事件排序
public int Order { get; set; } = 0;
// 是否为撤销事件
public bool IsCancel { get; set; } = false;
}
执行器封装
由于我们采用非接口继承的方法实现事件处理,执行器方法没有固定的名称和参数格式,比较灵活,所以在框架需要解决如何识别以及调用该方法。
执行器方法会有以下四种情况:
// 同步方法
void My(MyEvent data)
// 异步方法
Task My(MyEvent data)
// 同步可取消
void My(MyEvent data, CancellationToken cancellationToken)
// 异步可取消
Task My(MyEvent data, CancellationToken cancellationToken)
由于执行器方法有四种情况,当程序启动时,框架如何扫描识别并注册执行器?当触发事件之后,框架应该如何调用执行器?
首先,我们可以将这四种情况分为同步方法和异步方法,然后使用两种委托包装,使用 object target
表示事件类型,使用 params object?[] parameters
来表示参数。
// 定义函数格式,支持异步和非异步的执行器
internal delegate Task TaskInvokeDelegate(object target, params object?[] parameters);
internal delegate void VoidInvokeDelegate(object target, object?[] parameters);
两个 void My()
函数会被表达式树封装成 void VoidInvokeDelegate()
委托,而两个 Task My()
会被封装为 Task TaskInvokeDelegate
委托,虽然定义了两个委托,但是为了统一格式,最后还需要将 VoidInvokeDelegate 委托转换为 TaskInvokeDelegate,所以,最后框架只需要统一封装按照 Task TaskInvokeDelegate()
委托执行异步方法即可。
创建一个 InvokeBuilder.cs
类型,封装方法通过表达式树将四种执行器方法以及 VoidInvokeDelegate 委托转换为 TaskInvokeDelegate 委托。
比如以下方法:
public class CheckImageCodeEventHandler
{
public void Check(MyEvent @event)
{
}
}
使用表达式树构造为如下伪代码所示:
VoidInvokeDelegate invoke = (object target,object[]? parameters) =>
{
(CheckImageCodeEventHandler)target.Check(parameters);
}
TaskInvokeDelegate taskInvoke = Task (object target,object[]? parameters) =>
{
invoke.Invoke(target,paramters);
return Task.CompletedTask;
}
InvokeBuilder 完整代码如下所示:
internal static class InvokeBuilder
{
// 构造委托
public static TaskInvokeDelegate Build(MethodInfo methodInfo, Type targetType)
{
// 步骤一:构造 (object target, params object?[] parameters) 参数
// 构造执行器方法的参数
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object?[]), "parameters");
// 构建函数参数列表
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (var i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);
parameters.Add(valueCast);
}
// 构造函数调用
var instanceCast = Expression.Convert(targetParameter, targetType);
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);
// 步骤二:转换为 TaskInvokeDelegate 形式
if (methodCall.Type == typeof(void))
{
var lambda = Expression.Lambda<VoidInvokeDelegate>(methodCall, targetParameter, parametersParameter);
var voidExecutor = lambda.Compile();
return delegate (object target, object?[] parameters)
{
voidExecutor(target, parameters);
return Task.CompletedTask;
};
}
else if (methodCall.Type == typeof(Task))
{
var castMethodCall = Expression.Convert(methodCall, typeof(Task));
var lambda = Expression.Lambda<TaskInvokeDelegate>(castMethodCall, targetParameter, parametersParameter);
return lambda.Compile();
}
else
{
throw new NotSupportedException($"The return type of the [{methodInfo.Name}] method must be Task or void");
}
}
}
封装调用链
将执行器方法封装成统一的委托结构后,接下来是将这些执行器和取消执行器的委托使用表达式树构造成调用链,按照 Order 的顺序执行。
示例:
MyEvent => InsertDb => InitUser => SendEmail
CancelInsertDb <= CancelInitUser <= CancelSendEmail
定义两个用于执行链委托:
// 定义事件委托,用于构建执行链
public delegate Task EventHandlerDelegate();
// 带依赖注入的事件委托,用于构建执行链
internal delegate Task ServiceEventHandlerDelegate(IServiceProvider provider, params object?[] parameters);
ServiceEventHandlerDelegate 用于构造调用链和撤销链,把执行事件的执行器执行顺序和撤销顺序包装起来。伪代码示例如下:
ServiceEventHandlerDelegate next = async (provider, @params) =>
{
try
{
A => B => C
}
catch
{
C => B => A
}
});
为了能够拦截事件,还需要定义一个事件中间件拦截器接口:
// 事件执行中间件,即执行事件时的拦截器
public interface IEventMiddleware<in TEvent>
where TEvent : IEvent
{
// @event: 事件
// next: 下一个要执行的函数
Task HandleAsync(TEvent @event, EventHandlerDelegate next);
}
使用 ServiceEventHandlerDelegate 包装好之后,我们只需要执行 ServiceEventHandlerDelegate 委托即可调用整个链路过程。
而 EventHandlerDelegate 的作用是包装 ServiceEventHandlerDelegate ,如果开发者要求使用中间件拦截事件,那么我们需要再包装一层传递给中间件拦截器。
if (Middleware != null)
{
var mid = ioc.GetRequiredService<IEventMiddleware<TEvent>>();
EventHandlerDelegate next = async () =>
{
await next(ioc, @event, cancellationToken);
};
await mid.HandleAsync(@event, next);
}
else
{
await next(ioc, @event, cancellationToken);
}
事件扫描和注册
接着,我们开始编写代码,来处理扫描和注册保存事件。扫描每一个执行器方法存储到内存中,并按照 Order 属性进行排序,同时,绑定 Cancel 撤销方法。
为了绑定一个执行器信息和撤销器,需要定义模型:
// 用来记录一个 Handler
internal class EventInfo
{
// 执行器所在的类
public Type DeclaringType { get; set; }
// 执行序号
public int Order { get; set; }
// 事件
public Type EventType { get; set; }
// 执行器方法
public MethodInfo MethodInfo { get; set; }
// 委托封装的执行器方法
public TaskInvokeDelegate TaskInvoke { get; set; }
// 撤销时执行
public bool IsCancel { get; set; }
// 撤销执行器对应的信息
public EventInfo? CancelInfo { get; set; }
public override int GetHashCode()
{
return MethodInfo.GetHashCode();
}
public override bool Equals(object? obj)
{
if (obj is not EventInfo info) return false;
return this.GetHashCode() == info.GetHashCode();
}
}
接下来是实现事件总线,定义事件总线接口:
// 事件总线服务
public interface IEventBus
{
Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : IEvent;
}
为了方便,笔者将 EventBus 类型拆开两部分来写,使用分部类拆分代码。
第一部分的 EventBus 中包含了静态方法和字段,缓存执行器信息以及存储委托调用链。
public partial class EventBus
{
#region static
// 拦截器
private static Type? Middleware;
// 缓存所有事件执行器
private static readonly Dictionary<Type, HashSet<EventInfo>> EventCache = new();
// 调用链缓存
private static readonly Dictionary<Type, ServiceEventHandlerDelegate> HandlerDelegateCache = new();
// 设置拦截器
public static void SetMiddleware(Type type)
{
Middleware = type;
}
// 给一个事件添加执行器
public static void AddEventHandler(
Type declaringType, // 执行器方法所在的类
int order,
Type eventType, // 绑定了哪个事件
MethodInfo method) // 执行器方法
{
if (!EventCache.TryGetValue(eventType, out var events))
{
events = new HashSet<EventInfo>();
EventCache[eventType] = events;
}
var info = new EventInfo
{
DeclaringType = declaringType,
EventType = eventType,
MethodInfo = method,
IsCancel = false,
Order = order,
// 封装方法为统一的格式
TaskInvoke = InvokeBuilder.Build(method, declaringType)
};
events.Add(info);
// 绑定对应的撤销器
var cancelInfo = events.FirstOrDefault(x => x.EventType == eventType && x.Order == order && x.IsCancel == true);
if (cancelInfo != null) info.CancelInfo = cancelInfo;
}
// 添加撤销事件执行器
public static void AddCancelEventHandler(Type declaringType, int order, Type eventType, MethodInfo method)
{
if (!EventCache.TryGetValue(eventType, out var events))
{
events = new HashSet<EventInfo>();
EventCache[eventType] = events;
}
var cancelInfo = new EventInfo
{
DeclaringType = declaringType,
EventType = eventType,
MethodInfo = method,
IsCancel = true,
Order = order,
TaskInvoke = InvokeBuilder.Build(method, declaringType)
};
events.Add(cancelInfo);
// 该撤销器绑定对应的执行器
var info = events.FirstOrDefault(x => x.EventType == eventType && x.Order == order && x.IsCancel == false);
if (info != null) info.CancelInfo = cancelInfo;
}
#endregion
}
这里面的两个方法很简单,将执行器方法存储到缓存中。
接着,为了构建函数调用链,已经执行失败回撤过程,需要实现一个核心的构建方法,将所有步骤封装到 ServiceEventHandlerDelegate 中。
// 构建事件执行链
private static ServiceEventHandlerDelegate BuildHandler<TEvent>() where TEvent : IEvent
{
if (HandlerDelegateCache.TryGetValue(typeof(TEvent), out var handler)) return handler;
ServiceEventHandlerDelegate next = async (provider, @params) =>
{
var eventData = @params.OfType<Event>().FirstOrDefault();
var cancel = @params.OfType<CancellationToken>().FirstOrDefault();
var logger = provider.GetRequiredService<ILogger<EventBus>>();
logger.LogDebug("开始执行事件: {0},{1}", typeof(TEvent).Name, @params[0]);
if (!EventCache.TryGetValue(typeof(TEvent), out var eventInfos)) return;
var infos = eventInfos.Where(x => x.IsCancel == false).OrderBy(x => x.Order).ToArray();
// 包装调用链和撤销链
for (int i = 0; i < infos.Length; i++)
{
var info = infos[i];
if (cancel.IsCancellationRequested)
{
logger.LogDebug("事件已被取消执行: {0},位置:{1}", typeof(TEvent).Name, info.MethodInfo.Name);
return;
}
logger.LogDebug("事件: {0},=> {1}", typeof(TEvent).Name, info.MethodInfo.Name);
// 构建执行链
var currentService = provider.GetRequiredService(info.DeclaringType);
try
{
await info.TaskInvoke(currentService, @params);
}
// 执行失败,开始回退
catch (Exception ex)
{
logger.LogError(ex, "执行事件失败: {0},执行器:{1},{2}", typeof(TEvent).Name, info.MethodInfo.Name, @params[0]);
for (int j = i; j >= 0; j--)
{
var backInfo = infos[j];
if (backInfo.CancelInfo is not null)
{
await backInfo.CancelInfo.TaskInvoke(currentService, @params);
}
}
return;
}
}
};
// 存到缓存
HandlerDelegateCache[typeof(TEvent)] = next;
return next;
}
接着,编写 EventBus 的实例方法,实现 IEventBus 发布事件的方法。
// 事件总线
public partial class EventBus : IEventBus
{
private readonly IServiceProvider _provider;
public EventBus(IServiceProvider serviceProvider)
{
_provider = serviceProvider;
}
// 发布事件
public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : IEvent
{
var handler = BuildHandler<TEvent>();
if (Middleware != null)
{
var mid = _provider.GetRequiredService<IEventMiddleware<TEvent>>();
EventHandlerDelegate next = async () =>
{
await handler(_provider, @event, cancellationToken);
};
await mid.HandleAsync(@event, next);
}
else
{
await handler(_provider, @event, cancellationToken);
}
}
}
最后为了在程序启动时扫描出事件执行器,注册到事件总线中,需要实现服务注册的扩展方法:
public static class EventBusExtensions
{
// 添加事件总线扩展
public static void AddEventBus(this IServiceCollection services, Type? middleware = null)
{
services.AddScoped<IEventBus, EventBus>();
if (middleware is not null)
{
EventBus.SetMiddleware(middleware);
services.TryAddEnumerable(new ServiceDescriptor(typeof(IEventMiddleware<>), middleware, lifetime: ServiceLifetime.Transient));
}
var assemblies = AppDomain.CurrentDomain.GetAssemblies();
foreach (var assembly in assemblies)
{
foreach (var type in assembly.GetTypes())
{
if (type.CustomAttributes.Any(x => x.AttributeType == typeof(EventAttribute)))
{
GetEventHandler(services, type);
}
}
}
}
// 扫描类中的执行器
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void GetEventHandler(IServiceCollection services, Type type)
{
services.AddScoped(type);
var methods = type.GetMethods(BindingFlags.Public | BindingFlags.Instance);
foreach (var method in methods)
{
var attr = method.GetCustomAttribute<EventHandlerAttribute>();
if (attr == null) return;
var parameters = method.GetParameters();
if (parameters.Length == 0) throw new Exception($"{method.Name} 的定义不正确,至少包含一个参数");
var eventType = parameters[0].ParameterType;
if (!(eventType.IsSubclassOf(typeof(Event)) || eventType.GetInterface(typeof(IEvent).Name) != null))
throw new Exception($"{method.Name} 的定义不正确,第一个参数必须为事件");
if (!attr.IsCancel) EventBus.AddEventHandler(type, attr.Order, eventType, method);
else EventBus.AddCancelEventHandler(type, attr.Order, eventType, method);
}
}
}
使用事务处理事件
本节示例代码参考 Demo7.Tran 项目,该项目使用 EFCore 将数据存储到 Sqlite 数据库中。
public class MyContext : DbContext
{
public DbSet<AccountEntity> Account { get; set; }
public MyContext(DbContextOptions<MyContext> dbContext) : base(dbContext)
{
Database.EnsureCreated();
}
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlite(
@"filename=my.db");
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<AccountEntity>()
.Property(b => b.Id)
.ValueGeneratedOnAdd();
}
}
[PrimaryKey(nameof(Id))]
[Index(nameof(EMail), IsUnique = true)]
public class AccountEntity
{
public int Id { get; set; }
public string Name { get; set; }
public string EMail { get; set; }
public bool VerifyEMail { get; set; }
}
由于一个事件会有多个执行器方法,虽然事件执行失败后可以执行对应的方法回滚,可是当准备回滚时,程序崩溃了,那么就会导致数据不一致,为了解决这一问题,可以在事件中间件中加入数据库事务处理。
定义一个事件中间件,在中间件中打开和回滚事务。
public class TranMiddleware<TEvent> : IEventMiddleware<TEvent> where TEvent : IEvent
{
private readonly MyContext _context;
public TranMiddleware(MyContext context)
{
_context = context;
}
public async Task HandleAsync(TEvent @event, EventHandlerDelegate next)
{
Console.WriteLine("----- Handling command {0} ({1})", @event.GetType().Name, @event.ToString());
using var tran = await _context.Database.BeginTransactionAsync();
try
{
await next();
await tran.CommitAsync();
}
catch (Exception)
{
await tran.RollbackAsync();
throw;
}
}
}
编写相关事件,将数据插入到数据库中:
[Event]
public class UserRegisterEventHandler
{
private readonly MyContext _context;
public UserRegisterEventHandler(MyContext context)
{
_context = context;
}
[EventHandler(Order = 0)]
public async Task InsertDb(MyEvent @event)
{
var state = new Random().Next(0, 2);
if (state == 1)
{
await _context.Account.AddAsync(new AccountEntity
{
Name = @event.Name,
EMail = @event.EMail,
});
await _context.SaveChangesAsync();
Console.WriteLine("√ 用户信息已添加到数据库");
}
else throw new Exception("× 写入用户信息到数据库失败");
}
}
在进行依赖注入时,注入该中间件:
static async Task Main(string[] args)
{
var ioc = new ServiceCollection();
ioc.AddDbContext<MyContext>();
ioc.AddLogging(build => build.AddConsole());
ioc.AddEventBus(typeof(TranMiddleware<>));
var services = ioc.BuildServiceProvider();
var eventBus = services.GetRequiredService<IEventBus>();
await eventBus.PublishAsync(new MyEvent()
{
Name = "工良",
EMail = "工良@maomi.com"
});
}