《分布式任务调度平台xxljob》手册.
- 格式:pdf
- 大小:2.52 MB
- 文档页数:12
《分布式任务调度平台XXL-JOB》手册文档历史记录目录1:简介 (3)2:安装 (3)3:配置 (3)3.1数据库准备 (4)3.2源码准备 (5)3.3部署准备 (5)3.3.1 配置部署“调度中心” (5)3.3.2 配置部署“执行器项目” (7)4:使用 (9)步骤一:新建任务 (9)步骤二:“GLUE模式(Java)”任务开发 (10)步骤三:触发执行 (10)步骤四:查看日志 (10)5:总结 (11)1:简介XXL-JOB是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展,XXL-JOB是基于开源Quartz 调度内核的、为方便企业调度场景而开源的一款实用的调度工具。
自带任务配置页面,任务监控,分布式执行器等功能。
2:安装下载地址(最新1.9版本迭代中,可选择稳定版本1.8.2下载)码云:https:///xuxueli0323/xxl-job/tree/v1.8.23:配置解压下载文件,得到如下文件结构,- /doc :文档资料- /db :“调度数据库”建表脚本- /xxl-job-admin :调度中心,项目源码- /xxl-job-core :公共Jar依赖- /xxl-job-executor-samples :执行器,Sample示例项目3.1数据库准备使用mysql 新建数据库xxl-job ,导入下载文件夹中/doc/db/tables_xxl_job.sql 文件,应生成16张表,如图所示。
3.2源码准备按照maven格式将源码导入IDE,使用maven进行编译即可,源码结构如下xxl-job-admin:调度中心xxl-job-core:公共依赖xxl-job-executor:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器):xxl-job-executor-sample-spring:Spring版本,通过Spring容器管理执行器,比较通用;:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器;:xxl-job-executor-sample-jfinal:JFinal版本,通过JFinal管理执行器;:xxl-job-executor-sample-nutz:Nutz版本,通过Nutz管理执行器;3.3部署准备3.3.1 配置部署“调度中心”调度中心项目:xxl-job-admin作用:统一管理任务调度平台上调度任务,负责触发调度执行,并且提供任务管理平台。
SpringBoot整合Xxl-job实现定时任务的全过程⽬录前⾔⼀、部署调度中⼼1、项⽬下载2、初始化数据3、修改properties配置⽂件⼆、部署SpringBoot项⽬1、引⼊依赖2、创建配置类3、修改配置⽂件4、创建执⾏器5、启动SpringBoot项⽬三、通过调度中⼼进⾏任务调度1、添加执⾏器2、添加任务3、任务调度中⼼发起任务调度四、⼩结总结前⾔XXL-JOB是⼀个分布式任务调度平台,其核⼼设计⽬标是开发迅速、学习简单、轻量级、易扩展。
现已开放源代码并接⼊多家公司线上产品线,开箱即⽤。
如果是单机并且定时任务不多的情况,可以选择Timer注解@Scheduled或者Cron⼯具类等⽅式来实现,但是这有个缺点,那就是定时任务会写死在代码中,⼀旦启动,就不能暂停或者修改。
如果修改的话,整个还项⽬要重新编译,这属实⾮常的⿇烦。
本篇⽂章将会介绍如何通过xxl-job来实现任务的调度⼀、部署调度中⼼1、项⽬下载下⾯是调度中⼼代码的gitee地址,可以colon到本地/xuxueli0323/xxl-job2、初始化数据在下载好的项⽬中的doc/db⽬录下有⼀个tables_xxl_job.sql⽂件,先放到⾃⼰的数据库中执⾏,其实就是初始化好调度中⼼需要的表结构和数据3、修改properties配置⽂件⼆、部署SpringBoot项⽬1、引⼊依赖<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.2.0</version></dependency>2、创建配置类@Configurationpublic class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.executor.appname}")private String appName;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appName);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}}3、修改配置⽂件xxl:job:admin:addresses: http://127.0.0.1:8080/xxl-job-admin #部署的调度中⼼的urlexecutor:appname: xxl-job-volunteer-executor #执⾏器的名字ip:port: 9999 #调度中⼼调⽤执⾏器时使⽤的端⼝logpath: /data/apploggs/xxl-job/jobhandler #⽇志路径logretentiondays: 30 #⽇志保留天数accessToken:4、创建执⾏器@Componentpublic class XxlJobSample {// myDemoJob是任务的名字,也是Spring中bean的名字@XxlJob("myDemoJob")public ReturnT<String> myDemoJob(String value) {System.out.println("myDemoJob:定时任务触发:" + value);return ReturnT.SUCCESS;}}5、启动SpringBoot项⽬启动成功后会看到下⾯两⾏⽇志信息,可以看到成功连接到了调度中⼼然后注册了将要被调度任务>>>>>>>>>>> xxl-job register jobhandler success, name:myDemoJob, jobHandler:com.xxl.job.core.handler.impl.MethodJobHandler@1f94dd63[class ponent.XxlJobSample#myDemoJob] >>>>>>>>>>> xxl-job remoting server start success, nettype = class com.xxl.job.core.server.EmbedServer, port = 9999三、通过调度中⼼进⾏任务调度1、添加执⾏器输⼊配置⽂件中配置的appName,名称可以随意2、添加任务新增调度任务查看调度任务3、任务调度中⼼发起任务调度在SpringBoot项⽬中可以看到控制台输出如下信息:2022-01-16 11:54:05.039 INFO 7836 --- [Pool-1148366645] c.xxl.job.core.executor.XxlJobExecutor : >>>>>>>>>>> xxl-job regist JobThread success, jobId:7, handler:com.xxl.job.core.handler.impl.MethodJobHandler@1f94dd63[class pone myDemoJob:定时任务触发:testParam2022-01-16 11:55:38.059 INFO 7836 --- [ Thread-22] com.xxl.job.core.thread.JobThread : >>>>>>>>>>> xxl-job JobThread stoped, hashCode:Thread[Thread-22,10,main]四、⼩结⾄此,SpringBoot整合Xxl-job就完成了,刚才的⽰例代码其实对于原来的项⽬还是有⼀定的侵⼊性的,上⾯仅仅演⽰了BEAN运⾏模式,说⽩了就是调⽤加了@XxlJob的⽅法。
(19)国家知识产权局(12)发明专利申请(10)申请公布号 (43)申请公布日 (21)申请号 202210021289.X(22)申请日 2022.01.10(71)申请人 广东联合电子服务股份有限公司地址 510000 广东省广州市天河区体育西路189号15楼(72)发明人 黄强 李昊淼 张佳松 (74)专利代理机构 广州市越秀区哲力专利商标事务所(普通合伙) 44288专利代理师 徐鹏(51)Int.Cl.G06F 9/48(2006.01)G06F 21/31(2013.01)G06F 21/62(2013.01)(54)发明名称基于XXL-JOB的任务调度系统(57)摘要本申请公开了一种基于XXL ‑JOB的任务调度系统,涉及计算机技术,包括:Web模块,用于响应浏览器的访问请求,并向浏览器返回可交互界面;任务调度模块,用于根据Web模块采集的针对所述可交互界面的操作指令进行响应,所述任务调度模块至少用于调度针对数据库的操作任务;其中,所述可交互界面的输入框配置有校验程序,所述校验程序用于校验所述输入框的输入数据是否为SQL指令,当所述输入数据为SQL指令时,拦截该输入数据。
本申请实施例可以在采用XXL ‑JOB实现数据库任务调度的场景下,通过防注入措施保障系统得到安全。
权利要求书1页 说明书4页 附图1页CN 114528077 A 2022.05.24C N 114528077A1.一种基于XXL ‑JOB的任务调度系统,其特征在于,包括:Web模块,用于响应浏览器的访问请求,并向浏览器返回可交互界面;任务调度模块,用于根据Web模块采集的针对所述可交互界面的操作指令进行响应,所述任务调度模块至少用于调度针对数据库的操作任务;其中,所述可交互界面的输入框配置有校验程序,所述校验程序用于校验所述输入框的输入数据是否为SQL指令,当所述输入数据为SQL指令时,拦截该输入数据。
2.根据权利要求1所述的基于XXL ‑JOB的任务调度系统,其特征在于,所述可交互界面中包括任务调度列表和所述任务调度列表中各任务项的信息和操作按钮,所述任务调度模块根据针对所述操作按钮的操作执行进行响应。
xxl-job调度任务简单使⽤1. 简介XXL-JOB是⼀个分布式任务调度平台,其核⼼设计⽬标是开发迅速、学习简单、轻量级、易扩展。
现已开放源代码并接⼊多家公司线上产品线,开箱即⽤。
2. 使⽤步骤1:源码下载数据库脚本再源码/xxl-job/doc/db/tables_xxl_job.sql3.调度中⼼项⽬:xxl-job-admin修改配置⽂件中的数据库地址或端⼝/xxl-job/xxl-job-admin/src/main/resources/application.properties4.启动xxl-job-admin⼯程访问地址为:账号/密码 admin/1234565.⾄此,xxl-job服务启动完成,接下来将xxl植⼊到所要添加定时任务的⼯程中此次测试我使⽤的是:2.3.0版本<!-- /maven2/com/xuxueli/xxl-job-core/ --><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.0</version></dependency>6.修改⼯程的yml⽂件,引⼊xxl配置#引⼊xxl-jobxxl:job:admin:# 调度中⼼部署跟地址 [选填]:如调度中⼼集群部署存在多个地址则⽤逗号分隔。
# 执⾏器将会使⽤该地址进⾏"执⾏器⼼跳注册"和"任务结果回调";为空则关闭⾃动注册;addresses: http://localhost:8080/xxl-job-adminexecutor:# 执⾏器AppName [选填]:执⾏器⼼跳注册分组依据;为空则关闭⾃动注册appname: springcloudalibaba-user-experience-job# 执⾏器IP [选填]:默认为空表⽰⾃动获取IP,多⽹卡时可⼿动设置指定IP,该IP不会绑定Host仅作为通讯实⽤;# address: http://192.168.2.29:9191# 地址信息⽤于 "执⾏器注册" 和 "调度中⼼请求并触发任务";# ip: 192.168.2.29# 执⾏器运⾏⽇志⽂件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使⽤默认路径;logpath: ./logs/xxl-job# 执⾏器⽇志⽂件保存天数 [选填] :过期⽇志⾃动清理, 限制值⼤于等于3时⽣效; 否则, 如-1, 关闭⾃动清理功能;logretentiondays: 30# 执⾏器端⼝号 [选填]:⼩于等于0则⾃动获取;默认端⼝为9999,# 单机部署多个执⾏器时,注意要配置不同执⾏器端⼝;port: 9192# 执⾏器通讯TOKEN [选填]:⾮空时启⽤;# accessToken:7.添加config配置⽂件import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Profile;/*** xxl-job config** @author* @date 2020/11/23*/@Slf4j@Configuration@Profile("!dev")public class XxlJobConfig {@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.executor.appname}")private String appName;/*@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;*/@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;/*** 注意:如果想本机调试,需要设置address、ip* @return*/@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appName);//xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setPort(port);//xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}}8.执⾏器管理--》添加执⾏器添加完成后,可以点击查看详情9.再代码中创建定时任务import com.alibaba.fastjson.JSONObject;import com.xxl.job.core.context.XxlJobHelper;import com.xxl.job.core.handler.annotation.XxlJob;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.InitializingBean;import ponent;import java.util.Date;@Slf4j@Componentpublic class UserJob1 implements InitializingBean {/*** 定时同步群发任务结果** @return*/@XxlJob("xxlJobTest")public String execute() {("start userTodoJob, time at:" + new Date());String params = XxlJobHelper.getJobParam();JSONObject jsonObject = JSONObject.parseObject(params);Integer orgId = jsonObject.getInteger("orgId");String corpId = jsonObject.getString("corpId");System.out.println("执⾏了"+params);return "执⾏了";}@Overridepublic void afterPropertiesSet() throws Exception {("Combination Job init suc at: " + new Date());}}10.任务管理中添加新的任务添加完成后,可再后⾯对该任务进⾏操作。
xxl-job介绍什么是任务调度什么是任务调度?某⼀时间段进⾏任务的操作。
具体任务调度有哪些应⽤的场景?数据同步、交易信息、清除⽤户的信息、定期发送报表数据、活动推送等。
传统实现定时任务的⽅式?Thread、TimeTask、ScheduleExecutorService、Quartz 等;不过,这⼏种⽅式都是在单点系统使⽤,⼀旦Job服务器宕机之后,就必须采取⼀些措施;具体操作如下:(1) 使⽤⼼跳检测监控⾃动重启、任务补偿机制(任务做标记)(2) 定时任务在执⾏代码的时候中间突然报错,使⽤⽇志记录错误,跳过继续执⾏,在使⽤定时Job 扫描⽇志错误记录,进⾏补偿信息。
(3) 定时Job 在执⾏的时候,导致整个 Job 异常结束掉,发送邮件通知给运维⼈员。
分布式定时任务的⽅式?XXL-Job、Elastic-job等。
不过,既然采⽤分布式,那么肯定会遇到项⽬部署集群,导致任务重复执⾏多次;具体操作如下:(1) Zookeeper 实现分布式锁,每次保证拿到锁再执⾏,效率⽐较低。
(2) 配置⽂件中加⼊定时任务的开关,但是只能保证⼀台服务器执⾏,变为单击服务器。
(3) 启动的时候使⽤数据库唯⼀标识;同样是效率低。
(4) 分布式调度任务平台,解决了任务幂等问题,Job 负载均衡轮询机制(推荐)。
那么现在我们来总结下,⾸先传统的定时任务,⼏乎⽆法做到⾼可⽤,再加上项⽬部署集群,会导致任务幂等性问题;此时分布式定时任务调度平台便发挥了作⽤,咱们拿 XXL-Job 来进⾏说明;相关作⽤如下:(1) ⽀持Job集群,Job 负载均衡轮询机制保证幂等性问题。
(2) ⽀持Job补偿,如果Job执⾏失败的话,会⾃动实现重试机制,超过重启次数后,会发送邮件通知运维⼈员。
(3) ⽀持Job⽇志记录。
(4) 动态配置定时规则,传统定时Job触发规则都是写死在代码中。
XXL-JOB简介开源社区:环境:Maven3+Jdk1.8+Mysql5.7+xxl-job执⾏原理调度平台、执⾏器、任务管理,相关解释如下:调度平台:统⼀管理任务调度的平台,负责转发任务到对应的执⾏服务器。
XXL-JOB任务调度⽇常开发中难免会碰到需要开启定时任务处理业务。
这时我们第⼀时间想到的是Spring的Task,但是很不⽅便,这⾥可以列出⼏点: 1.⼀旦需要更改定时任务时间,我们就要打开IDE修改cron表达式; 2.在特殊的情况下代码报错了,我们就要打开Log查看是什么导致的问题; 3.需要很多定时任务去处理业务就要新建多个,突然不想执⾏这个任务了,我们就要再打开IDE注释那些代码。
就特别不好管理。
可能⼜有⼈要说了: 1.我可以⽤Redis进⾏动态修改cron表达式;--------看下⾯代码 2.我写个Aop,只要出现异常了我就记录相关信息;-------- ⽜掰!Aop固然好,但是你觉得真的⽅便了吗? 3.······我就是不闲⿇烦,咋地。
--------打扰了package cn.chenghao.config;import lombok.AllArgsConstructor;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.SchedulingConfigurer;import org.springframework.scheduling.config.ScheduledTaskRegistrar;import org.springframework.scheduling.support.CronTrigger;import javax.annotation.PostConstruct;/*** ⾃定义定时任务** @Author chenghao* @Date 2020/4/14 11:56**/@Configuration@EnableScheduling@AllArgsConstructorpublic class CompleteScheduleConfig implements SchedulingConfigurer {/*** StringRedis模板*/private final StringRedisTemplate stringRedisTemplate;/*** 初始化*/@PostConstructprivate void init() {// redis中不存在则创建stringRedisTemplate.opsForValue().setIfAbsent("cron", "0/3 * * * * ?");}/*** 配置任务*/@Overridepublic void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {// 给定cron表达式,触发Runnable任务scheduledTaskRegistrar.addTriggerTask(() -> {System.out.println("执⾏定时任务中.......");},triggerContext -> {// 获取cron表达式String cron = stringRedisTemplate.opsForValue().get("cron");// 返回执⾏周期return new CronTrigger(cron).nextExecutionTime(triggerContext);});}}!(真的是太⽅便了)----------------------------------好,废话不再多说----------------------------------XXL-JOB地址:概述:XXL-JOB是⼀个分布式任务调度平台,其核⼼设计⽬标是开发迅速、学习简单、轻量级、易扩展。
介绍⼀种很好⽤的任务调度平台1 总体设计分布式任务调度平台是什么XXL-JOB是⼀个轻量级分布式任务调度平台,和quartz相似,但是XXL-JOB将调度中⼼与执⾏器解耦,弥补了quartz的不⾜(使⽤API⽅式操作任务,不⼈性化;系统侵⼊性严重等)其核⼼设计⽬标是开发迅速、学习简单、轻量级、易扩展。
现已开放源代码并接⼊多家公司线上产品线,开箱即⽤。
⽂档我这边可以给。
源码⽬录介绍/xxl-job-admin :调度中⼼,项⽬源码/xxl-job-core :公共Jar依赖/xxl-job-executor-samples :执⾏器,Sample⽰例项⽬(⼤家可以在该项⽬上进⾏开发,也可以将现有项⽬改造-⽣成“调度数据库”配置XXL-JOB调度模块基于Quartz集群实现,其“调度数据库”是在Quartz的11张集群mysql表基础上扩展⽽成。
XXL-JOB⾸先定制了Quartz原⽣表结构前缀(XXL_JOB_QRTZ_)。
我们代码配置和源码配置⼀样在此基础上新增了⼏张张扩展表。
正常情况下总共16张表所有表(若想详细了解可以查看⽂档):架构设计设计思想“调度中⼼”负责发起调度请求。
“执⾏器”负责接收调度请求并执⾏对应的JobHandler 中业务逻辑。
系统组成调度模块(调度中⼼)负责管理调度信息,按照调度配置发出调度请求,⾃⾝不承担业务代码。
调度系统与任务解耦,提⾼了系统可⽤性和稳定性,同时调度系统性能不再受限于任务模块;⽀持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE 开发和任务报警等,所有上述操作都会实时⽣效,同时⽀持监控调度结果以及执⾏⽇志,⽀持执⾏器Failover 。
- XXL_JOB_QRTZ_TRIGGER_GROUP :执⾏器信息表,维护任务执⾏器信息;- XXL_JOB_QRTZ_TRIGGER_REGISTRY :执⾏器注册表,维护在线的执⾏器和调度中⼼机器地址信息;- XXL_JOB_QRTZ_TRIGGER_INFO :调度扩展信息表: ⽤于保存XXL-JOB 调度任务的扩展信息,如任务分组、任务名、机器地址、执⾏器、执⾏⼊参和报警邮件等等;- XXL_JOB_QRTZ_TRIGGER_LOG :调度⽇志表: ⽤于保存XXL-JOB 任务调度的历史信息,如调度结果、执⾏结果、调度⼊参、调度机器和执⾏器等等;- XXL_JOB_QRTZ_TRIGGER_LOGGLUE :任务GLUE ⽇志:⽤于保存GLUE 更新历史,⽤于⽀持GLUE 的版本回溯功能;执⾏模块(执⾏器)负责接收调度请求并执⾏任务逻辑。
分布式任务调度平台xxl-job定时任务调度:在某个时间点触发执⾏操作(CURD)。
分布式任务调度平台的使⽤场景:数据同步、交易信息(对账)、清除过期⽤户信息、定期发送报表、消息推送。
传统的定时任务与分布式定时任务的区别?传统的定时任务特征:单点系统(job没有集群)思考:如果job在⾼并发的情况下,导致job服务器宕机之后,这时候应该如何处理?1.定时任务和业务服务放在⽤⼀个jvm中(⼩项⽬)2.⼤型互联⽹公司定时任务代码执⾏与业务执⾏代码服务器都是分开的,都是独⽴的jvm。
3.定时任务服务器是否需要考虑⾼并发情况?需要,因为同⼀时间点可能执⾏多个任务。
间隔执⾏场景不需要,同时执⾏需要考虑⾼并发情况。
4.如果在⾼并发情况下,定时job宕机后,该如何处理(只有⼀台服务器的情况下) ?使⽤⼼跳检测监控⾃动重启、补偿机制,每个任务做⼀个标记。
定时任务在执⾏代码的时候突然报错了,该如何处理?⽇志记录错误,跳过当前错的任务,接着执⾏下⼀个任务。
在使⽤定时job扫描错误⽇志记录,进⾏补偿信息。
分布式定时任务特征:job使⽤集群。
产⽣问题:定时任务打成3个war包放在三个不同服务器上(8080、8081、8082)。
定时任务在每个war包中都是相同的,三台服务器启动之后,定时job会被重复执⾏3遍。
分布式领域中服务器集群的情况下,如何保证job的幂等性?1.使⽤zookeeper实现分布式锁⽅式,不推荐效率低。
2.在配置⽂件中加上打开定时job的开关8080 设置flag=true、8081 设置flag=false、8082设置 flag=false,这种⽅法不属于分布式了属于单点的系统,不推荐。
3.启动的时候使⽤数据库的唯⼀标⽰,不推荐效率低。
4.使⽤分布式任务调度平台xxl-job、Elastric-job(依赖于zk)、TBSchedule。
分布式任务调度平台优点xxl-job1.⽀持集群(保证幂等性问题)、job负载均衡轮询机制。
利⽤XXL-JOB实现灵活控制的分⽚处理本⽂讲述了⼀种利⽤ XXL-JOB 来进⾏分⽚任务处理的⽅法,另外加⼊对执⾏节点数的灵活控制。
场景现在⼀张数据表⾥有⼤量数据需要某个服务端应⽤来处理,要求:1. 能够并⾏处理;2. 能够较灵活地控制并⾏任务数量。
3. 压⼒较均衡地分散到不同的服务器节点;思路因为需要并⾏处理同⼀张数据表⾥的数据,所以⽐较⾃然地想到了分⽚查询数据,可以利⽤对 id 取模的⽅法进⾏分⽚,避免同⼀条数据被重复处理。
根据第 1、2 点要求,本来想通过对线程池的动态配置来实现,但结合第 3 点来考虑,服务器节点数量有可能会变化,节点之间相互⽆感知⽆通信,⾃⼰在应⽤内实现⼀套调度机制可能会很复杂。
如果有现成的独⽴于这些服务器节点之外的调度器就好了——顺着这个思路,就想到了已经接⼊的分布式任务调度平台 XXL-JOB,⽽在阅读其后发现「分⽚⼴播 & 动态分⽚」很贴合这种场景。
⽅案1. 利⽤ XXL-JOB 的路由策略「分⽚⼴播」来调度定时任务;2. 通过任务参数传⼊执⾏任务节点数量;3. 定时任务逻辑⾥,根据获取到的分⽚参数、执⾏任务节点数量,决策当前节点是否需要执⾏,分⽚查询数据并处理:如果分⽚序号 > (执⾏任务节点数量 - 1),则当前节点不执⾏任务,直接返回;否则,取分⽚序号和执⾏任务节点数量作为分⽚参数,查询数据并处理。
这样,我们可以实现灵活调度 [1, N] 个节点并⾏执⾏任务处理数据。
主要代码⽰例JobHandler ⽰例:@XxlJob("demoJobHandler")public void execute() {String param = XxlJobHelper.getJobParam();if (StringUtils.isBlank(param)) {XxlJobHelper.log("任务参数为空");XxlJobHelper.handleFail();return;}// 执⾏任务节点数量int executeNodeNum = Integer.valueOf(param);// 分⽚序号int shardIndex = XxlJobHelper.getShardIndex();// 分⽚总数int shardTotal = XxlJobHelper.getShardTotal();if (executeNodeNum <= 0 || executeNodeNum > shardTotal) {XxlJobHelper.log("执⾏任务节点数量取值范围[1,节点总数]");XxlJobHelper.handleFail();return;}if (shardIndex > (executeNodeNum - 1)) {XxlJobHelper.log("当前分⽚ {} ⽆需执⾏", shardIndex);XxlJobHelper.handleSuccess();return;}shardTotal = executeNodeNum;// 分⽚查询数据并处理process(shardIndex, shardTotal);XxlJobHelper.handleSuccess();}分⽚查询数据⽰例:select field1, field2from table_namewhere ...and mod(id, #{shardTotal}) = #{shardIndex}order by id limit #{rows};进⼀步思考1. 如果需要更⼤的并发量,需要有⼤于应⽤节点数量的任务并⾏,如何处理?两种思路:通过任务参数传⼊⼀个并发数,单个节点在处理任务时,将查询到的数据按这个数字进⾏再分⽚,交由线程池并⾏处理;配置 M 个定时任务,指定相同的 JobHandler,给它们编号 0、1、2...M,并将定时任务编号和 M 这两个数,由任务参数传⼊,定时任务逻辑⾥,先根据分⽚参数、定时任务编号、M,重新计算出新的分⽚参数,如分⽚序号 = (分⽚序号 * M) + 定时任务编号,分⽚总数 = 分⽚总数 * M,再查询数据并处理。