XXL-JOB核心源码解读及时间轮原理剖析

2022-12-15 0 1,057

晚安,那时我想和你撷取呵呵XXL-JOB的核心理念同时实现。假如你是XXL-JOB的采用者,所以你的确思索过它的同时实现基本原理;假如你还未碰触过那个商品,所以能透过责任编辑介绍呵呵。

XXL-JOB的洼瓣(2.0版)如下表所示:

XXL-JOB核心源码解读及时间轮原理剖析

它是怎样组织工作的呢?从采用方的视角上看,具体而言开伞器要向服务项目器端注册登记。所以这儿你可能将就有疑点了,开伞器向服务项目器端注册登记?是不是注册登记的?多长时间注册登记一场?选用甚么串行?

注册登记完了后,服务项目器端就能晓得有甚么样开伞器,并促发各项任务运维。所以服务项目器端是怎样历史记录每一各项任务的促发最佳时机,并顺利完成精确运维的呢?XXL-JOB选用的是Quartz运维架构,责任编辑我急于用天数轮计划来代替。

最终,开伞器转交到运维允诺,是是不是继续执行各项任务的呢?

带着那些难题,他们迈入XXL-JOB的explore。我先而言说XXL-JOB工程项目组件,工程项目组件很单纯,有2个:

• xxl-job-core:那个组件是给开伞器倚赖的;• xxl-job-admin:相关联架构图中的运维中心;

责任编辑文本较干,请配搭源代码饮用。源代码版是:2.0.2

1、Job服务项目手动注册登记

第一个核心理念技术点,服务项目注册登记。

服务项目注册登记要从xxl-job-core组件的XxlJobSpringExecutor类说起,这是一个 Spring 的 Bean,它是这么定义的:

@Bean(initMethod = “start”, destroyMethod = “destroy”) publicXxlJobSpringExecutorxxlJobExecutor() {     XxlJobSpringExecutor xxlJobSpringExecutor = newXxlJobSpringExecutor();     xxlJobSpringExecutor.setAdminAddresses(adminAddresses);// 其他的一些注册登记信息     returnxxlJobSpringExecutor; }

进行代码追踪,最终会是下面的调用链路:

xxl-job-core组件 spring bean: XxlJobSpringExecutor # start()-> XxlJobExecutor# start() -> initRpcProvider() xxl-rpc-core.jar -> XxlRpcProviderFactory # start() -> ServiceRegistry# start() -> ExecutorServiceRegistry # start() -> ExecutorRegistryThread # start()

ExecutorRegistryThread就是服务项目注册登记的核心理念同时实现了,start()方法核心理念代码如下表所示:

public void start(String appName, String address) {     registryThread =new Thread(new Runnable() {         @Override         public void run() {             // registry             while (!toStop) {                 // do registryadminBiz.registry(registryParam);                 TimeUnit.SECONDS.sleep(JobConstants.HEARTBEAT_INTERVAL);// 30s             }             // registry removeadminBiz.registryRemove(registryParam);         }     });     registryThread.setDaemon(true);     registryThread.start(); }

能看到开伞器每 30s 继续执行注册一场,他们继续往下看。

2、手动注册登记通信技术同时实现

透过上面ExecutorRegistryThread # start()方法核心理念代码,能看到,注册登记是透过adminBiz.registry(registryParam)代码同时实现的,调用链路总结如下表所示:

xxl-job-core组件 AdminBiz # registry() -> AdminBizClient # registry()-> XxlJobRemotingUtil# postBody() -> POST api/registry (jdk HttpURLConnection)

最终还是透过 HTTP 协议的 POST 允诺,注册登记数据格式如下:

{   “registryGroup”“EXECUTOR”,   “registryKey”“example-job-executor”,   “registryValue”“10.0.0.10:9999”}

看到这儿,他们回到文章开头难题部分。

开伞器向服务项目器端注册登记?是不是注册登记的?多长时间注册登记一场?选用甚么串行?

答案已经很明显了。

3、各项任务运维同时实现

他们接着上看第二个核心理念技术点,各项任务运维。

XXL-JOB选用的是Quartz运维架构,这儿我急于向你介绍呵呵天数轮的同时实现计划,核心理念源代码如下表所示:

@Component public class JobScheduleHandler {     privateThread scheduler;private Thread ringConsumer;     private final Map<Integer, List<Integer>> ring;          @PostConstruct     public void start() {         scheduler = new Thread(new JobScheduler(), “job-scheduler”);         scheduler.setDaemon(true);         scheduler.start();         ringConsumer = new Thread(new RingConsumer(), “job-ring-handler”);         ringConsumer.setDaemon(true);         ringConsumer.start();     }class JobScheduler implements Runnable {         @Override         public void run() {             sleep(5000– System.currentTimeMillis() %1000);             while (!schedulerStop) {                 try {                     lock.lock();                     // pre read to ring                 } catch (Exception e) {                     log.error(“JobScheduler error”, e);                 }finally{                     lock.unlock();                 }                 sleep(1000);             }         }     }          class RingConsumer implements Runnable {         @Override         public void run() {             sleep(1000 – System.currentTimeMillis() % 1000);             while(!ringConsumerStop) {try {                     intnowSecond = Calendar.getInstance().get(Calendar.SECOND);                     List<Integer> jobIds = ring.remove(nowSecond %60);                     // 促发各项任务运维                 } catch(Exception e) {                     log.error(“ring consumer error”, e);                 }                 sleep(1000– System.currentTimeMillis() %1000);             }         }     } }

上述透过两个线程池来同时实现,job-scheduler为预读线程,job-ring-handler为天数轮线程。所以天数轮是是不是同时实现各项任务的精确运维的呢?

天数轮的同时实现基本原理

他们常见的时钟根据秒针转动的类型,能分为嘀嗒式秒针和流动式秒针。

XXL-JOB核心源码解读及时间轮原理剖析

时钟

我以嘀嗒式秒针时钟为例,能把时钟环看作一个数组,秒针 1~60 秒停留的位置作为数组下标,60s 为数组下标 0。假设现在有 3 个待继续执行的各项任务,分别如下表所示:

jobid: 101  0秒时刻开始继续执行,2s/次 jobid: 102  0秒时刻开始继续执行,3s/次 jobid: 103  3秒时刻开始继续执行,4s/次

相关联 0 秒时刻的数组模型如下表所示图所示:

XXL-JOB核心源码解读及时间轮原理剖析

0秒时刻

这儿我把 0 时刻拆成了三个阶段,分别是:

• 继续执行前:读取该时刻有甚么样各项任务待继续执行,拿到各项任务 id;• 继续执行中:透过各项任务 id 查询各项任务的运行策略,继续执行各项任务;• 继续执行后:更新各项任务的下次继续执行天数;

然后天数指针往前推动一个时刻,到了 1 秒时刻。此时刻天数轮中的各项任务并未发生变化。

XXL-JOB核心源码解读及时间轮原理剖析

到了第 2 秒时刻,预读线程将 jobid 103 加入天数轮,并继续执行该数组下标下的各项任务:

XXL-JOB核心源码解读及时间轮原理剖析

这样到了第 3 秒时刻,各项任务的数组下标又会被更新。

XXL-JOB核心源码解读及时间轮原理剖析

所以这种以秒为刻度的天数轮有没有误差呢?

各项任务运维的精确度是取决于时间轮的刻度的。举个例子,他们把 0 秒时刻的这 1s 拆成 1000ms。

XXL-JOB核心源码解读及时间轮原理剖析

假设各项任务都是在第 500ms 顺利完成该时刻秒内所有各项任务的运维的,501ms 有一个新的各项任务被预读线程加载进来了,所以轮到下次运维,就要等到第 1 秒时刻的第 500ms,误差相差了一个刻度即 1s。假如以 0.5 秒为一个刻度,所以误差就变小了,是 500ms。

所以说,刻度越小,误差越小。不过这也要根据业务的实际情况来决定,毕竟要想减少误差,就要耗费更多的 CPU 资源。

介绍完各项任务运维的同时实现基本原理,那运维器与开伞器间的服务项目通信是怎样同时实现的呢?

4、各项任务运维通信技术同时实现

xxl-job-admin组件,梳理调用链路如下表所示:

xxl-job-admin组件 JobTriggerPoolHelper # trigger() ThreadPoolExecutor # execute() (分快慢线程池) XxlJobTrigger # trigger() processTrigger() runExecutor() XxlJobDynamicScheduler # getExecutorBiz()     ExecutorBiz # run() (动态代理同时实现, 这儿调用的 run 会作为参数)[1] XxlRpcReferenceBeannew InvocationHandler() # invoke() xxl-rpc-core.jar NettyHttpClient # asyncSend() (POST…允诺参数 XxlRpcRequest 设置 methodName 为[1]处的调用方法即 “run”)

最终是透过 HTTP 协议进行通信的,核心理念通信代码如下表所示:

public void send(XxlRpcRequest xxlRpcRequest) throws Exception {     byte[] requestBytes = serializer.serialize(xxlRpcRequest);     DefaultFullHttpRequest request =newDefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,newURI(address).getRawPath(), Unpooled.wrappedBuffer(requestBytes));     request.headers().set(HttpHeaderNames.HOST, host);     request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);     request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());this.channel.writeAndFlush(request).sync(); }

运维器将继续执行允诺发送到开伞器后,接着就是开伞器的组织工作了。

5、开伞器转交各项任务接口同时实现

开伞器的组织工作,梳理调用链路如下表所示:

xxl-job-core组件 spring bean: XxlJobSpringExecutor# start() -> XxlJobExecutor # start() -> initRpcProvider()xxl-rpc-core.jar -> XxlRpcProviderFactory# start()  -> Server # start() -> NettyHttpServer # start()netty 接口同时实现 NettyHttpServerHandler# channelRead0() -> process() (线程池继续执行) -> XxlRpcProviderFactory # invokeService()(根据允诺参数 XxlRpcRequest 里的 methodName 反射调用) -> ExecutorBizImpl# run()

他们也能透过 HTTP 允诺查看接口同时实现:

GET http://localhost:17711/services

结果如下表所示:

<ui>     <li>com.xxl.job.core.biz.ExecutorBizcom.xxl.job.core.biz.impl.ExecutorBizImpl@d579177</li> </ui>

开伞器转交各项任务,总结而言用的是下面的接口:

POST http://localhost:17711

要注意的是,这儿假如透过 Postman 来调用是调不通的,因为序列化方式和 HTTP 协议是不一样的。

接下来就是开伞器转交到各项任务逻辑,代码链路如下表所示:

xxl-job-core组件 spring bean: XxlJobSpringExecutor# start() -> XxlJobExecutor # start() -> initRpcProvider()-> new ExecutorBizImpl() -> JobThread# pushTriggerQueue() spring bean: XxlJobExecutor # registJobThread() 启动 jobThead -> JobThread # run()

到这里,他们就把核心理念流程梳理了一遍。

小结

透过上文的梳理,假如想要从 0 搭建一个分布式各项任务运维系统,想必你已胸有成竹了。责任编辑所描述的天数轮计划,也是敝司基于XXL-JOB的重构计划,后来也应用在了消息中间件的延迟消息同时实现中。

欢迎交流,公众号【杨同学technotes】

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务