晚安,那时我想和你撷取呵呵XXL-JOB的核心理念同时实现。假如你是XXL-JOB的采用者,所以你的确思索过它的同时实现基本原理;假如你还未碰触过那个商品,所以能透过责任编辑介绍呵呵。
XXL-JOB的洼瓣(2.0版)如下表所示:
它是怎样组织工作的呢?从采用方的视角上看,具体而言开伞器要向服务项目器端注册登记。所以这儿你可能将就有疑点了,开伞器向服务项目器端注册登记?是不是注册登记的?多长时间注册登记一场?选用甚么串行?
注册登记完了后,服务项目器端就能晓得有甚么样开伞器,并促发各项任务运维。所以服务项目器端是怎样历史记录每一各项任务的促发最佳时机,并顺利完成精确运维的呢?XXL-JOB选用的是Quartz运维架构,责任编辑我急于用天数轮计划来代替。
最终,开伞器转交到运维允诺,是是不是继续执行各项任务的呢?
带着那些难题,他们迈入XXL-JOB的explore。我先而言说XXL-JOB工程项目组件,工程项目组件很单纯,有2个:
• xxl-job-core:那个组件是给开伞器倚赖的;• xxl-job-admin:相关联架构图中的运维中心;责任编辑文本较干,请配搭源代码饮用。源代码版是:2.0.2
1、Job服务项目手动注册登记
第一个核心理念技术点,服务项目注册登记。
服务项目注册登记要从xxl-job-core组件的XxlJobSpringExecutor类说起,这是一个 Spring 的 Bean,它是这么定义的:
@Bean(initMethod = “start”, destroyMethod = “destroy”) publicXxlJobSpringExecutorxxlJobExecutor() { XxlJobSpringExecutor xxlJobSpringExecutor = newXxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses);// 其他的一些注册登记信息 returnxxlJobSpringExecutor; }进行代码追踪,最终会是下面的调用链路:
xxl-job-core组件 spring bean: XxlJobSpringExecutor # start()-> XxlJobExecutor# start() -> initRpcProvider() xxl-rpc-core.jar -> XxlRpcProviderFactory # start() -> ServiceRegistry# start() -> ExecutorServiceRegistry # start() -> ExecutorRegistryThread # start()ExecutorRegistryThread就是服务项目注册登记的核心理念同时实现了,start()方法核心理念代码如下表所示:
public void start(String appName, String address) { registryThread =new Thread(new Runnable() { @Override public void run() { // registry while (!toStop) { // do registryadminBiz.registry(registryParam); TimeUnit.SECONDS.sleep(JobConstants.HEARTBEAT_INTERVAL);// 30s } // registry removeadminBiz.registryRemove(registryParam); } }); registryThread.setDaemon(true); registryThread.start(); }能看到开伞器每 30s 继续执行注册一场,他们继续往下看。
2、手动注册登记通信技术同时实现
透过上面ExecutorRegistryThread # start()方法核心理念代码,能看到,注册登记是透过adminBiz.registry(registryParam)代码同时实现的,调用链路总结如下表所示:
xxl-job-core组件 AdminBiz # registry() -> AdminBizClient # registry()-> XxlJobRemotingUtil# postBody() -> POST api/registry (jdk HttpURLConnection)最终还是透过 HTTP 协议的 POST 允诺,注册登记数据格式如下:
{ “registryGroup”: “EXECUTOR”, “registryKey”: “example-job-executor”, “registryValue”: “10.0.0.10:9999”}看到这儿,他们回到文章开头难题部分。
开伞器向服务项目器端注册登记?是不是注册登记的?多长时间注册登记一场?选用甚么串行?
答案已经很明显了。
3、各项任务运维同时实现
他们接着上看第二个核心理念技术点,各项任务运维。
XXL-JOB选用的是Quartz运维架构,这儿我急于向你介绍呵呵天数轮的同时实现计划,核心理念源代码如下表所示:
@Component public class JobScheduleHandler { privateThread scheduler;private Thread ringConsumer; private final Map<Integer, List<Integer>> ring; @PostConstruct public void start() { scheduler = new Thread(new JobScheduler(), “job-scheduler”); scheduler.setDaemon(true); scheduler.start(); ringConsumer = new Thread(new RingConsumer(), “job-ring-handler”); ringConsumer.setDaemon(true); ringConsumer.start(); }class JobScheduler implements Runnable { @Override public void run() { sleep(5000– System.currentTimeMillis() %1000); while (!schedulerStop) { try { lock.lock(); // pre read to ring } catch (Exception e) { log.error(“JobScheduler error”, e); }finally{ lock.unlock(); } sleep(1000); } } } class RingConsumer implements Runnable { @Override public void run() { sleep(1000 – System.currentTimeMillis() % 1000); while(!ringConsumerStop) {try { intnowSecond = Calendar.getInstance().get(Calendar.SECOND); List<Integer> jobIds = ring.remove(nowSecond %60); // 促发各项任务运维 } catch(Exception e) { log.error(“ring consumer error”, e); } sleep(1000– System.currentTimeMillis() %1000); } } } }上述透过两个线程池来同时实现,job-scheduler为预读线程,job-ring-handler为天数轮线程。所以天数轮是是不是同时实现各项任务的精确运维的呢?
天数轮的同时实现基本原理
他们常见的时钟根据秒针转动的类型,能分为嘀嗒式秒针和流动式秒针。
时钟
我以嘀嗒式秒针时钟为例,能把时钟环看作一个数组,秒针 1~60 秒停留的位置作为数组下标,60s 为数组下标 0。假设现在有 3 个待继续执行的各项任务,分别如下表所示:
jobid: 101 0秒时刻开始继续执行,2s/次 jobid: 102 0秒时刻开始继续执行,3s/次 jobid: 103 3秒时刻开始继续执行,4s/次相关联 0 秒时刻的数组模型如下表所示图所示:
0秒时刻
这儿我把 0 时刻拆成了三个阶段,分别是:
• 继续执行前:读取该时刻有甚么样各项任务待继续执行,拿到各项任务 id;• 继续执行中:透过各项任务 id 查询各项任务的运行策略,继续执行各项任务;• 继续执行后:更新各项任务的下次继续执行天数;然后天数指针往前推动一个时刻,到了 1 秒时刻。此时刻天数轮中的各项任务并未发生变化。
到了第 2 秒时刻,预读线程将 jobid 103 加入天数轮,并继续执行该数组下标下的各项任务:
这样到了第 3 秒时刻,各项任务的数组下标又会被更新。
所以这种以秒为刻度的天数轮有没有误差呢?
各项任务运维的精确度是取决于时间轮的刻度的。举个例子,他们把 0 秒时刻的这 1s 拆成 1000ms。
假设各项任务都是在第 500ms 顺利完成该时刻秒内所有各项任务的运维的,501ms 有一个新的各项任务被预读线程加载进来了,所以轮到下次运维,就要等到第 1 秒时刻的第 500ms,误差相差了一个刻度即 1s。假如以 0.5 秒为一个刻度,所以误差就变小了,是 500ms。
所以说,刻度越小,误差越小。不过这也要根据业务的实际情况来决定,毕竟要想减少误差,就要耗费更多的 CPU 资源。
介绍完各项任务运维的同时实现基本原理,那运维器与开伞器间的服务项目通信是怎样同时实现的呢?
4、各项任务运维通信技术同时实现
在xxl-job-admin组件,梳理调用链路如下表所示:
xxl-job-admin组件 JobTriggerPoolHelper # trigger() –> ThreadPoolExecutor # execute() (分快慢线程池) –> XxlJobTrigger # trigger() –> processTrigger() –> runExecutor() –> XxlJobDynamicScheduler # getExecutorBiz() –> ExecutorBiz # run() (动态代理同时实现, 这儿调用的 run 会作为参数)[1] –> XxlRpcReferenceBean. new InvocationHandler() # invoke() xxl-rpc-core.jar –> NettyHttpClient # asyncSend() (POST…允诺参数 XxlRpcRequest 设置 methodName 为[1]处的调用方法即 “run”)最终是透过 HTTP 协议进行通信的,核心理念通信代码如下表所示:
public void send(XxlRpcRequest xxlRpcRequest) throws Exception { byte[] requestBytes = serializer.serialize(xxlRpcRequest); DefaultFullHttpRequest request =newDefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,newURI(address).getRawPath(), Unpooled.wrappedBuffer(requestBytes)); request.headers().set(HttpHeaderNames.HOST, host); request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());this.channel.writeAndFlush(request).sync(); }运维器将继续执行允诺发送到开伞器后,接着就是开伞器的组织工作了。
5、开伞器转交各项任务接口同时实现
开伞器的组织工作,梳理调用链路如下表所示:
xxl-job-core组件 spring bean: XxlJobSpringExecutor# start() -> XxlJobExecutor # start() -> initRpcProvider()xxl-rpc-core.jar -> XxlRpcProviderFactory# start() -> Server # start() -> NettyHttpServer # start()netty 接口同时实现 NettyHttpServerHandler# channelRead0() -> process() (线程池继续执行) -> XxlRpcProviderFactory # invokeService()(根据允诺参数 XxlRpcRequest 里的 methodName 反射调用) -> ExecutorBizImpl# run()他们也能透过 HTTP 允诺查看接口同时实现:
GET http://localhost:17711/services结果如下表所示:
<ui> <li>com.xxl.job.core.biz.ExecutorBiz: com.xxl.job.core.biz.impl.ExecutorBizImpl@d579177</li> </ui>开伞器转交各项任务,总结而言用的是下面的接口:
POST http://localhost:17711要注意的是,这儿假如透过 Postman 来调用是调不通的,因为序列化方式和 HTTP 协议是不一样的。
接下来就是开伞器转交到各项任务逻辑,代码链路如下表所示:
xxl-job-core组件 spring bean: XxlJobSpringExecutor# start() -> XxlJobExecutor # start() -> initRpcProvider()-> new ExecutorBizImpl() -> JobThread# pushTriggerQueue() spring bean: XxlJobExecutor # registJobThread() 启动 jobThead -> JobThread # run()到这里,他们就把核心理念流程梳理了一遍。
小结
透过上文的梳理,假如想要从 0 搭建一个分布式各项任务运维系统,想必你已胸有成竹了。责任编辑所描述的天数轮计划,也是敝司基于XXL-JOB的重构计划,后来也应用在了消息中间件的延迟消息同时实现中。
欢迎交流,公众号【杨同学technotes】