I. Introduction
Earlier articles briefly introduced RocketMQ's underlying file storage model. Building on that model, this article digs a little further into the low-level design of the CommitLog and looks at how RocketMQ achieves its high performance.
The components involved inside CommitLog are fairly complex, so it helps to skim the earlier articles in this series first (if you are short on time, feel free to skip them):
RocketMQ source code analysis: the memory-mapped file object MappedFile
RocketMQ source code analysis: the mapped file queue MappedFileQueue
RocketMQ source code analysis: the mapped file pre-allocation service AllocateMappedFileService
RocketMQ source code analysis: the message store DefaultMessageStore (Part 1)
RocketMQ source code analysis: the message store DefaultMessageStore (Part 2)
RocketMQ source code analysis: the flush strategies of FlushCommitLogService

II. CommitLog
For RocketMQ, every message ultimately needs to be persisted into a CommitLog file.
Intuitively, the relationship can be understood like this: the CommitLog class describes the entire commitlog directory, MappedFileQueue describes the ordered container of CommitLog files, and each MappedFile describes a single CommitLog file.
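To make the three layers concrete, the on-disk layout they model looks roughly like this (the store path and the 1 GB file size are the usual defaults; treat the exact values as an assumption for illustration):

$HOME/store/commitlog/                 <- the whole directory, modeled by CommitLog
├── 00000000000000000000               <- one 1 GB file, modeled by a MappedFile
├── 00000000001073741824               <- next file; the name is the file's starting physical offset
└── 00000000002147483648
    (the ordered list of these files is modeled by MappedFileQueue)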
III. Source code analysis
This section walks through CommitLog in the following order: member variables; the constructor; loading the files under the CommitLog directory; starting the component; reading data at a given offset; normal recovery; abnormal recovery; asynchronous message writing; message flushing; high-availability flushing (submitting the replication request); the append-message callback; and the message encoder.

1. CommitLog member variables
public class CommitLog {
    // Messages MAGIC CODE daa320a7
    // Used to validate messages, similar to the magic number in a Java class file
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // End of file empty MAGIC CODE cbd43194
    protected final static int BLANK_MAGIC_CODE = -875286124;
    // Mapped file queue
    protected final MappedFileQueue mappedFileQueue;
    // The message store this CommitLog belongs to
    protected final DefaultMessageStore defaultMessageStore;
    // Service that flushes the commitlog to disk
    private final FlushCommitLogService flushCommitLogService;
    // If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
    // Service that commits data from the write buffer into the commitlog FileChannel
    private final FlushCommitLogService commitLogService;
    // Append-message callback
    private final AppendMessageCallback appendMessageCallback;
    // Thread-local state (encoder, key builder) used when writing messages
    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
    // topic-queueId -> offset
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    // topic-queueId -> offset
    protected Map<String/* topic-queueid */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);
    // Confirm offset
    protected volatile long confirmOffset = -1L;
    // Time at which the put-message lock was acquired
    private volatile long beginTimeInLock = 0;
    // Put-message lock
    protected final PutMessageLock putMessageLock;
    // Full store path set
    private volatile Set<String> fullStorePaths = Collections.emptySet();
    // Multi-dispatch component
    private final MultiDispatch multiDispatch;
    // Flush-disk watcher
    private final FlushDiskWatcher flushDiskWatcher;
}

2. Constructor
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    // CommitLog store path
    String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
    // If the store path contains the path separator (a comma), use the multi-path mapped file queue
    if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
        this.mappedFileQueue = new MultiPathMappedFileQueue(
            defaultMessageStore.getMessageStoreConfig(),
            defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
            defaultMessageStore.getAllocateMappedFileService(),
            this::getFullStorePaths);
    }
    // In the normal case a plain MappedFileQueue is enough
    else {
        this.mappedFileQueue = new MappedFileQueue(
            storePath,
            defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
            defaultMessageStore.getAllocateMappedFileService());
    }
    this.defaultMessageStore = defaultMessageStore;
    // Synchronous flush uses GroupCommitService; asynchronous flush uses FlushRealTimeService
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();   // synchronous flush
    } else {
        this.flushCommitLogService = new FlushRealTimeService(); // asynchronous flush
    }
    this.commitLogService = new CommitRealTimeService();
    // Callback object used to append messages
    this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    // Thread-local message encoder; its buffer is bounded by the configured maximum message size
    putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
        @Override
        protected PutMessageThreadLocal initialValue() {
            return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        }
    };
    // Whether to use a spin lock (based on the JDK AtomicBoolean) or a mutex (the JDK ReentrantLock)
    // when storing messages; the default is the spin lock
    this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()
        ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
    flushDiskWatcher = new FlushDiskWatcher();
}

The constructor mainly reads the relevant configuration and initializes the corresponding components. Note that synchronous flushing and asynchronous flushing use different flush-service types.
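All of the branches in this constructor are driven by the broker's store configuration. For reference, a hedged broker.conf sketch with the relevant MessageStoreConfig keys (the values are illustrative, not recommendations):

# Flush strategy: SYNC_FLUSH -> GroupCommitService, ASYNC_FLUSH -> FlushRealTimeService
flushDiskType=ASYNC_FLUSH
# Off-heap write buffer (TransientStorePool); when enabled, CommitRealTimeService commits buffer -> FileChannel
transientStorePoolEnable=false
# Put-message lock: true = ReentrantLock, false (default) = spin lock based on AtomicBoolean
useReentrantLockWhenPutMessage=false
# Size of a single commit log file (default 1 GB)
mappedFileSizeCommitLog=1073741824
# Commit log directory; a comma-separated list switches to MultiPathMappedFileQueue
storePathCommitLog=/data/rocketmq/store/commitlog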
3. Loading the files under the CommitLog directory
// The CommitLog data lives in multiple files on disk, and each file is backed by a MappedFile.
// load() maps all of those disk files into memory through the MappedFileQueue.
public boolean load() {
    boolean result = this.mappedFileQueue.load();
    log.info("load commit log " + (result ? "OK" : "Failed"));
    return result;
}

The article "RocketMQ source code analysis: the mapped file queue MappedFileQueue" already analyzed MappedFileQueue#load, so it is not expanded here.
4. Starting the CommitLog component
// Start the CommitLog component
public void start() {
    // Start the flush thread
    this.flushCommitLogService.start();
    // The flush-disk watcher runs as a daemon thread
    flushDiskWatcher.setDaemon(true);
    flushDiskWatcher.start();
    // If the transient store pool is used to stage messages, also start the thread that periodically
    // commits data from the pool into the FileChannel.
    // isTransientStorePoolEnable() is true only when the pool is enabled, flushing is asynchronous
    // and this broker is a master.
    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.start();
    }
}

5. Reading data at a given offset

// Read the data located at the given physical offset
public SelectMappedBufferResult getData(final long offset) {
    return this.getData(offset, offset == 0);
}

// This is the method that actually looks up data at the given offset
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    // Size of a single commit log mapped file, taken from the store config
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    // Locate the mapped file that contains this offset
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile != null) {
        // Position of the offset relative to the start of that mapped file
        int pos = (int) (offset % mappedFileSize);
        // Read the data starting from that relative position
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }
    return null;
}
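To make the offset arithmetic concrete, here is a small self-contained demo (not RocketMQ source) that reproduces the file/position lookup, assuming the default 1 GB commit log file size and the convention that each file is named after its 20-digit, zero-padded starting offset:

public class CommitLogOffsetDemo {
    // Default mappedFileSizeCommitLog is 1 GB
    private static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    public static void main(String[] args) {
        long offset = 3_221_225_472L + 12_345;                       // some global physical offset
        long fileFromOffset = offset - (offset % MAPPED_FILE_SIZE);  // starting offset of the owning file
        int pos = (int) (offset % MAPPED_FILE_SIZE);                 // position inside that file
        // CommitLog file names are the 20-digit, zero-padded starting offset, e.g. 00000000003221225472
        String fileName = String.format("%020d", fileFromOffset);
        System.out.printf("offset=%d -> file=%s, pos=%d%n", offset, fileName, pos);
    }
}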
6. Normal recovery

recoverNormally is invoked when RocketMQ starts up after a clean shutdown. It walks the loaded mapped-file list, validates each file and the magic code of every message in it to determine which data is intact, and computes the maximum offset of the valid data. It then uses that offset to set the flushed and committed positions and to truncate the dirty data beyond it.
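As a reading aid before the full method, here is a condensed, self-contained sketch of the recovery scan described above. LogFile, Msg and read() are illustrative stand-ins for MappedFile, DispatchRequest and checkMessageAndReturnSize, not RocketMQ APIs:

import java.util.List;

public class RecoverSketch {
    record Msg(boolean success, int size) {}
    interface LogFile { long startOffset(); Msg read(long posInFile); }

    static long maxValidOffset(List<LogFile> files) {
        int index = Math.max(0, files.size() - 3);   // begin from the third-to-last file
        LogFile file = files.get(index);
        long fileStart = file.startOffset();
        long posInFile = 0;
        while (true) {
            Msg m = file.read(posInFile);            // validates magic code / CRC / length
            if (m.success() && m.size() > 0) {
                posInFile += m.size();               // advance to the next message
            } else if (m.success() && m.size() == 0) {
                if (++index >= files.size()) break;  // blank magic code and no more files: done
                file = files.get(index);             // switch to the next file
                fileStart = file.startOffset();
                posInFile = 0;
            } else {
                break;                               // corrupt entry: stop scanning here
            }
        }
        return fileStart + posInFile;                // flush/commit positions are then set to this value
    }
}

The complete implementation extracted from the source follows.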
// 假如说你的broker正常退出了,此时为了数据恢复性,大部份缓存里的数据都必须进行两个flush到硬盘里去 public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { 置,是否在recover的时候检查crc校验和 boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig() .isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();// 假如说mappedfile列表是不为空的话,此时就走下面的逻辑 if (!mappedFiles.isEmpty()) { // Began to recover from the last third file // recover过程是怎么来弄的,从倒数第三个mappedfile开始来进行的,假如说不足3个文档 // 此时就从第两个mappedfile开始就能了 int index = mappedFiles.size() – 3; if (index < 0) index = 0; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset =0; // 进入无限循环,对倒数第三个mappedfile循环读取一条一条的最新消息,每次都进行校验 // 依次把倒数三个mappedfile里面的最新消息都读取以及进行校验,把三个文档都校验完毕了这个循环就出来了 while (true) { // 从倒数第三个mappedfile里读取两个完整的最新消息,以及对最新消息进行校验 // 然后把读取出来的最新消息数据封装在dispatch request里面,然后返回这个dispatch request DispatchRequest dispatchRequest = this.checkMessageAndReturnSize( byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); // Normal data // 假如说分发请求默认是成功的,而且最新消息大小是大于0的 if (dispatchRequest.isSuccess() && size > 0) { // 给我们的mappedfile偏移量累加这个最新消息的大小,下一次读取一条最新消息应该是要切换到 // 下两个最新消息的偏移量再开始了 mappedFileOffset += size; } // Come the end of the file, switch to the next file Since the // return 0 representatives met last hole, // this can not be included in truncate offset // 假如说读取出来的最新消息大小是0 else if (dispatchRequest.isSuccess() && size == 0) { // 此时就累加mappedfile索引位置,能下一次读取倒数第二个mappedfile index++; // 假如说此时应该达到了最后两个mappedfile了,此时就停止循环 if(index >= mappedFiles.size()) {// Current branch can not happen log.info(“recover last 3 physics file over, last mapped file “+ mappedFile.getFileName());break; } // 否则假如说还没到达最后的两个mappedfile,此时就定位出来倒数第二个mappedfile else { // 此时就能把倒数 mappedFile = mappedFiles.get(index); byteBuffer = mappedFile.sliceByteBuffer(); processOffset = mappedFile.getFileFromOffset();// mappedfile偏移量就重置为了0mappedFileOffset =0; log.info(“recover next physics file, “ + mappedFile.getFileName()); } } // Intermediate file read error else if (!dispatchRequest.isSuccess()) { log.info(“recover physics file end, “ + mappedFile.getFileName()); break; } } // 把已经处理最新消息的偏移量累加上mappedfile偏移量processOffset += mappedFileOffset;// mappedfile堆栈能设置flush位置、commit位置、truncate脏文档位置 this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant data // 假如说消费堆栈的最大物理偏移量是大于等于了处理偏移量,此时就须要去执行truncate脏逻辑文档 if (maxPhyOffsetOfConsumeQueue >= processOffset) { log.warn(“maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files”, maxPhyOffsetOfConsumeQueue, processOffset);this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } // 假如说没有mappedfiles此时就设置呵呵几个位置就能了 else { // Commitlog case files are deleted log.warn(“The commitlog files are deleted, and delete the consume queue files”); this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); } }// 检查最新消息同时返回大小 publicDispatchRequestcheckMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC) { return this.checkMessageAndReturnSize(byteBuffer, checkCRC,true); }// 检查最新消息同时返回大小函数 public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) { try { // 1 TOTAL SIZE int totalSize = byteBuffer.getInt(); 
// 2 MAGIC CODE int magicCode = byteBuffer.getInt(); // 对魔数做两个判断 switch (magicCode) { case MESSAGE_MAGIC_CODE: // 最新消息魔数 break; case BLANK_MAGIC_CODE: // 假如说是空白魔数,此时就返回两个默认的dispatch request就能了 return new DispatchRequest(0, true /* success */); default: // 此外默认情况下返回两个异常dispatch requestlog.warn(“found a illegal magic code 0x” + Integer.toHexString(magicCode)); return new DispatchRequest(-1, false /* success */); } // 正常情况下缓存片段里应该是放两个两个的最新消息 // 会搞两个最新消息内容的字节字符串 byte[] bytesContent = new byte[totalSize];// 接着会根据最新消息编码协议,从缓存区域里一点一点的把两个完整的编码后的最新消息读取出来 // 先读取到两个最新消息内容crc校验和 intbodyCRC = byteBuffer.getInt();// 读取最新消息所在的queueId int queueId = byteBuffer.getInt(); // 读取最新消息的flag标识 int flag = byteBuffer.getInt(); // 读取这个最新消息在queue外部的逻辑偏移量 long queueOffset = byteBuffer.getLong(); // 读取这个最新消息在完整的两个commitlog里面的物理偏移量 longphysicOffset = byteBuffer.getLong();// 读取最新消息的sysflag标识 int sysFlag = byteBuffer.getInt(); // 读取最新消息的诞生天数戳 longbornTimeStamp = byteBuffer.getLong();// 根据我们的sysflag构建两个4+4或者16+4的两个缓存空间 ByteBuffer byteBuffer1; if((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) ==0) { byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4); }else { byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4); } // 进一步读取最新消息储存天数戳 longstoreTimestamp = byteBuffer.getLong();// 根据sysflag构建第二个缓存区域 ByteBuffer byteBuffer2; if((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) ==0) { byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4); } else{ byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4); } // 读取最新消息重新消费次数 intreconsumeTimes = byteBuffer.getInt();// 读取prepared事务最新消息偏移量 long preparedTransactionOffset = byteBuffer.getLong(); // 读取最新消息内容长度 intbodyLen = byteBuffer.getInt();if (bodyLen > 0) { // 假如说须要读取最新消息内容,此时就把这个最新消息内容读取出来就能了 if(readBody) { byteBuffer.get(bytesContent, 0, bodyLen); // 读取出来了最新消息内容之后,假如说须要校验 crc校验和的话 if (checkCRC) { // 此时对读取出来的最新消息内容字节数据计算crc校验和 int crc = UtilAll.crc32(bytesContent, 0, bodyLen); // 对我们计算出来的最新消息内容crc校验和与我们传递进来的bodycrc校验和做两个比对 // 假如是不一样的话,此时返回两个异常的dispatch request if (crc != bodyCRC) { log.warn(“CRC check failed. 
bodyCRC={}, currentCRC={}”, crc, bodyCRC); return new DispatchRequest(-1, false/* success */); } } } // 假如说不须要读取最新消息内容,此时就直接再缓存片段里通过位置设置,跳过最新消息内容不要来读取了 else{ byteBuffer.position(byteBuffer.position() + bodyLen); } }// 读取topic长度 byte topicLen = byteBuffer.get(); // 此时就能去读取这个最新消息所属的两个topic名称byteBuffer.get(bytesContent, 0, topicLen); String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);long tagsCode = 0; String keys = “”; String uniqKey = null; // 读取最新消息属性长度 shortpropertiesLength = byteBuffer.getShort(); Map<String, String> propertiesMap =null; if(propertiesLength >0) { // 把最新消息属性读取出来 byteBuffer.get(bytesContent, 0, propertiesLength); String properties = newString(bytesContent,0, propertiesLength, MessageDecoder.CHARSET_UTF8); // 把字符串格式的最新消息属性转换成两个最新消息mappropertiesMap = MessageDecoder.string2messageProperties(properties); keys = propertiesMap.get(MessageConst.PROPERTY_KEYS); uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);if (tags != null && tags.length() > 0) { tagsCode = MessageExtBrokerInner.tagsString2tagsCode( MessageExt.parseTopicFilterType(sysFlag), tags); }// Timing message processing { // 从最新消息属性里提取延迟级别,假如你是两个延迟最新消息,其实会附带两个延迟级别 String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL); // 假如说这个最新消息是要投递到系统调度topic里去的 if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) { intdelayLevel = Integer.parseInt(t);// 对延迟级别能去做两个处理,假如说超过了最大延迟级别就降成最大延迟级别 if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { delayLevel =this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel(); }// 假如说延迟级别是大于0的话,此时就能计算最新消息最后要投递出去的天数 if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService() .computeDeliverTimestamp(delayLevel, storeTimestamp); } } } }// 计算最新消息完整的长度 intreadLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);// 假如说totalSize不等于最新消息完整长度 if(totalSize != readLength) { doNothingForDeadCode(reconsumeTimes); doNothingForDeadCode(flag); doNothingForDeadCode(bornTimeStamp); doNothingForDeadCode(byteBuffer1); doNothingForDeadCode(byteBuffer2); log.error(“[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}”, totalSize, readLength, bodyLen, topicLen, propertiesLength); return new DispatchRequest(totalSize, false/* success */); } // 最后会返回两个正常的dispatch request return newDispatchRequest( topic,// topic queueId, // queueId physicOffset, // 物理偏移量 totalSize, // 最新消息总大小 tagsCode, // 最新消息tags code storeTimestamp, // 储存天数戳 queueOffset, // queue偏移量 keys, // keys uniqKey, // uniquekey sysFlag, // sysflag preparedTransactionOffset, // prepare事务最新消息偏移量 propertiesMap // 属性map ); } catch(Exception e) { }return new DispatchRequest(-1, false /* success */); }// 计算最新消息完整长度 protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) { intbornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) ==0 ? 8 : 20; intstorehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) ==0 ? 
8 : 20;
    // Full message length:
    // total size, magic code, body CRC, queueId, flag, queue offset, physical offset, sysflag, born timestamp,
    // born host, store timestamp, store host, reconsume times, prepared transaction offset, body, topic, properties
    final int msgLen = 4 //TOTALSIZE
        + 4 //MAGICCODE
        + 4 //BODYCRC
        + 4 //QUEUEID
        + 4 //FLAG
        + 8 //QUEUEOFFSET
        + 8 //PHYSICALOFFSET
        + 4 //SYSFLAG
        + 8 //BORNTIMESTAMP
        + bornhostLength //BORNHOST
        + 8 //STORETIMESTAMP
        + storehostAddressLength //STOREHOSTADDRESS
        + 4 //RECONSUMETIMES
        + 8 //Prepared Transaction Offset
        + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
        + 1 + topicLength //TOPIC
        + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
        + 0;
    return msgLen;
}

// Message dispatch request
public class DispatchRequest {
    // topic
    private final String topic;
    // queueId
    private final int queueId;
    // physical offset of the message inside the commitlog
    private final long commitLogOffset;
    // message size
    private int msgSize;
    // tags code
    private final long tagsCode;
    // store timestamp
    private final long storeTimestamp;
    // logical offset of the message inside its consume queue
    private final long consumeQueueOffset;
    // keys
    private final String keys;
    // whether parsing succeeded
    private final boolean success;
    // uniqKey
    private final String uniqKey;
    // sysflag
    private final int sysFlag;
    // prepared transaction offset
    private final long preparedTransactionOffset;
    // message properties map
    private final Map<String, String> propertiesMap;
    // bit map
    private byte[] bitMap;
    // buffer size
    private int bufferSize = -1; //the buffer size maybe larger than the msg size if the message is wrapped by something
}
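A quick worked example of this layout (hypothetical numbers): with IPv4 born/store hosts (bornhostLength = storehostAddressLength = 8), a message with a 100-byte body, a 9-byte topic and 30 bytes of properties occupies:

int msgLen = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 8 + 8 + 4 + 8   // fixed header fields = 84
           + 4 + 100                                                  // BODY length prefix + body
           + 1 + 9                                                    // TOPIC length prefix + topic
           + 2 + 30;                                                  // PROPERTIES length prefix + properties
// msgLen == 230 bytes on disk for this message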
7. Abnormal recovery

The abnormal-recovery logic is more involved: it first checks the store (flush) timestamp of the last message in each file to decide where the recovery scan may safely start.
Whether a file is a candidate is decided by comparing its last store timestamp against the store checkpoint:
- With the message index enabled (default: on) and the safe message index enabled (default: off), the file's flush timestamp must be earlier than the checkpoint's minimum flush timestamp, which also takes the index flush timestamp into account.
- Otherwise, the flush timestamp must be earlier than the smaller of the checkpoint's physical-queue (commit log) message timestamp and logical-queue (consume queue) message timestamp.
Only a file that passes this check is then scanned and validated message by message (a simplified sketch of the check follows the table below). The two parameters involved are:
Parameter            Description                                                                 Default
messageIndexEnable   Whether the message index is enabled; index entries are written to the     true
                     Index files
messageIndexSafe     Whether the safe (crash-recoverable) message index is enabled               false
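A minimal, self-contained sketch of the file-selection rule described above; the field names mirror the store checkpoint conceptually, but the class and method are simplifications, not the actual StoreCheckpoint API:

class CheckpointSketch {
    long physicMsgTimestamp;   // last flushed commit log message timestamp recorded in the checkpoint
    long logicsMsgTimestamp;   // last flushed consume queue message timestamp
    long indexMsgTimestamp;    // last flushed index message timestamp

    boolean fileMatchesRecover(long lastStoreTimestampOfFile,
                               boolean messageIndexEnable, boolean messageIndexSafe) {
        long min = Math.min(physicMsgTimestamp, logicsMsgTimestamp);
        if (messageIndexEnable && messageIndexSafe) {
            min = Math.min(min, indexMsgTimestamp);   // the index flush point is also taken into account
        }
        // Only a file whose last message was flushed no later than the checkpoint is trusted
        return lastStoreTimestampOfFile <= min;
    }
}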
8. Writing messages asynchronously
// 异步化写入最新消息 public CompletableFuture<PutMessageResult> asyncPutMessage(finalMessageExtBrokerInner msg) {// Set the storage time msg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting // on the client) // 设置编码后的最新消息体 msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // Back to Results AppendMessageResult result = null; StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); // 从最新消息中 String topic = msg.getTopic(); // int queueId msg.getQueueId(); ommit,第4字节为1,第3字节为0)最新消息) final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // 如果不是事务最新消息 或者 是事务最新消息的提交阶段 if(tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery,delay time level,延迟投递级别假如是大于0 if(msg.getDelayTimeLevel() >0) { // 延迟级别不能超过最大的延迟级别,超过也设置为最大的延迟级别 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); }// 将延迟最新消息修改目标topic和目标queueIdtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId // 将真实的topic和queueId写入属性MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } }// 处理最新消息生产机器地址和储存机器地址InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();if(bornSocketAddress.getAddress() instanceof Inet6Address) { msg.setBornHostV6Flag(); } InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();if(storeSocketAddress.getAddress() instanceof Inet6Address) { msg.setStoreHostAddressV6Flag(); } PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);if (encodeResult != null) { returnCompletableFuture.completedFuture(encodeResult); }// 把我们编码后的结果设置到最新消息里面去msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);// 创建写入最新消息上下文PutMessageContext putMessageContext = new PutMessageContext( generateKey(putMessageThreadLocal.getKeyBuilder(), msg)); long elapsedTimeInLock =0; MappedFile unlockMappedFile =null; // 写入最新消息锁加锁,默认是使用可重入锁 putMessageLock.lock(); // spin or ReentrantLock ,depending on store config try { // commitlog是有多个mappedfile,写满了两个切换下两个,所以说每次写入都应该是最后两个,最新的两个MappedFile mappedFile =this.mappedFileQueue.getLastMappedFile(); long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();// 开始锁定天数 this.beginTimeInLock = beginLockTimestamp; // Here settings are stored timestamp, in order to ensure an orderly // global // 设置最新消息的储存天数msg.setStoreTimestamp(beginLockTimestamp);if (null == mappedFile || mappedFile.isFull()) { // 态射文档不存在或者态射文档满了则创建两个文档mappedFile =this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } if (null== mappedFile) { log.error(“create mapped file1 error, topic: “ + msg.getTopic() + ” clientAddr: “+ msg.getBornHostString());returnCompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED,null)); } // 如前所述mappedfile实现最新消息追加,假如开启了临时缓存缓冲池,先写入两个缓冲池,再commit到filechannel里去,再flush // 默认情况下是不开启临时缓冲池的,此时会直接进入硬盘文档缓存态射区域里去 result = 
mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); switch (result.getStatus()) {// 写成功 case PUT_OK: break; // 态射文档满了case END_OF_FILE: unlockMappedFile = mappedFile;// Create a new file, re-write the message // 假如说把两个文档写满了以后,此时创建两个新的文档,重写这条最新消息 mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { //XXX: warn and notify me log.error(“create mapped file2 error, topic: “ + msg.getTopic() + ” clientAddr: “ + msg.getBornHostString()); returnCompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); }// 重新添加最新消息=》 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);break; // 最新消息过大 case MESSAGE_SIZE_EXCEEDED: // 最新消息属性过大case PROPERTIES_SIZE_EXCEEDED:returnCompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result)); case UNKNOWN_ERROR:returnCompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));default: returnCompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result)); }// 释放锁 elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() – beginLockTimestamp; }finally { beginTimeInLock = 0; putMessageLock.unlock(); }// 最新消息储存的多定天数过长 if (elapsedTimeInLock > 500) { log.warn(“[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}”, elapsedTimeInLock, msg.getBody().length, result); }// 写满了最后两个文档,此时是会有的,假如同时启用了预热mappedfile机制(默认false) if (null!= unlockMappedFile &&this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { // 解锁态射文档 this.defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);// Statistics // 更新统计信息 // Statistics 单次储存最新消息topic次数 storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1); // 单次储存最新消息topic大小storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());// 每次写完一条最新消息之后,就会提交flush请求和replica请求CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if(flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); }if(replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); }return putMessageResult; }); }9、最新消息刷盘
// Submit a flush request
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // GroupCommitService is the synchronous flush service. A request submitted to it is not
        // processed immediately; it is first queued in the service's internal request list.
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // Group commit request: physical write offset + written bytes, plus the sync flush timeout
            GroupCommitRequest request = new GroupCommitRequest(
                result.getWroteOffset() + result.getWroteBytes(),
                this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // Hand the request to the watcher for timeout monitoring; if it has not completed in time,
            // the watcher completes it proactively.
            flushDiskWatcher.add(request);
            // Submit the request to the flush service, which flushes the mapped file and completes
            // the request's future; the caller can wait on that future.
            service.putRequest(request);
            return request.future();
        }
        // If the producer does not wait for storage, just wake up the flush service.
        else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush
    else {
        // Without the transient store pool, trigger the flush directly
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            // With the transient store pool, data is first staged off-heap and then committed
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}
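The synchronous branch is essentially a group-commit pattern: the writing thread parks on the request's future while a dedicated flush thread batches pending requests, flushes once, and completes them all. A minimal, self-contained sketch of that pattern (illustrative names, not RocketMQ source):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class GroupCommitSketch {
    static final class Request {
        final long nextOffset;                                   // wroteOffset + wroteBytes
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
        Request(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final List<Request> pending = new ArrayList<>();

    synchronized CompletableFuture<Boolean> putRequest(Request r) {
        pending.add(r);
        notifyAll();                                             // wake the flusher thread
        return r.future;
    }

    void flushLoop(java.util.function.LongPredicate flushUpTo) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            List<Request> batch;
            synchronized (this) {
                while (pending.isEmpty()) wait();
                batch = new ArrayList<>(pending);                // swap out the current batch
                pending.clear();
            }
            long max = batch.stream().mapToLong(r -> r.nextOffset).max().orElse(0);
            boolean ok = flushUpTo.test(max);                    // one physical flush covers the whole batch
            batch.forEach(r -> r.future.complete(ok));           // unblock all waiting producers
        }
    }
}

In RocketMQ itself, GroupCommitService additionally alternates between two request lists (one being filled, one being flushed) to keep lock contention low.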
10. High-availability flushing (submitting the replication request)

// Submit a replication request
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
    // SYNC_MASTER here is plain master/slave replication (unrelated to DLedger); if the master dies,
    // a slave still has to be promoted manually.
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Ask HAService whether the slave is OK, i.e. whether its replication lag is within
            // the tolerated limit; if so, submit the replication request.
            if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                GroupCommitRequest request = new GroupCommitRequest(
                    result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout() // default slave sync timeout is 3s
                );
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                return request.future(); // the caller can wait on this future for the slave to catch up
            } else {
                // The slave is not available
                return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}

11. The append-message callback
// 默认追加最新消息回调接口 class DefaultAppendMessageCallback implements AppendMessageCallback { // File at the end of the minimum fixed length empty private static final intEND_FILE_MIN_BLANK_LENGTH =4 + 4; private final ByteBuffer msgIdMemory; // msgId缓存区域 private final ByteBuffer msgIdV6Memory; // v6版本的msgId缓存区域 // Store the message content private final ByteBuffer msgStoreItemMemory; // 最新消息储存条目缓存区域 // The maximum length of the message private final int maxMessageSize; // 最大最新消息大小DefaultAppendMessageCallback(final int size) { this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8); this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8); this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); this.maxMessageSize = size; }public AppendMessageResult doAppend( // 在文档里的哪个偏移量开始进行写入 final long fileFromOffset, // 要写入最新消息的缓存态射区域 final ByteBuffer byteBuffer, // 最大空白区域 final int maxBlank, // 最新消息内容 finalMessageExtBrokerInner msgInner,// 写入最新消息上下文 PutMessageContext putMessageContext) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> // PHY OFFSET // 从文档的哪个偏移量开始,加上缓存态射区域的起始位置,定位到硬盘文档物理偏移量位置,从这个位置开始写入就能了 // 物理硬盘文档->态射一块缓存区域,这块缓存区域可能是从物理硬盘文档的某个偏移量开始来进行态射的 // 当前mappedfile是从哪个偏移量开始写入的,此时再加上缓存态射区域的能写入位置 // byteBuffer.position(),两个mappedfile缓存态射区域里面写入位置,是属于针对单个mappedfile来的 // 把缓存态射区域写入位置 + 当前这个文档物理全局上的开始偏移量,此时就是两个真正的物理上的偏移量 longwroteOffset = fileFromOffset + byteBuffer.position();// 处理好两个最新消息id // 最新消息Id是由储存机器socket地址+port端口号+物理偏移量Supplier<String> msgIdSupplier = () -> {// 首先是根据sysFlag是否是储存机器地址v6标识,假如不是的话msgId就是4+4+8,假如是的话msgId就是16+4+8 // 根据我们判断出来的msgId大小重新分配一块缓存空间 int sysflag = msgInner.getSysFlag(); intmsgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) ==0 ? 4 + 4 + 8 : 16 + 4 + 8; ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);// 他会把储存机器地址写入到msgIdBuffer里去MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer); msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer // 从msgIdLen倒推回去8个位置,此时写入两个long类型的wroteOffset msgIdBuffer.putLong(msgIdLen – 8, wroteOffset); returnUtilAll.bytes2string(msgIdBuffer.array()); }; // Record ConsumeQueue information // 记录呵呵topic堆栈偏移量态射表里面的两个记录信息,这个最新消息是属于topic->queueId String key = putMessageContext.getTopicQueueTableKey(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); // 假如说这个最新消息对应的topic-queueId的偏移量是null,此时就能做两个重置 if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); }// 对这个最新消息做两个多路分发wrap boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);if (!multiDispatchWrapResult) { return newAppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }// Transaction messages that require special handling final inttranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queuec caseMessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: queueOffset = 0L; break; caseMessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE: default: break; } // 最新消息内容,preEncodeBuffer就已经是最新消息内容了 ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); final intmsgLen = preEncodeBuffer.getInt(0); // 最新消息开始4个字节,就是最新消息长度 // Determines whether there is sufficient free space if((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {this.msgStoreItemMemory.clear(); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank);// 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The 
remaining space may be any value // Here the length of the specially set maxBlank final longbeginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);return new AppendMessageResult( AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() – beginTimeMills ); }// 在这里一大段的东西都是在对已经编码后的最新消息进行一些信息补充 int pos = 4 + 4 + 4 + 4 + 4; // 6 QUEUEOFFSET,写入最新消息在topic->queueId里面的偏移量 preEncodeBuffer.putLong(pos, queueOffset); pos += 8; // 7 PHYSICALOFFSET,写入最新消息在整座CommitLog里面的物理偏移量 preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position()); intipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) ==0 ? 4 + 4 : 16 + 4; // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP pos += 8 + 4 + 8 + ipLen; // refresh store time stamp in lock,写入储存天数戳preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());final long beginTimeMills = CommitLog.this.defaultMessageStore.now();// Write messages to the queue buffer byteBuffer.put(preEncodeBuffer); // 把编码后的完整的最新消息字节数据写入到mappedfile缓存态射区域里去 msgInner.setEncodedBuff(null); AppendMessageResult result = newAppendMessageResult( AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() – beginTimeMills ); switch (tranType) { case MessageSysFlag.TRANSACTION_PREPARED_TYPE: caseMessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break; case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner); break; default: break; } returnresult; }public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final intmaxBlank,final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { byteBuffer.mark();//physical offset long wroteOffset = fileFromOffset + byteBuffer.position(); // Record ConsumeQueue informationString key = putMessageContext.getTopicQueueTableKey(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); }long beginQueueOffset = queueOffset; int totalMsgLen = 0; intmsgNum =0; final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();int sysFlag = messageExtBatch.getSysFlag(); intbornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) ==0 ? 4 + 4 : 16 + 4; intstoreHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) ==0 ? 
4 + 4 : 16 + 4; Supplier<String> msgIdSupplier = () -> { intmsgIdLen = storeHostLength +8; int batchCount = putMessageContext.getBatchSize(); long[] phyPosArray = putMessageContext.getPhyPos(); ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer); msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer StringBuilder buffer = newStringBuilder(batchCount * msgIdLen *2 + batchCount – 1); for (int i = 0; i < phyPosArray.length; i++) { msgIdBuffer.putLong(msgIdLen –8, phyPosArray[i]); String msgId = UtilAll.bytes2string(msgIdBuffer.array()); if (i != 0) { buffer.append(,); } buffer.append(msgId); } returnbuffer.toString(); }; messagesByteBuff.mark();int index = 0; while (messagesByteBuff.hasRemaining()) { // 1 TOTALSIZE final intmsgPos = messagesByteBuff.position();final int msgLen = messagesByteBuff.getInt(); final int bodyLen = msgLen – 40; //only for log, just estimate it // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn(“message size exceeded, msg total size: “ + msgLen + “, msg body size: “+ bodyLen +“, maxMessageSize: “ + this.maxMessageSize); return newAppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } totalMsgLen += msgLen;// Determines whether there is sufficient free space if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.msgStoreItemMemory.clear(); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);// 3 The remaining space may be any value //ignore previous readmessagesByteBuff.reset();// Here the length of the specially set maxBlank byteBuffer.reset(); //ignore the previous appended messages byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); return newAppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() – beginTimeMills); } //move to add queue offset and commitlog offset int pos = msgPos + 20; messagesByteBuff.putLong(pos, queueOffset); pos +=8; messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen – msgLen); // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP pos += 8 + 4 + 8 + bornHostLength; // refresh store time stamp in lockmessagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen – msgLen; queueOffset++; msgNum++; messagesByteBuff.position(msgPos + msgLen); } messagesByteBuff.position(0); messagesByteBuff.limit(totalMsgLen); byteBuffer.put(messagesByteBuff); messageExtBatch.setEncodedBuff(null); AppendMessageResult result =newAppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier, messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() – beginTimeMills); result.setMsgNum(msgNum); CommitLog.this.topicQueueTable.put(key, queueOffset);return result; } private void resetByteBuffer(finalByteBuffer byteBuffer,final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); } }12、最新消息扩展编码器
public static class MessageExtEncoder { // Store the message content // 编码模块对应的两个缓存缓冲区 private finalByteBuffer encoderBuffer;// The maximum length of the message private finalint maxMessageSize; MessageExtEncoder(final int size) { this.encoderBuffer = ByteBuffer.allocateDirect(size); this.maxMessageSize = size; }private void socketAddress2ByteBuffer(final SocketAddress socketAddress, finalByteBuffer byteBuffer) { InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; InetAddress address = inetSocketAddress.getAddress();if(address instanceof Inet4Address) { byteBuffer.put(inetSocketAddress.getAddress().getAddress(),0, 4); } else{ byteBuffer.put(inetSocketAddress.getAddress().getAddress(),0, 16); } byteBuffer.putInt(inetSocketAddress.getPort()); } // 对最新消息进行编码,拿到两个写入最新消息结果 protected PutMessageResult encode(MessageExtBrokerInner msgInner) { /** * Serialize message */ finalbyte[] propertiesData = msgInner.getPropertiesString() ==null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; if (propertiesLength > Short.MAX_VALUE) { log.warn(“putMessage message properties length too long. length={}”, propertiesData.length);return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } // topic字节字符串 final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); // topic直接字符串长度 final int topicLength = topicData.length; // 最新消息内容字节字符串长度 final int bodyLength = msgInner.getBody() == null ? 0: msgInner.getBody().length;// 最新消息完整大小 finalint msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);// Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn(“message size exceeded, msg total size: “ + msgLen + “, msg body size: “+ bodyLength +“, maxMessageSize: “ + this.maxMessageSize); returnnew PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,null); } // Initialization of storage space // 对缓存缓冲区做两个复位,准备对最新消息进行编码,也就是说把第一类格式的最新消息,写入到字节字符串里去 this.resetByteBuffer(encoderBuffer, msgLen); // 1 TOTALSIZE,最新消息完整大小 this.encoderBuffer.putInt(msgLen);// 2 MAGICCODE,最新消息固定的两个魔数 this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC,最新消息内容crc校验和 this.encoderBuffer.putInt(msgInner.getBodyCRC()); // 4 QUEUEID,queueId this.encoderBuffer.putInt(msgInner.getQueueId());// 5 FLAG,flag this.encoderBuffer.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET, need update later,最新消息在堆栈里面的偏移量 this.encoderBuffer.putLong(0); // 7 PHYSICALOFFSET, need update later,最新消息在commitlog里面的物理偏移量 this.encoderBuffer.putLong(0); // 8 SYSFLAG,sysflag this.encoderBuffer.putInt(msgInner.getSysFlag());// 9 BORNTIMESTAMP,最新消息诞生天数戳 this.encoderBuffer.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST,最新消息诞生机器地址 socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer); // 11 STORETIMESTAMP,最新消息储存天数戳 this.encoderBuffer.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS,最新消息储存机器地址 socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer); // 13 RECONSUMETIMES,最新消息重新消费次数 this.encoderBuffer.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset,prepared事务最新消息偏移量 this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY,最新消息内容 this.encoderBuffer.putInt(bodyLength);if (bodyLength > 0) this.encoderBuffer.put(msgInner.getBody()); // 16 TOPIC,最新消息topic this.encoderBuffer.put((byte) topicLength); this.encoderBuffer.put(topicData); // 17 PROPERTIES,最新消息属性 
this.encoderBuffer.putShort((short) propertiesLength); if (propertiesLength > 0) this.encoderBuffer.put(propertiesData); encoderBuffer.flip();// 编码完毕以后其实是返回了两个null return null; } protectedByteBuffer encode(finalMessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { encoderBuffer.clear();//not thread-safe int totalMsgLen = 0; ByteBuffer messagesByteBuff = messageExtBatch.wrap(); int sysFlag = messageExtBatch.getSysFlag(); int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) ==0 ? 4 + 4 : 16 + 4; int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) ==0 ? 4 + 4 : 16 + 4; ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);// properties from MessageExtBatchString batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());finalbyte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8); int batchPropDataLen = batchPropData.length;if (batchPropDataLen > Short.MAX_VALUE) { CommitLog.log.warn(“Properties size of messageExtBatch exceeded, properties size: {}, maxSize: {}.”, batchPropDataLen, Short.MAX_VALUE); throw new RuntimeException(“Properties size of messageExtBatch exceeded!”); } finalshort batchPropLen = (short) batchPropDataLen; int batchSize =0; while(messagesByteBuff.hasRemaining()) { batchSize++;// 1 TOTALSIZE messagesByteBuff.getInt(); // 2 MAGICCODEmessagesByteBuff.getInt();// 3 BODYCRC messagesByteBuff.getInt(); // 4 FLAG int flag = messagesByteBuff.getInt(); // 5 BODYint bodyLen = messagesByteBuff.getInt(); int bodyPos = messagesByteBuff.position(); int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen); messagesByteBuff.position(bodyPos + bodyLen);// 6 propertiesshort propertiesLen = messagesByteBuff.getShort(); int propertiesPos = messagesByteBuff.position(); messagesByteBuff.position(propertiesPos + propertiesLen); boolean needAppendLastPropertySeparator = propertiesLen >0 && batchPropLen > 0 && messagesByteBuff.get(messagesByteBuff.position() –1) != MessageDecoder.PROPERTY_SEPARATOR; finalbyte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);finalint topicLength = topicData.length; int totalPropLen = needAppendLastPropertySeparator ? 
propertiesLen + batchPropLen +1: propertiesLen + batchPropLen;finalint msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);// Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn(“message size exceeded, msg total size: “ + msgLen + “, msg body size: “ + bodyLen + “, maxMessageSize: “ + this.maxMessageSize); throw new RuntimeException(“message size exceeded”); } totalMsgLen += msgLen;// Determines whether there is sufficient free space if (totalMsgLen > maxMessageSize) { throw new RuntimeException(“message size exceeded”); } // 1 TOTALSIZE this.encoderBuffer.putInt(msgLen);// 2 MAGICCODE this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.encoderBuffer.putInt(bodyCrc);// 4 QUEUEID this.encoderBuffer.putInt(messageExtBatch.getQueueId()); // 5 FLAG this.encoderBuffer.putInt(flag); // 6 QUEUEOFFSET this.encoderBuffer.putLong(0); // 7 PHYSICALOFFSET this.encoderBuffer.putLong(0); // 8 SYSFLAG this.encoderBuffer.putInt(messageExtBatch.getSysFlag()); // 9 BORNTIMESTAMP this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength);this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder));// 11 STORETIMESTAMP this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder));// 13 RECONSUMETIMES this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes());// 14 Prepared Transaction Offset, batch does not support transaction this.encoderBuffer.putLong(0); // 15 BODY this.encoderBuffer.putInt(bodyLen); if (bodyLen > 0) this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen);// 16 TOPIC this.encoderBuffer.put((byte) topicLength);this.encoderBuffer.put(topicData); // 17 PROPERTIES this.encoderBuffer.putShort((short) totalPropLen);if (propertiesLen > 0) { this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen); }if (batchPropLen > 0) { if (needAppendLastPropertySeparator) { this.encoderBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR); }this.encoderBuffer.put(batchPropData, 0, batchPropLen); } } putMessageContext.setBatchSize(batchSize); putMessageContext.setPhyPos(new long[batchSize]); encoderBuffer.flip();return encoderBuffer; } private void resetByteBuffer(finalByteBuffer byteBuffer,final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); } public ByteBuffer getEncoderBuffer() { return encoderBuffer; } }四、总结
On the I/O read/write path, the key techniques in RocketMQ's bag of optimizations include:
asynchronous file creation (pre-allocating the next mapped file), memory locking (mlock), memory warm-up, and off-heap memory (the transient store pool). Of course, how to use and configure them still depends on your workload, but it is hard to deny that RocketMQ's design is very well thought out.
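As one example of these techniques, mapped-file warm-up touches every OS page of a newly created commit log file up front so that later appends do not stall on page faults. A minimal sketch of the idea (the 4 KB page size and the standalone method are assumptions for illustration; this is not the MappedFile#warmMappedFile source):

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class WarmupSketch {
    private static final int OS_PAGE_SIZE = 4 * 1024;

    public static MappedByteBuffer warmUp(String path, long fileSize) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile(path, "rw");
             FileChannel channel = raf.getChannel()) {
            raf.setLength(fileSize);
            MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            for (long i = 0; i < fileSize; i += OS_PAGE_SIZE) {
                buf.put((int) i, (byte) 0);          // touch one byte per page to fault it in
            }
            buf.force();                             // flush the zeroed pages so they are backed on disk
            // RocketMQ additionally pins the region via mlock/madvise (through JNA) when warm-up is enabled.
            return buf;
        }
    }
}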