EasyBus是一个简单消息框架,对第三方的消息组件进行一定抽象,目前支持Msmq和RabbitMq。EasyBus本身自带软负载功能,并且能对消息的发布情况和消息的处理情况进行跟踪,在打开消息跟踪功能后,还可以实现消息发布端和消息处理的幂等性。目前第三方的消息组件功能不一样,EasyBus实现了消息中常用的发布订阅,负载,幂等性,分布式部署(动态增加负载节点),配置变更通知等功能,因为只使用第三方消息组件的发送和接收功能,所以可以实现快速切换第三方消息组件,效率上在不跟踪消息的情况下,效率等同于第三方组件的效率,在使用消息跟踪后效率会降低,目前跟踪消息实现是依赖的Mongodb。
※ 目前项目没在生产环境使用,还在完善开发阶段。
1.消息的发布订阅
EasyBus对消息的发布订阅可以使用标记(Attribute)和配置文件两种方式来配置消息的发布和订阅。
(1) 标记方式(Attribute)
发布端配置如图:
发布的消息类要标记Publish标记。
RouteKey:是路由关键字,发布订阅都依靠路由关键字实现。
Description:是发布消息的描述。
总线在启动时,会把发布信息上传到RegisterCenter(目前由ZooKeeper实现)。存储结构如图:
EasyBus为配置树的根节点,发布信息在PublishInfo节点下,发布节点的存储路径结构为:
/EasyBus/PublishInfo/{PublishBusName}/{RouteKey}/{NodeName}
PublishBusName:发布端总线名称
RouteKey:发布消息的路由关键字
NodeName:发布端总线的负载节点名称
订阅端配置如图:
接收端的消息标记上Subscribe标记。
BusName:发布端的总线名称
RouteKey:要订阅的消息的路由关键字
Name:该订阅的唯一标识
Mq:接收消息的队列地址。
如果要对消息做处理,还可以直接使用Listener标记。
SubscribeName:要处理哪个订阅的消息。
ProcessThreadCount:处理消息的线程数(不标记默认为1)
HandleName:处理器名称,多个使用分号分隔(处理器配置在IOC容器中,需要用Key标识,目前框架使用CommonServiceLocator来做IOC的适配)
Subscribe和Listener的信息和Publish一样,在总线启动时上传到RegisterCenter。如图:
Listener的信息在ListenerInfo节点下,Subscribe的信息在SubscribeInfo节点下。
Subscribe的存储路径结构:
/EasyBus/SubscribeInfo/{PublishBusName}/{RouteKey}/{SubscribeBusName}-{NodeName}-{SubscribeName}
PublishBusName:发布端总线名称
RouteKey:消息路由关键字
SubscribeBusName:订阅端总线名称
NodeName:订阅端总线的负载节点名称
SubscribeName:订阅的唯一标识
Listener的存储路径结构:
/EasyBus/ListenerInfo/{SubscribeBusName}/{NodeName}/{SubscribeName}-Listener
具体节点解释和Subscribe里出现的节点解释一样。
(2) 配置文件配置方式,如图:
<esaybus-config>为配置的根节点,<bus>标识一个总线的配置。
<endpoint-setting>节点用来配置终结点信息,订阅配置节(<subscribe>)用到的终结点用名字来引用。
name:终结点名称
value:终结点字串(注:图中的Msmq和RabbitMq是终结点名称,只是标识名字)
<publish-setting>节点用来配置发布信息,对应的是PublishAttribute
routeKey:路由关键字
description:发布的信息描述
<subscribe-setting>节点用来配置订阅信息,对应的是SubscribeAttribute
name:订阅的唯一标识名称
busName:发布端总线名称
routeKey:要订阅的路由关键字
endpointName:接收消息的终结点名称
<listener>配置节用来配置监听处理信息,对应ListenerAttribute
processThreadCount:监听的线程数
<handle>用来配置处理器的名称,处理请会从CommonServiceLocator中获取。
2.分布式部署和负载均衡
IBus接口有两个属性用来支持EasyBus的分布式部署和负载均衡。
Name:用来标识总线的名称。
NodeName:用来标识总线的负载节点名称。
例如:
A:{ Name = "BusA", NodeName = "NodeA" }
B:{ Name = "BusA", NodeName = "NodeB" }
如果A和B分别在两个不同的进程中部署,这样就认为总线BusA,被负载到两个实例中(NodeA和NodeB),如果此时BusA订阅了BusB的一个RouteKey为"OrderMessage"的消息,在BusB发布这个消息时,会随机选择NodeA和NodeB中的一个发布消息。此时如果NodeB因为服务器当机,ZooKeeper会通知BusB,BusB接收到通知会把NodeB从发布列表去除,待NodeB恢复后,ZooKeeper同样会通知BusB,BusB接收通知后会把NodeB从新加入到发布列表中
3.消息跟踪和发布处理幂等性
当在调用EasyBus的Publish方法发布消息时,如果使用Message对象,并把IsRecordingHistory属性设置成true,那这个消息在发布过程中EasyBus会全程跟踪消息的发布和处理过程。
(1) 消息日志
在消息发布前首先上传消息信息到HistoryCenter(目前使用Mongodb实现)MessageLog的表(Collection)中,如图:
上图是一条消息的记录,记录了消息的RouteKey,MessageId(消息的唯一标识),Json为序列化后的消息,PublishServerIp(发布端服务器地址),PublishBusName(发布端总线名称),PublishBusNodeName(负载节点名称)。
(2) 消息发布日志
在消息完成发布后,发布到各个订阅端的记录也会上传到HistoryCenter的MessagePublishLog表中,如图:
上图是发布记录中的一条,MessageId消息唯一标识,Mq订阅端的队列地址,SubscribeName订阅名称,SubscribeBusName订阅端总线名称,SubscribeBusNodeName订阅端总线负载节点名称,IsSuccess是否成功,Error异常信息,PublishTime发布时间。
在上图中,总线:EasyBus.Subscribe假设有两个节点ServerA和ServerB,那对于一个订阅名称则随机发送一个节点,实现软负载
(3) 消息处理日志
在消息到达订阅端,订阅端并且设置了监听的话,在处理完消息后,会把处理结果的日志上传到HistoryCenter的MessageProcessLog表中,如图