原副标题:品RocketMQ源代码,自学mammalian程式设计三大宝物
本栏是 RocketMQ 的死忠影迷,在写作源代码的操作过程中,自学到了许多程式设计基本功。
这首诗,本栏紧密结合 RocketMQ 源代码,撷取mammalian程式设计三大宝物的相关习题。
1 CountDownLatch 同时实现互联网并行允诺
CountDownLatch 是两个并行辅助工具类,用以协同数个缓存间的并行,它能使两个缓存在等候除此之外许多缓存顺利完成各别组织工作后,再继续继续执行。
右图是 CountDownLatch 的核心理念方式:
他们能指出它内建两个计时器,缺省调用计值。每每缓存继续执行 countDown 方式,计时器的值就会减一,当计时器的值为 0 时,则表示大部份的各项任务都继续执行顺利完成,接着在 CountDownLatch 上等候的缓存就能恢复正常继续执行接下去的各项任务。
总括,资料库有100余条统计数据须要处置,单缓存继续执行很慢,他们能将各项任务分成5个批号,缓存池依照每一批号继续执行,当5个批号总体继续执行顺利完成后,列印出各项任务继续执行的天数 。
longstart = System.currentTimeMillis;
ExecutorService executorService = Executors.newFixedThreadPool(10);
intbatchSize = 5;
CountDownLatch countDownLatch = newCountDownLatch(batchSize);
for( inti = 0; i < batchSize; i++) {
finalintbatchNumber = i;
executorService.execute( newRunnable {
@Override
publicvoidrun{
try{
doSomething(batchNumber);
} catch(Exception e) {
e.printStackTrace;
} finally{
countDownLatch.countDown;
}
}
});
}
countDownLatch.await;
System.out.println( “各项任务继续执行耗时:”+ (System.currentTimeMillis – start) +“毫秒”);
温习完 CountDownLatch 的习题,回到 RocketMQ 源代码。
本栏在没有接触互联网程式设计之前,一直很疑惑, 互联网并行允诺是如何同时实现的?
并行允诺指:客户端缓存发起调用后,须要在指定的超时天数内,等到响应结果,才能顺利完成本次调用。 如果超时天数内没有得到结果,那么会抛出超时异常。
RocketMQ 的并行发送消息接口见右图:
追踪源代码,真正发送允诺的方式是通讯模块的并行允诺方式 invokeSyncImpl。
总体流程:
发送消息缓存 Netty channel 对象调用 writeAndFlush 方式后 ,它的本质是通过 Netty 的读写缓存将统计数据包发送到内核 , 这个操作过程本身就是异步的;
ResponseFuture 类中内建两个 CountDownLatch 对象 ,responseFuture 对象调用 waitRepsone 方式,发送消息线程会阻塞 ;
客户端收到响应命令后, 继续执行 processResponseCommand 方式,核心理念逻辑是继续执行 ResponseFuture 的 putResponse 方式。
该方式的本质就是填充响应对象,并调用 countDownLatch 的 countDown 方式 , 这样发送消息缓存就不再阻塞。
CountDownLatch 同时实现互联网并行允诺是非常实用的基本功,在许多开源中间件里,比如 Metaq ,Xmemcached 都有类似的同时实现。
2 ReadWriteLock 名字服务路由管理读写锁是一把锁分成两部分:读锁和写锁,其中读锁允许数个缓存同时获得,而写锁则是互斥锁。
它的规则是: 读读不互斥,读写互斥,写写互斥,适用于读多写少的业务场景。
他们一般都使用 ReentrantReadWriteLock ,该类同时实现了 ReadWriteLock 。ReadWriteLock 接口也很简单,其内部主要提供了两个方式,分别返回读锁和写锁 。
publicinterfaceReadWriteLock{
Lock readLock;
Lock writeLock;
}
读写锁的使用方式如下所示:
创建 ReentrantReadWriteLock 对象 , 当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,接着分别调用 lock / unlock 方式 ;
privateReadWriteLock readWriteLock = newReentrantReadWriteLock;读取共享统计数据 ;
Lock readLock = readWriteLock.readLock;readLock.lock;
try{
// TODO 查询共享统计数据
} finally{
readLock.unlock;
}
写入共享统计数据;
Lock writeLock = readWriteLock.writeLock;writeLock.lock;
try{
// TODO 修改共享统计数据
} finally{
writeLock.unlock;
}
RocketMQ架构上主要分成四部分,如右图所示 :
Producer :消息发布的角色,Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的操作过程支持快速失败并且低延迟。
Consumer :消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。
BrokerServer :Broker主要负责消息的存储、投递和查询以及服务高可用保证。
NameServer :名字服务是两个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的zookeeper,支持Broker的动态注册与发现。
NameServer 是两个几乎无状态节点,可集群部署,节点间无任何信息并行。Broker 启动后会向大部份 NameServer 定期(每 30s)发送心跳包( 路由信息),NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 有关信息,代表下线。
那么 NameServer 如何保存路由信息呢?
路由信息通过几个 HashMap 来保存,当 Broker 向 Nameserver 发送心跳包(路由信息),Nameserver 须要对 HashMap 进行统计数据更新,但他们都知道 HashMap 并不是缓存安全的,高mammalian场景下,容易出现 CPU 100% 问题,所以更新 HashMap 时须要加锁,RocketMQ 使用了 JDK 的读写锁 ReentrantReadWriteLock 。
更新路由信息,操作写锁
查询主题信息,操作读锁
读写锁适用于读多写少的场景,比如名字服务,配置服务等。
3 CompletableFuture 异步消息处置RocketMQ 主从架构中,主节点与从节点间统计数据并行/复制的方式有 并行双写和 异步复制两种模式。
异步复制是指消息在主节点落盘成功后就告诉客户端消息发送成功,无需等候消息从主节点复制到从节点,消息的复制由其他缓存顺利完成。
并行双写是指主节点将消息成功落盘后,须要等候从节点复制成功,再告诉客户端消息发送成功。
并行双写模式是阻塞的,本栏依照 RocketMQ 4.6.1 源代码,整理出主节点处置两个发送消息的允诺的时序图。
总体流程:
生产者将消息发送到 Broker , Broker 接收到消息后,发送消息处置器 SendMessageProcessor 的继续执行缓存池 SendMessageExecutor缓存池来处置发送消息命令;
继续执行 ComitLog 的 putMessage 方式;
ComitLog 内部先继续执行 appendMessage 方式;
接着提交两个 GroupCommitRequest 到并行复制服务 HAService ,等候 HAService 通知 GroupCommitRequest 顺利完成;
返回写入结果并响应客户端 。
他们能看到: 发送消息的继续执行缓存须要等候消息复制从节点 , 并将消息返回给生产者才能开始处置下两个消息。
RocketMQ 4.6.1 源代码中,继续执行缓存池的缓存数量是 1 ,假如缓存处置主从并行速度慢了,系统在这一瞬间无法处置新的发送消息允诺,造成 CPU 资源无法被充分利用 , 同时系统的吞吐量也会降低。
那么优化并行双写呢 ?
从 RocketMQ 4.7 开始,RocketMQ 引入了 CompletableFuture 同时实现了 异步消息处置。
发送消息的继续执行缓存 不再等候消息复制到从节点后再处置新的允诺,而是 提前生成CompletableFuture 并返回 ;
HAService 中的缓存在复制成功后,调用 CompletableFuture 的 complete 方式,通知 remoting 模块响应客户端(缓存池:PutMessageExecutor ) 。
他们分析下 RocketMQ 4.9.4 核心理念代码:
Broker 接收到消息后,发送消息处置器 SendMessageProcessor 的继续执行缓存池SendMessageExecutor缓存池来处置发送消息命令;
调用 SendMessageProcessor 的 asyncProcessRequest 方式;
调用 Commitlog 的 aysncPutMessage 方式写入消息 ;
这段代码中,当 commitLog 继续执行完 appendMessage 后, 须要继续执行刷盘各项任务和 并行复制两个各项任务。
但这两个各项任务并不是并行继续执行,而是异步的方式。
复制缓存复制消息后,唤醒 future ;
组装响应命令 ,并将响应命令返回给客户端。
为了便于理解这一段消息发送处置操作过程的缓存模型,本栏在 RocketMQ 源代码中做了几处埋点,修改 Logback 的日志配置,发送一条普通的消息,观察服务端日志。
从日志中,他们能观察到:
发送消息的继续执行缓存(图中红色)在继续执行完创建刷盘 Future 和并行复制 future 后,并没有等候这两个各项任务继续执行顺利完成,而是在结束 asyncProcessRequest 方式后就能处置发送消息允诺了 ;
刷盘缓存和复制缓存继续执行完各别的各项任务后,唤醒 future,接着通过刷盘缓存组装存储结果,最后通过 PutMessageExecutor 缓存池(图中黄色)将响应命令返回给客户端。
本栏一直指出: 异步是更细粒度的使用系统资源的一种方式,在异步消息处置的操作过程中,通过 CompletableFuture 这个宝物,各个缓存各司其职,优雅且高效的提升了 RocketMQ 的性能。
你参与开源吗?
抽开源中国周边啦~
END
开发者必备的Firfox插件
这里有最新开源资讯、软件更新、技术干货等内容
点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦~









