品RocketMQ源码,学习并发编程三大神器

2022-12-16 0 871

原副标题:品RocketMQ源代码,自学mammalian程式设计三大宝物

本栏是 RocketMQ 的死忠影迷,在写作源代码的操作过程中,自学到了许多程式设计基本功。

这首诗,本栏紧密结合 RocketMQ 源代码,撷取mammalian程式设计三大宝物的相关习题。

品RocketMQ源码,学习并发编程三大神器

1 CountDownLatch 同时实现互联网并行允诺

CountDownLatch 是两个并行辅助工具类,用以协同数个缓存间的并行,它能使两个缓存在等候除此之外许多缓存顺利完成各别组织工作后,再继续继续执行。

右图是 CountDownLatch 的核心理念方式:

品RocketMQ源码,学习并发编程三大神器

他们能指出它内建两个计时器,缺省调用计值。每每缓存继续执行 countDown 方式,计时器的值就会减一,当计时器的值为 0 时,则表示大部份的各项任务都继续执行顺利完成,接着在 CountDownLatch 上等候的缓存就能恢复正常继续执行接下去的各项任务。

总括,资料库有100余条统计数据须要处置,单缓存继续执行很慢,他们能将各项任务分成5个批号,缓存池依照每一批号继续执行,当5个批号总体继续执行顺利完成后,列印出各项任务继续执行的天数 。

longstart = System.currentTimeMillis;

ExecutorService executorService = Executors.newFixedThreadPool(10);

intbatchSize = 5;

CountDownLatch countDownLatch = newCountDownLatch(batchSize);

for( inti = 0; i < batchSize; i++) {

finalintbatchNumber = i;

executorService.execute( newRunnable {

@Override

publicvoidrun{

try{

doSomething(batchNumber);

} catch(Exception e) {

e.printStackTrace;

} finally{

countDownLatch.countDown;

}

}

});

}

countDownLatch.await;

System.out.println( “各项任务继续执行耗时:”+ (System.currentTimeMillis – start) +“毫秒”);

温习完 CountDownLatch 的习题,回到 RocketMQ 源代码。

本栏在没有接触互联网程式设计之前,一直很疑惑, 互联网并行允诺是如何同时实现的?

并行允诺指:客户端缓存发起调用后,须要在指定的超时天数内,等到响应结果,才能顺利完成本次调用如果超时天数内没有得到结果,那么会抛出超时异常。

RocketMQ 的并行发送消息接口见右图:

追踪源代码,真正发送允诺的方式是通讯模块的并行允诺方式 invokeSyncImpl

品RocketMQ源码,学习并发编程三大神器

总体流程:

发送消息缓存 Netty channel 对象调用 writeAndFlush 方式后 ,它的本质是通过 Netty 的读写缓存将统计数据包发送到内核 , 这个操作过程本身就是异步的;

ResponseFuture 类中内建两个 CountDownLatch 对象 ,responseFuture 对象调用 waitRepsone 方式,发送消息线程会阻塞 ;

品RocketMQ源码,学习并发编程三大神器

客户端收到响应命令后, 继续执行 processResponseCommand 方式,核心理念逻辑是继续执行 ResponseFuture 的 putResponse 方式。

品RocketMQ源码,学习并发编程三大神器

该方式的本质就是填充响应对象,并调用 countDownLatch 的 countDown 方式 , 这样发送消息缓存就不再阻塞。

CountDownLatch 同时实现互联网并行允诺是非常实用的基本功,在许多开源中间件里,比如 Metaq ,Xmemcached 都有类似的同时实现。

2 ReadWriteLock 名字服务路由管理

读写锁是一把锁分成两部分:读锁和写锁,其中读锁允许数个缓存同时获得,而写锁则是互斥锁。

它的规则是: 读读不互斥,读写互斥,写写互斥,适用于读多写少的业务场景。

他们一般都使用 ReentrantReadWriteLock ,该类同时实现了 ReadWriteLock 。ReadWriteLock 接口也很简单,其内部主要提供了两个方式,分别返回读锁和写锁 。

publicinterfaceReadWriteLock{

Lock readLock;

Lock writeLock;

}

读写锁的使用方式如下所示:

创建 ReentrantReadWriteLock 对象 , 当使用 ReadWriteLock 的时候,并不是直接使用,而是获得其内部的读锁和写锁,接着分别调用 lock / unlock 方式 ;

privateReadWriteLock readWriteLock = newReentrantReadWriteLock;

读取共享统计数据 ;

Lock readLock = readWriteLock.readLock;

readLock.lock;

try{

// TODO 查询共享统计数据

} finally{

readLock.unlock;

}

写入共享统计数据;

Lock writeLock = readWriteLock.writeLock;

writeLock.lock;

try{

// TODO 修改共享统计数据

} finally{

writeLock.unlock;

}

RocketMQ架构上主要分成四部分,如右图所示 :

品RocketMQ源码,学习并发编程三大神器

Producer :消息发布的角色,Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的操作过程支持快速失败并且低延迟。

Consumer :消息消费的角色,支持以 push 推,pull 拉两种模式对消息进行消费。

BrokerServer :Broker主要负责消息的存储、投递和查询以及服务高可用保证。

NameServer :名字服务是两个非常简单的 Topic 路由注册中心,其角色类似 Dubbo 中的zookeeper,支持Broker的动态注册与发现。

NameServer 是两个几乎无状态节点,可集群部署,节点间无任何信息并行。Broker 启动后会向大部份 NameServer 定期(每 30s)发送心跳包( 路由信息),NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 有关信息,代表下线。

那么 NameServer 如何保存路由信息呢?

路由信息通过几个 HashMap 来保存,当 Broker 向 Nameserver 发送心跳包(路由信息),Nameserver 须要对 HashMap 进行统计数据更新,但他们都知道 HashMap 并不是缓存安全的,高mammalian场景下,容易出现 CPU 100% 问题,所以更新 HashMap 时须要加锁,RocketMQ 使用了 JDK 的读写锁 ReentrantReadWriteLock 。

更新路由信息,操作写锁

品RocketMQ源码,学习并发编程三大神器

查询主题信息,操作读锁

品RocketMQ源码,学习并发编程三大神器

读写锁适用于读多写少的场景,比如名字服务,配置服务等。

3 CompletableFuture 异步消息处置

RocketMQ 主从架构中,主节点与从节点间统计数据并行/复制的方式有 并行双写异步复制两种模式。

异步复制是指消息在主节点落盘成功后就告诉客户端消息发送成功,无需等候消息从主节点复制到从节点,消息的复制由其他缓存顺利完成。

并行双写是指主节点将消息成功落盘后,须要等候从节点复制成功,再告诉客户端消息发送成功。

并行双写模式是阻塞的,本栏依照 RocketMQ 4.6.1 源代码,整理出主节点处置两个发送消息的允诺的时序图。

品RocketMQ源码,学习并发编程三大神器

总体流程:

生产者将消息发送到 Broker , Broker 接收到消息后,发送消息处置器 SendMessageProcessor 的继续执行缓存池 SendMessageExecutor缓存池来处置发送消息命令;

继续执行 ComitLog 的 putMessage 方式;

ComitLog 内部先继续执行 appendMessage 方式;

接着提交两个 GroupCommitRequest 到并行复制服务 HAService ,等候 HAService 通知 GroupCommitRequest 顺利完成;

返回写入结果并响应客户端 。

他们能看到: 发送消息的继续执行缓存须要等候消息复制从节点 , 并将消息返回给生产者才能开始处置下两个消息

RocketMQ 4.6.1 源代码中,继续执行缓存池的缓存数量是 1 ,假如缓存处置主从并行速度慢了,系统在这一瞬间无法处置新的发送消息允诺,造成 CPU 资源无法被充分利用 , 同时系统的吞吐量也会降低。

那么优化并行双写呢 ?

从 RocketMQ 4.7 开始,RocketMQ 引入了 CompletableFuture 同时实现了 异步消息处置

发送消息的继续执行缓存 不再等候消息复制到从节点后再处置新的允诺,而是 提前生成CompletableFuture 并返回 ;

HAService 中的缓存在复制成功后,调用 CompletableFuture 的 complete 方式,通知 remoting 模块响应客户端(缓存池:PutMessageExecutor ) 。

他们分析下 RocketMQ 4.9.4 核心理念代码:

Broker 接收到消息后,发送消息处置器 SendMessageProcessor 的继续执行缓存池SendMessageExecutor缓存池来处置发送消息命令;

调用 SendMessageProcessor 的 asyncProcessRequest 方式;

调用 Commitlog 的 aysncPutMessage 方式写入消息 ;

这段代码中,当 commitLog 继续执行完 appendMessage 后, 须要继续执行刷盘各项任务并行复制两个各项任务。

但这两个各项任务并不是并行继续执行,而是异步的方式。

复制缓存复制消息后,唤醒 future ;

组装响应命令 ,并将响应命令返回给客户端。

为了便于理解这一段消息发送处置操作过程的缓存模型,本栏在 RocketMQ 源代码中做了几处埋点,修改 Logback 的日志配置,发送一条普通的消息,观察服务端日志。

从日志中,他们能观察到:

发送消息的继续执行缓存(图中红色)在继续执行完创建刷盘 Future 和并行复制 future 后,并没有等候这两个各项任务继续执行顺利完成,而是在结束 asyncProcessRequest 方式后就能处置发送消息允诺了 ;

刷盘缓存和复制缓存继续执行完各别的各项任务后,唤醒 future,接着通过刷盘缓存组装存储结果,最后通过 PutMessageExecutor 缓存池(图中黄色)将响应命令返回给客户端。

本栏一直指出: 异步是更细粒度的使用系统资源的一种方式,在异步消息处置的操作过程中,通过 CompletableFuture 这个宝物,各个缓存各司其职,优雅且高效的提升了 RocketMQ 的性能。

你参与开源吗?

抽开源中国周边啦~

END

开发者必备的Firfox插件

这里有最新开源资讯、软件更新、技术干货等内容

点这里 ↓↓↓ 记得 关注✔ 标星⭐ 哦~

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务