#头条新闻音乐创作邀请赛#
上一则:RocketMQ源代码预测之核心理念硬盘计算机程序CommitLog
一、序言
后面如是说了RocketMQ的CommitLog文档有关的类预测CommitLog力学笔记有关的CommitLog类。当中有如是说到最新消息刷盘时高需用相关联的submitReplicarequest方式,submitReplicaRequest方式中假如实用性的伺服器的配角为SYNC_MASTER(从master同步),就会等候characterization间最新消息并行的工程进度达至预设的值后才恒定回到,假如延时则回到并行延时;
// 递交拷贝允诺 public CompletableFuture submitReplicaRequest( AppendMessageResult result, MessageExt messageExt) { // sync master不然,这时跟他们的dleger毫无关系,characterization并行,假如说主结点挂了之后,还得靠从结点纯手工网络管理转换宗校立结点 if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) { // 透过HAService推论呵呵从结点与否ok // 检查和slave并行的边线与否大于 最小放任的并行滞后差值,假如是的则展开刷盘 if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) { GroupCommitRequest request = new GroupCommitRequest( result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout() // characterization并行延时天数预设是3s ); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); return request.future(); // 能透过future来等候characterization并行顺利完成 } else { // 这时可能将从结点不需用 return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); }public Boolean isWaitStoreMsgOK() { String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); if (null == result) { return true; } return Boolean.parseBoolean(result); }
这段代码的主要逻辑如下:
象,检查和最新消息与否本地存储完毕,假如没有则结束,否则进入下一步;检查和slave并行的边线与否大于 最小放任的并行滞后差值参数haSlaveFallbehindMax,假如是的则展开characterization并行刷盘。假如没有则回到slave不需用的状态;将最新消息落盘的最小力学差值也就是CommitLog上的差值作为参数构建一个GroupCommitRequest对象,然后递交到HAService;最多等候syncFl上面那段代码比较简单,因为characterization的逻辑全部交给了HAService和HAConnection两个类处理了。这里先简单如是说呵呵整个并行的流程(并行模式)
三、高需用服务项目HAService
HAService是在RocketMQ的Broker启动的时候就会创建的,而创建的点在DefaultMessageStore这个最新消息存储有关的综合类中,在这个类的构造器中会创建HAService无论当前的Broker是什么配角。
这里需要说明的是Broker中的Master和Slaver两个配角,代码都是一样的,只不过是在实际执行的时候,走的分支不一样。
四、源代码预测
内部属性;构造函数;启动内部类;接受Slave连接处理;检查和并行工程进度和唤醒CommitLog刷盘线程;characterization并行客户端模块;Master并行笔记(监听slave笔记并行工程进度和并行笔记、根据并行工程进度来唤醒刷盘CommitLog线程);1、内部属性在HAService中有几个比较重要的属性,这里需要简单的如是说呵呵:
参数
说明
connectIOnList
连接到master的slave连接列表,用于管理连接
acceptsocketService
用于接收连接用的服务项目,只监听OP_ACCEPT事件,监听到连接事件时候,创建HAConnection来处理读写允诺事件
waitNotifyObject
一个消费等候模型类,用于处理高需用线程和CommitLog的刷盘线程交互
push2SlaveMaxOffset
master并行到slave的差值
groupTransferService
characterization并行的检测服务项目,用于检查和与否并行顺利完成
haClient
高需用的服务项目,slave用来跟master建立连接,像master汇报差值和拉取最新消息
// characterization并行服务项目 public class HAService { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // 连接数量 private final AtomicInteger connectionCount = new AtomicInteger(0); // characterization建立的网络连接,因为一个master可能将有多个slave private final List connectionList = new LinkedList<>(); // 接收slave的socket服务项目 private final AcceptSocketService acceptSocketService; // 所属的最新消息存储模块 private final DefaultMessageStore defaultMessageStore; // 线程阻塞与唤醒并行对象 private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject(); // 推送到slave最小差值 private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0); // 组传输服务项目 private final GroupTransferService groupTransferService; // characterization并行客户端模块 private final HAClient haClient; }2、构造函数
HAService只有一个构造器。逻辑也比较简单,创建一个AcceptSocketService开放一个端口为 10912的端口用于slave来简历连接,同时启动characterization信息并行的任务groupTransferService用于接收CommitLog在高需用刷盘时递交任务。
public HAService(final DefaultMessageStore defaultMessageStore) throws IOexception { this.defaultMessageStore = defaultMessageStore; // 创建,接受连接的服务项目, 开放的端口号为10912 this.acceptSocketService = new acceptSocketService( defaultMessageStore.getMessageStoreConfig().getHaListenPort() ); // 创建characterization信息并行的线程 this.groupTransferService = new GroupTransferService(); this.haClient = new HAClient(); }3、启动内部类
HAService在创建后,会在DefaultMessageStore中调用其start方式,这个方式会启动其内部的几个内部类,用来characterization并行;
public void start() throws Exception { // 接受连接的服务项目,开启端口,设置监听的事件 this.acceptSocketService.beginAccept(); // 开启服务项目不断检查和与否有连接 this.acceptSocketService.start(); // 开启groupTransferService,接受CommitLog的characterization并行允诺 this.groupTransferService.start(); // 开启haClient,用于slave来建立与Master连接和并行 this.haClient.start(); }4、接受Slave连接
AcceptSocketService这个类在Broker的Master和Slaver两个配角启动时都会创建,只不过区别是Slaver开启端口后,并不会有别的Broker与其建立连接。因为只有在Broker的配角是Slave的时候才会指定要连接的Master地址。这个逻辑,在Broker启动的时候BrokerController类中运行的。
// 主要是基于nio来实现的 class AcceptSocketService extends ServiceThread { // 监听端口地址 private final SocketAddress socketAddressListen; // nio里面的网络监听服务项目端 private ServerSocketChannel serverSocketChannel; // 多路复用监听模块 private Selector selector; // 给他传入一个监听端口号,构建好监听地址 public AcceptSocketService(final int port) { this.socketAddressListen = new InetSocketAddress(port); } /** * Starts listening to slave connections. * * @throws Exception If fails. */ public void beginAccept() throws Exception { // 打开nio网络监听服务项目端 this.serverSocketChannel = ServerSocketChannel.open(); // 打开selector多路复用模块 this.selector = RemotingUtil.openSelector(); // 设置他们的socket重用地址是true this.serverSocketChannel.socket().setReuseAddress(true); // 设置监听他们指定的端口号 this.serverSocketChannel.socket().bind(this.socketAddressListen); // 实用性与否nio阻塞模式是false this.serverSocketChannel.configureBlocking(false); // 把nio网络监听伺服器注册到selector多路复用模块里去 this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } /** * {@inheritDoc} */ @Override public void shutdown(final boolean interrupt) { super.shutdown(interrupt); try { this.serverSocketChannel.close(); this.selector.close(); } catch (IOException e) { log.error(“AcceptSocketService shutdown exception”, e); } } /** * {@inheritDoc} */ @Override public void run() { log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { try { // 透过selector多路复用模块监听他们的nio网络伺服器与否有连接事件到达 this.selector.select(1000); // 假如说确实是有从结点来连接他们,这时就会拿到一批selectedKeys Set selected = this.selector.selectedKeys(); if (selected != null) { // 每一个新建立的连接都相关联了一个selectionKey,就是一个连接的key句柄 for (SelectionKey k : selected) { // 假如说过来的网络事件就是op_accept连接事件 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { // 透过调用pt(); if (sc != null) { HAService.log.info(“HAService receive new connection, ” + sc.socket().getRemrvice.this, sc ); // 同时启动这个从结点连接模块 conn.start(); // 把从结点连接加入到自己的连接列表里去 HAService.this.addConnection(conn); } catch (Exception e) { log.error(“new HAConnection exception”, e); sc.close(); } } } else { log.warn(“Unexpected ops in select ” + k.readyOps()); } } // 一次selectedKeys处理完毕了,就必须做一个clear selected.clear(); } } catch (Exception e) { log.error(this.getServiceName() + ” service has exception.”, e); } } log.info(this.getServiceName() + ” service end”); } /** * {@inheritDoc} */ @Override public String getServiceName() { return AcceptSocketService.class.getSimpleName(); } }
beginAccept方式就是开启Socket,绑定10912端口,然后注册selector和指定监听的事件为OP_ACCEPT也就是建立连接事件。相关联的IO模式为NIO模式。主要看其run方式,这个方式是Master用来接受Slave连接的核心理念。
每过一秒检查和一次与否有连接事件,假如有则建立连接,并把建立起来的连接加入到连接列表中展开保存。一直循环这个逻辑。
5、检查和并行工程进度和唤醒CommitLog刷盘线程
GroupTransferService是CommitLog最新消息刷盘的类CommitLog与HAService打交道的一个中间类。在CommitLog中展开characterization刷盘的时候,会创建一个CommitLog.GroupCommitRequest的内部类,这个类包含了当前Broker最新的最新消息的力学差值信息。然后把这个类丢给GroupTransferService处理,然后唤醒GroupTransferService。起始这个逻辑跟CommitLog内部的GroupCommitService逻辑一样。只不过对于并行部分的逻辑不一样,这里能参考后面的文章存储部分(3)CommitLog力学笔记有关的CommitLog类。
先看run方式
/** * 在run方式中会将传入的CommitLog.GroupCommitRequest从requestsWrite * 转换到requestsRead中然后展开处理检查和相关联的并行允诺的工程进度。检查和的逻辑在doWaitTransfer中 */ public void run() { log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { try { // 这里进入等候,等候被唤醒,进入等候之前会调用onWaitEnd方式,然后调用swapRequests方式 // 把requestsWrite转换为requestsRead this.waitForRunning(10); // 展开允诺处理 this.doWaitTransfer(); } catch (Exception e) { log.warn(this.getServiceName() + ” service has exception. “, e); } } log.info(this.getServiceName() + ” service end”); }
再看doWaitTransfer方式
/** * 1、比较Master推送到Slave的 差值push2SlaveMaxOffset是不是大于传进来的CommitLog.GroupCommitRequest中的差值 * 2、计算本次并行延时的天数结点,天数为当前天数加上参数系统实用性参数syncFlushTimeout预设为5秒 * 3、假如第一步结果为true,则回到结果为PUT_OK。假如第一步为false,则每过一秒检查和一次结果,假如超过5次了还没并行顺利完成,则表示延时了那么回到结果为FLUSH_SLAVE_TIMEOUT。同时会唤醒CommitLog的刷盘线程。 */ private void doWaitTransfer() { // 假如读允诺不为空 if (!this.requestsRead.isEmpty()) { for (CommitLog.GroupCommitRequest req : this.requestsRead) { // 假如push到slave的差值 大于等于 允诺中的最新消息的最小差值 表示slave并行顺利完成 boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); // 计算这次并行延时的天数点 并行的延时天数段为5s long deadLine = req.getDeadLine(); // 假如没有并行完毕,并且还没达至延时天数,则等候1秒后检查和并行的工程进度 while (!transferOK && deadLine – System.nanoTime() > 0) { this.notifyTransferObject.waitForRunning(1000); transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); } // 延时或者并行成功的时候 唤醒主线程 req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } this.requestsRead = new LinkedList<>(); } }
主要逻辑如下:
比较Master推送到Slave的 差值push2SlaveMaxOffset是不是大于传进来的CommitLog.GroupCommitRequest中的差值。计算本次并行延时的天数结点,天数为当前天数加上参数系统实用性参数syncFlushTimeout预设为5秒。假如第一步结果为true,则回到结果为PUT_OK。假如第一步为false,则每过一秒检查和一次结果,假如超过5次了还没并行顺利完成,则表示延时了那么回到结果为FLUSH_SLAVE_TIMEOUT。同时会唤醒CommitLog的刷盘线程。6、characterization并行客户端模块后面他们说到了只有是Salve配角的Broker才会真正的实用性Master的地址,而HAClient是需要Master地址的,因此这个类真正在运行的时候只有Slave才会真正的使用到。
先看看核心理念的参数信息// 从结点那边会用这个线程跟他们主结点建立连接,执行数据读写 class HAClient extends ServiceThread { // 读数据缓冲区大小,4mb private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4; // master地址 private final AtomicReference masterAddress = new AtomicReference<>(); // 从结点收到数据之后会回到一个8个字节的ack差值,固定8个字节 private final ByteBuffer reportOffset = ByteBuffer.allocate(8); // nio网络连接 private SocketChannel socketChannel; // nio多路复用模块 private Selector selector; // 最近一次写数据天数戳 private long lastWriteTimestamp = System.currentTimeMillis(); // 当前上报过的差值 private long currentReportedOffset = 0; // 分发边线 private int dispatchPosition = 0; // 读数据缓冲区 private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); // 备份数据缓冲区 private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); }
基本上都是缓冲有关的实用性。这里主要预测的是run方式中的逻辑
public void run() { log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { try { // 尝试去连接他们的master结点 if (this.connectMaster()) { // 与否要上报呵呵ack差值,间隔需要大于心跳的天数(5s) if (this.isTimeToReportOffset()) { // 向master 汇报当前 salve 的CommitLog的最小差值,并记录这次的并行天数 boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); // 假如汇报完了就关闭连接 if (!result) { this.closeMaster(); } } // 假如说人家给你传输过来了数据 this.selector.select(1000); // 向master拉取的信息 boolean ok = this.processReadEvent(); if (!ok) { this.closeMaster(); } // 再次并行slave的差值假如,最新的差值大于已经汇报的情况下 if (!reportSlaveMaxOffsetPlus()) { continue; } // 检查和天数距离上次并行工程进度的天数间隔 long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() – this.lastWriteTimestamp; // 假如间隔大于心跳的天数,那么就关闭 if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() .getHaHousekeepingInterval()) { log.warn(“HAClient, housekeeping, found this connection[” + this.masterAddress + “] expired, ” + interval); this.closeMaster(); log.warn(“HAClient, master not response some time, so close connection”); } } else { // 等候 this.waitForRunning(1000 * 5); } } catch (Exception e) { log.warn(this.getServiceName() + ” service has exception. “, e); this.waitForRunning(1000 * 5); } } log.info(this.getServiceName() + ” service end”); }
主要的逻辑如下:
连接master,假如当前的broker配角是master,那么相关联的masterAddress是空的,不会有后续逻辑。假如是slave,并且配置了master地址,则会展开连接展开后续逻辑处理检查和与否需要向master汇报当前的并行工程进度,假如两次并行的天数大于5s,则不展开并行。每次并行间间隔在5s以上,这个5s是心跳连接的间隔参数为haSendHeartbeatInterval向master 汇报当前 salve 的CommitLog的最小差值,并记录这次的并行天数从master拉取笔记信息,主要就是展开最新消息的并行,并行出问题则关闭连接再次并行slave的差值,假如最新的差值大于已经汇报的情况下则从步骤1重头开始这里预测完了run方式,然后就要预测主要的笔记并行的逻辑了,这个逻辑在processReadEvent方式中
private boolean processReadEvent() { int readSizeZeroTimes = 0; // 假如读取缓存还有没读取完,则一直读取 while (this.byteBufferRead.hasRemaining()) { try { // 能把characterization并行过来的数据读取到缓冲区里去 int readSize = this.socketChannel.read(this.byteBufferRead); if (readSize > 0) { readSizeZeroTimes = 0; // 执行一次分发读允诺 boolean result = this.dispatchReadRequest(); if (!result) { log.error(“HAClient, dispatchReadRequest error”); return false; } } else if (readSize == 0) { if (++readSizeZeroTimes >= 3) { break; } } else { log.info(“HAClient, processReadEvent read socket < 0”); return false; } } catch (IOException e) { log.info(“HAClient, processReadEvent read socket exception”, e); return false; } } return true; } private boolean dispatch发的偏移差 int diff = this.byteBufferRead.position() – this.dispatchPosition; // 假如偏移差大于头大小,说明存在允诺体 if (dhyOffset != 0) { if (slavePhyOffset != masterPhyOffset) { log.error(“master pushed offset not equal the max phy offset in slave, SLAVE: ” + slavePhyOffset + ” MASTER: ” + masterPhyOffset); return false; } } // 假如偏移差大于 最新消息头和 最新消息体大小。则读取最新消息体 if (diff >= (msgHeaderSize + bodySize)) { byte[] bodyData = byteBufferRead.array(); int dataStart = this.dispatchPosition + msgHeaderSize; // 把你读取到的数据追加到commitlog里面去 HAService.this.defaultMessageStore.appendToCommitLog( masterPhyOffset, bodyData, dataStart, bodySize); // 记录分发的边线 this.dispatchPosition += msgHeaderSize + bodySize; if (!reportSlaveMaxOffsetPlus()) { return false; } continue; } } if (!this.byteBufferRead.hasRemaining()) { this.reallocateByteBuffer(); } break; } return true; }7、Master并行笔记
后面说过,在HAService的AcceptSocketService内部类中,Master会在建立连接的时候创建HAConnection用来处理读写事件。这里主要如是说构造函数和内部类就能了解原理了。
成员变量
public class HAConnection { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // 所属的HA高需用服务项目模块 private final HAService haService; // nio网络连接 private final SocketChannel socketChannel; // 跟他们建立连接的HA客户端组件的地址,从结点地址 private final String clientAddr; // 网络连接写数据服务项目线程 private WriteSocketService writeSocketSerng slaveRequestOffset = -1; // 从结点并行数据后ack的差值 private volatile long slaveAckOffset = -1; }
构造函数
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException { // 指定所属的 HAService this.haService = haService; // 指定的NIO的socketChannel this.socketChannel = socketChannel; // 客户端的地址 this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString(); // 这是为非阻塞 this.socketChannel.configureBlocking(false); // 与否启动SO_LINGER // SO_LINGER作用:设置函数close()关闭TCP连接时的行为。缺省close()的行为是 // 假如有数据残留在socket发送缓冲区中则系统将继续发送这些数据给对方,等候被确认,然后回到。 this.socketChannel.socket().setSoLinger(false, -1); // 与否开启TCP_NODELAY this.socketChannel.socket().setTcpNoDelay(true); if (NettySystemConfig.socketSndbufSize > 0) { // 接收缓冲的大小 this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize); } if (NettySystemConfig.socketRcvbufSize > 0) { // 发送缓冲的大小 this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize); } // 把网络连接写数据服务项目线程和读数据服务线程都构建好 // 端口写服务项目 this.writeSocketService = new WriteSocketService(this.socketChannel); // 端口读服务项目 this.readSocketService = new ReadSocketService(this.socketChannel); // 增加haService中的连接数字段 this.haService.getConnectionCount().incrementAndGet(); }
监听slave笔记并行工程进度和并行笔记
WriteSocketService监听的是OP_WRITE事件,注册的端口就是在HAService中开启的端口。
class WriteSocketService extends ServiceThread { private final Selector selector; private final SocketChannel socketChannel; // 写数据头大小,12个字节 private final int headerSize = 8 + 4; // 写数据头缓冲区 private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize); // 从哪个边线开始传输 private long nextTransferFromWhere = -1; // 对内存映射区域查询数据结果 private SelectMappedBufferResult selectMappedBufferResult; // 最后一次写数据与否顺利完成了,预设true private boolean lastWriteOver = true; // 最后一次写数据天数戳 private long lastWriteTimestamp = System.currentTimeMillis(); public WriteSocketService(final SocketChannel socketChannel) throws IOException { this.selector = RemotingUtil.openSelector(); // 搞一个selector多路复用模块 this.socketChannel = socketChannel; this.socketChannel.register(this.selector, SelectionKey.OP_WRITE); // 把这个网络连接注册到selector多路复用模块里去就能了 this.setDaemon(true); } @Override public void run() { HAConnection.log.info(this.getServiceName() + ” service started”); while (!this.isStopped()) { try { // 假如说针对你的从结点这时能执行写数据动作 this.selector.select(1000); // 假如slave的读允诺为 -1 表示没有slave 发出写允诺,不需要处理 if (-1 == HAConnection.this.slaveRequestOffset) { Thread.sleep(10); continue; } // nextTransferFromWhere 为-1 表示初始第一次并行,需要展开计算 if (-1 == this.nextTransferFromWhere) { // 假如slave 并行顺利完成 则下次并行从CommitLog的最小差值开始并行 if (0 =his.haService.getDefaultMessageStore() .getCommitLog().getMaxOffset(); masterOffset = masterOffset – (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog()); if (masterOffset < 0) { masterOffset = 0; } // 能去设置呵呵当前需要从哪个边线开始来传输数据 this.nextTransferFromWhere = masterOffset; } else { // 设置下次并行的边线,为 salve 读允诺的边线 this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset; } log.info(“master transfer data from ” + this.nextTransferFromWhere + ” to slave[” + HAConnection.this.clientAddr + “], and slave request ” + HAConnection.this.slaveRequestOffset); } // 上次并行与否顺利完成 if (this.lastWriteOver) { // 上一次写数据天数戳到现在截止的差值 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() – this.lastWriteTimestamp; // 把这个天数差值跟ha发送心跳间隔做一个比对,假如超过了那个间隔,心跳间隔为 5000毫秒 if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // Build Header // 开始去构建允诺头,先设置他们要从哪个边线开始传输数据,心跳允诺大小为12 字节 this.byteBufferHeader.position(0); this.byteBufferHeader.limit(headerSize); this.byteBufferHeader.putLong(this.nextTransferFromWhere); this.byteBufferHeader.putInt(0); this.byteBufferHeader.flip(); // 展开最新消息并行 this.lastWriteOver = this.transferData(); if (!this.lastWriteOver) continue; } } // 假如说是上一次传输还没完毕这时就传输数据就能了 else { this.lastWriteOver = this.transferData(); // 假如还没并行顺利完成则继续 if (!this.lastWriteOver) continue; } // 构建完了header之后,这时就需要查询commitlog里面指定边线开始的一个数据片段 SelectMappedBufferResult selectResult = HAConnection.this.haService .getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); if (selectResult != null) { int size = selectResult.getSize(); // 检查和要并行最新消息的长度,是不是大于单次并行的最小限制 预设为 32kb if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); } long thisOffset = this.nextTransferFromWhere; this.nextTransferFromWhere += size; selectResult.getByteBuffer().limit(size); this.selectMappedBufferResult = selectResult; // Build Header this.byteBufferHeader.position(0); this.byteBufferHeader.limit(headerSize); this.byteBufferHeader.putLong(thisOffset); this.byteBufferHeader.putInt(size); this.byteBufferHeader.flip(); this.lastWriteOver = this.transferData(); } else { // 没有数据要传输呢?这时就是能让他们的模块等候一会儿,等候100毫秒 HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); } } catch (Exception e) { HAConnection.log.error(this.getServiceName() + ” service has exception.”, e); break; } } // 对等候通知模块做一个处理 HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable(); if (this.selectMappedBufferResult != null) { this.selectMappedBufferResult.release(); } this.makeStop(); readSocketService.makeStop(); haService.removeConnection(HAConnection.this); SelectionKey sk = this.socketChannel.keyFor(this.selector); if (sk != null) { sk.cancel(); } try { this.selector.close(); this.socketChannel.close(); } catch (IOException e) { HAConnection.log.error(“”, e); } HAConnection.log.info(this.getServiceName() + ” service end”); } private boolean transferData() throws Exception { int writeSizeZeroTimes = 0; // Write Header // 心跳的头没写满,先写头 while (this.byteBufferHeader.hasRemaining()) { // 透过nio网络连接能把允诺头先发送出去 int writeSize = this.socketChannel.write(this.byteBufferHeader); if (writeSize > 0) { writeSizeZeroTimes = 0; // 记录上次写的天数 this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); } else if (writeSize == 0) { // 重试3次 则不再重试 if (++writeSizeZeroTimes >= 3) { break; } } else { throw new Exception(“ha master write header error < 0”); } } // 假如要并行的笔记为null,则直接回到这次并行的结果与否并行顺利完成 if (null == this.selectMappedBufferResult) { return !this.byteBufferHeader.hasRemaining(); } writeSizeZeroTimes = 0; // Write Body // 填充允诺体 if (!this.byteBufferHeader.hasRemaining()) { // 他会有一个要传输的一个commitlog里面的内存区域查询出来的数据片段 while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { // 把commitlog里面要传输的数据片段就写入到nio网络连接里去 int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer()); if (writeSize > 0) { writeSizeZeroTimes = 0; this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); } else if (writeSize == 0) { // 重试3次 if (++writeSizeZeroTimes >= 3) { break; } } else { throw new Exception(“ha master write body error < 0”); } } } boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining(); // 释放缓存 if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { this.selectMappedBufferResult.release(); this.selectMappedBufferResult = null; } return result; } @Override public String getServiceName() { return WriteSocketService.class.getSimpleName(); } @Override public void shutdown() { super.shutdown(); } }
主要的逻辑如下:
假如slave展开了笔记差值的汇报,推论是不是第一次的展开并行以及相关联的并行工程进度。设置下一次的并行边线检查和上次同步是不是已经顺利完成了,检查和两次并行的周期是不是超过心跳间隔,假如是的则需要把心跳信息放到回到的头里面,然后展开最新消息并行。假如上次并行还没顺利完成,则等候上次并行顺利完成后再继续从Master本地读取Commi大小的笔记。假如缓存为null,则等候100毫秒根据并行工程进度来唤醒刷盘CommitLog线程
ReadSocketService的作用主要是:根据Slave推送的笔记并行工程进度,来唤醒HAService的GroupTransferService然后进一步唤醒CommitLog的笔记刷盘线程。这里主要看run方式和processReadEvent方式。
class ReadSocketService extends ServiceThread { // 读数据最小缓冲区大小,预设是1mb private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024; // 多路复用监听模块 private final Selector selector; // nio网络连接 private final SocketChannel socketChannel; // 读数据缓冲区 private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); // 处理最新消息边线 private int processPosition = 0; // 最近一次读取到数据的天数戳 private volatile long lastReadTimestamp = System.currentTimeMillis(); public ReadSocketService(final SocketChannel socketChannel) throws IOException { this.selector = RemotingUtil.openSelector(); this.socketChannel = socketChannel; this.socketChannel.register(this.selector, SelectionKey.OP_READ); this.setDaemon(true); } @Override public void run() { HAConnection.log.info(this.getServiceName() + ” service started”); // 任务与否结束 while (!this.isStopped()) { try { // 设置selector的阻塞天数,假如说从结点确实发送了允诺过来 this.selector.select(1000); // 处理salver读取最新消息的事件 boolean ok = this.processReadEvent(); if (!ok) { HAConnection.log.error(“processReadEvent error”); break; } // 检查和此次处理天数与否超过心跳连接天数 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() – this.lastReadTimestamp; if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) { log.warn(“ha housekeeping, found this connection[” + HAConnection.this.clientAddr + “] expired, ” + interval); break; } } catch (Exception e) { HAConnection.log.error(this.getServiceName() + ” service has exception.”, e); break; } } this.makeStop(); writeSocketService.makeStop(); haService.removeConnection(HAConnection.this); HAConnection.this.haService.getConnectionCount().decrementAndGet(); SelectionKey sk = this.socketChannel.keyFor(this.selector); if (sk != null) { sk.cancel(); } try { this.selector.close(); this.socketChannel.close(); } catch (IOException e) { HAConnection.log.error(“”, e); } HAConnection.log.info(this.getServiceName() + ” service end”); } @Override public String getServiceName() { return ReadSocketService.class.getSimpleName(); } private boolean processReadEvent() { int readSizeZeroTimes = 0; // 假如说读取数据缓冲区已经没有空间了,这时就做一个flip,处理边线复位为0 if (!this.byteBufferRead.hasRemaining()) { // 读允诺缓冲转变为读取模式。 this.byteBufferRead.flip(); this.processPosition = 0; } // 但凡是读取数据缓冲区还有空间,就进入循环 while (this.byteBufferRead.hasRemaining()) { try { // 一次性从网络连接里读取最小可能将是1mb的读取缓冲空间的内容 int readSize = this.socketChannel.read(this.byteBufferRead); if (readSize > 0) { readSizeZeroTimes = 0; // 更新呵呵本次读取到数据的天数戳 this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore() .getSystemClock().now(); // 读取缓冲区边线 – 处理边线,是大于等于8,至少说是读到了8个字节 // 为什么是8个字节,因为salver向master发去拉取允诺时,差值固定为8 if ((this.byteBufferRead.poseBufferRead.position() – (this.byteBufferRead.position() % 8); // 读取8个直接就从结点回到的东西,就从结点顺利完成最新消息并行的最新的偏移量 long readOffset = this.byteBufferRead.getLong(pos – 8); // 设置处理的边线 this.processPosition = pos; // 把他们的slave的ack差值去做一个设置 HAConnection.this.slaveAckOffset = readOffset; // 假如slave的 读允诺 差值大于0 表示并行顺利完成了 if (HAConnection.this.slaveRequestOffset < 0) { // 重新设置slave的 读允诺的 差值 HAConnection.this.slaveRequestOffset = readOffset; log.info(“slave[” + HAConnection.this.clientAddr + “] request offset ” + readOffset); } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) { log.warn(“slave[{}] request offset={} greater than local commitLog offset={}. “, HAConnection.this.clientAddr, HAConnection.this.slaveAckOffset, HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()); return false; } // 假如说从结点已经接收到了一些数据后,唤醒阻塞的线程, 他们就能通知HAService去传输一些数据给从结点 // 在最新消息的characterization并行选择的模式是并行的时候,会唤醒被阻塞的最新消息写入的线程 HAConnection.this.haService.notifyTransferSome( HAConnection.this.slaveAckOffset ); } } else if (readSize == 0) { // 假如数据为0超过3次,表示并行顺利完成,直接结束 if (++readSizeZeroTimes >= 3) { break; } } else { log.error(“read socket[” + HAConnection.this.clientAddr + “] < 0”); return false; } } catch (IOException e) { log.error(“processReadEvent exception”, e); return false; } } return true; } }
整体的逻辑如下:
每1s执行一次事件就绪选择,然后调用processReadEvent方式处理读允诺,读取从伺服器的拉取允诺获取slave已拉取差值,因为有新的从伺服器反馈拉取工程进度,需要通知某些生产者以便回到,因为假如最新消息发送使用并行方式,需要等候将最新消息拷贝到从伺服器,然后才回到,故这里需要唤醒有关线程去推论自己关注的最新消息与否已经传输顺利完成。也就是HAService的GroupTransferService假如读取到的字节数等于0,则重复三次,否则结束本次读允诺处理;假如读取到的字节数大于0,表示连接被断开,回到false,后续会断开该连接。五、总结RocketMQ的characterization并行间的核心理念类就是HAService和HAConnection和当中的几个子类。结合后面的那个图能简单的理解呵呵。