SpringBoot+Quartz框架实现分布式集群定时任务实操案例

IT 文章3年前 (2022)发布 小编
0 0 0

定时任务是Java项目开发经常遇到的需求,而Quartz框架是解决定时任务问题非常好的一个框架,当遇到集群部署项目时,为了保证定时任务只能在同一个节点执行,我们需要使用Quartz的集群配置方式来实现。这里潘老师来给大家实操演示下SpringBoot+Quartz框架实现分布式集群定时任务的案例。

一、为什么使用Quartz?

我们知道,如果仅仅只是使用定时任务,可以使用 spring 的spring-task来实现,方便、代码量少、易于实现,一个注解就解决了,但是不支持分布式集群。但是当使用分布式进行部署的时候,就会出现所有的服务器都在跑同一个定时任务,出现一个定时任务被执行多次的情况。那么最好的方式是使用 quartz 进行定时任务的调度。

即:quartz 是一个为了解决分布式定时任务的导致任务重复调度的框架。效果就是,起多个工程,同一个定时任务只会在一个工程上跑。

ad

程序员导航

优网导航旗下整合全网优质开发资源,一站式IT编程学习与工具大全网站

二、Quartz 有哪些优点?

通过上面的介绍为什么使用Quartz我们可以总结以下几个优点:
[list]1、可以实现多个定时任务进行调度。
2、可以实现代码的解耦,通过配置文件的当时进行配置。
3、功能强大,可以通过 cron 表达式设置复杂的时间任务调度。[/list]

三、Quartz 必须关注的几个核心类

[list]1、工作Job: 被调度的任务接口,用于定义具体执行的工作。我们需要实现 Job ,和继承 TimerTask 重写 run 方法一样,重写 Job 中的 excute 方法,excute 方法是任务调度的方法执行位置。
2、工作明细JobDetail: 用于描述定时工作相关的信息。必须通过 JobDetail 来实现 Job 实例(基于 builder 模式实现的)。
3、触发器Trigger: 用于描述触发工作的规则,一般使用cron表达式定义规则。包括 CronTrigger 和 SimpleTrigger,指定任务调度的频率时间。何时进行任务调度(基于 builder 模式实现的)触发器。
4、调度器Scheduler:描述了工作明细和触发器的对应关系。结合 JobDetail 实例和 Trigger 实例,进行任务的触发的调度器(基于 Factory 模式)[/list]

四、Quartz 工作原理

一个 Quartz 集群中的每个节点都是一个独立的 Quartz 应用,它又管理着其他的节点。意思是你必须对每个节点分别启动或停止。不像许多应用服务器的集群,独立的 Quartz 节点并不是与另一节点或管理节点通信。Quartz 应用是通过数据库来感知到另一个应用的。离开数据库将无法感知。我们可以看下下面的结构图:
SpringBoot+Quartz框架实现分布式集群定时任务实操案例

五、Quartz 分布式集群定时任务案例实现

这里我们使用SpringBoot整合Quartz框架来实现,数据库使用MYSQL数据库。
[v_blue]说明事项: 因为 Quartz 集依赖于数据库,所以必须首先创建 Quartz 数据库表。Quartz 包括了所有被支持的数据库平台的 SQL脚本。本文以 MySQL 数据库为例。
注意:创建的表名都是大写的,在代码中也是使用大写的表名;在liunx中的mysql数据库默认是大小写敏感的,因此使用liunx的数据库会出现找不到表的情况,酌情修改表或是改变数据库配置。[/v_blue]
下面我们来看下具体的实操步骤。

ad

AI 工具导航

优网导航旗下AI工具导航,精选全球千款优质 AI 工具集

1、创建数据库表

[v_warn]注意:目前测试额mysql使用的版本是mysql8.0及以上的,之前试了mysql5.6的发现会报错,因此建议使用mysql8版本的数据库[/v_warn]
这里要用到Quartz的11个表,表的创建脚本sql如下:

#
# In your Quartz properties file, you'll need to set
# org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#
#
# By: Ron Cordell - roncordell
#  I didn't see this anywhere, so I thought I'd post it here. This is the script from Quartz to create the tables in a MySQL database, modified to use INNODB instead of MYISAM.

DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;

CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(190) NOT NULL,
JOB_GROUP VARCHAR(190) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
JOB_NAME VARCHAR(190) NOT NULL,
JOB_GROUP VARCHAR(190) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(190) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
CRON_EXPRESSION VARCHAR(120) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPROP_TRIGGERS
  (
    SCHED_NAME VARCHAR(120) NOT NULL,
    TRIGGER_NAME VARCHAR(190) NOT NULL,
    TRIGGER_GROUP VARCHAR(190) NOT NULL,
    STR_PROP_1 VARCHAR(512) NULL,
    STR_PROP_2 VARCHAR(512) NULL,
    STR_PROP_3 VARCHAR(512) NULL,
    INT_PROP_1 INT NULL,
    INT_PROP_2 INT NULL,
    LONG_PROP_1 BIGINT NULL,
    LONG_PROP_2 BIGINT NULL,
    DEC_PROP_1 NUMERIC(13,4) NULL,
    DEC_PROP_2 NUMERIC(13,4) NULL,
    BOOL_PROP_1 VARCHAR(1) NULL,
    BOOL_PROP_2 VARCHAR(1) NULL,
    PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
    FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
    REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(190) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
INSTANCE_NAME VARCHAR(190) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(190) NULL,
JOB_GROUP VARCHAR(190) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(190) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_LOCKS (
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME))
ENGINE=InnoDB;

CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);

CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);

CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);

commit;

数据库表结构的说明,请参考如下文章:
[neilian ids=5712]
其他数据库脚本也可以到quartz官网下载最新的包:http://www.quartz-scheduler.org/downloads/

解压后,可以看到结构目录。在org\quartz\impl\jdbcjobstore下选择合适你数据库的SQL执行文件。

2、引入相关依赖


	org.mybatis.spring.boot
	mybatis-spring-boot-starter
	2.2.2


	mysql
	mysql-connector-java
	8.0.30



	org.springframework.boot
	spring-boot-starter-quartz
	2.7.4

提示:spring-boot-starter-quartz-2.7.4.jar整合的是quartz 2.3.2版本,去查看quartz源码-github官网

3、核心配置

application.yml核文件配置如下:

server:
  port: 8080
spring:
  datasource:
    username: test_db
    password: mysql
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/test_db?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true&useSSL=false
  quartz:
    #相关属性配置
    properties:
      #quartz集群配置
      org:
        quartz:
          # 1、Configure 调度器属性
          scheduler:
            instanceName: DefaultQuartzScheduler
            #ID设置为自动获取 每一个必须不同
            instanceId: AUTO
          # 2、Configure JobStore
          jobStore:
            # 信息保存时间 默认值60秒
            misfireThreshold: 60000
            #数据保存方式为数据库持久化
            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore
            #数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate可以满足大部分数据库
            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
            #表的前缀,默认QRTZ_
            tablePrefix: QRTZ_
            #是否加入集群
            isClustered: true
            #调度实例失效的检查时间间隔
            clusterCheckinInterval: 10000
            useProperties: false
          # 3、Configure ThreadPool
          threadPool:
            #线程池的实现类(一般使用SimpleThreadPool即可满足几乎所有用户的需求)
            class: org.quartz.simpl.SimpleThreadPool
            #指定线程数,至少为1(无默认值)(一般设置为1-100直接的整数合适)
            threadCount: 10
            #设置线程的优先级(最大为java.lang.Thread.MAX_PRIORITY 10,最小为Thread.MIN_PRIORITY 1,默认为5)
            threadPriority: 5
            threadsInheritContextClassLoaderOfInitializingThread: true
    #数据库方式
    job-store-type: jdbc

4、新建quartz相关的核心类

我们先在springboot项目中新建一个quartz包,把相关的定时调度任务相关类都放在一个包中。然后再去创建相关的类,如下:

ad

免费在线工具导航

优网导航旗下整合全网优质免费、免注册的在线工具导航大全

1)新建调度任务管理类-QuartzManager

该任务调度管理类对调度任务的增删改查进行的封装,可以轻松地对外提供新增、移除或更新一个定时任务。

package com.panziye.quartztest.quartz;

import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

/**
 * @author panziye
 * @description 

* @date 2022-10-25 15:37 **/ @Component public class QuartzManager { @Autowired private Scheduler scheduler; /** * 创建or更新任务,存在则更新不存在创建 * * @param jobClass 任务类 * @param jobName 任务名称 * @param jobGroupName 任务组名称 * @param jobCron cron表达式 */ public void addOrUpdateJob(Class jobClass, String jobName, String jobGroupName, String jobCron) { try { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); if (trigger==null) { addJob(jobClass, jobName, jobGroupName, jobCron); } else { if (trigger.getCronExpression().equals(jobCron)) { return; } updateJob(jobName, jobGroupName, jobCron); } } catch (SchedulerException e) { e.printStackTrace(); } } /** * 增加一个job * * @param jobClass 任务实现类 * @param jobName 任务名称 * @param jobGroupName 任务组名 * @param jobCron cron表达式(如:0/5 * * * * ? ) */ public void addJob(Class jobClass, String jobName, String jobGroupName, String jobCron) { try { JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build(); Trigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName) .startAt(DateBuilder.futureDate(1, DateBuilder.IntervalUnit.SECOND)) .withSchedule(CronScheduleBuilder.cronSchedule(jobCron)).startNow().build(); scheduler.scheduleJob(jobDetail, trigger); if (!scheduler.isShutdown()) { scheduler.start(); } } catch (SchedulerException e) { e.printStackTrace(); } } /** * @param jobClass * @param jobName * @param jobGroupName * @param jobTime */ public void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime) { addJob(jobClass, jobName, jobGroupName, jobTime, -1); } public void addJob(Class jobClass, String jobName, String jobGroupName, int jobTime, int jobTimes) { try { JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName)// 任务名称和组构成任务key .build(); // 使用simpleTrigger规则 Trigger trigger; if (jobTimes < 0) { trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroupName) .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(1).withIntervalInSeconds(jobTime)) .startNow().build(); } else { trigger = TriggerBuilder .newTrigger().withIdentity(jobName, jobGroupName).withSchedule(SimpleScheduleBuilder .repeatSecondlyForever(1).withIntervalInSeconds(jobTime).withRepeatCount(jobTimes)) .startNow().build(); } scheduler.scheduleJob(jobDetail, trigger); if (!scheduler.isShutdown()) { scheduler.start(); } } catch (SchedulerException e) { e.printStackTrace(); } } public void updateJob(String jobName, String jobGroupName, String jobTime) { try { TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroupName); CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey); trigger = trigger.getTriggerBuilder().withIdentity(triggerKey) .withSchedule(CronScheduleBuilder.cronSchedule(jobTime)).build(); // 重启触发器 scheduler.rescheduleJob(triggerKey, trigger); } catch (SchedulerException e) { e.printStackTrace(); } } /** * 删除任务一个job * * @param jobName 任务名称 * @param jobGroupName 任务组名 */ public void deleteJob(String jobName, String jobGroupName) { try { scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, jobGroupName)); scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, jobGroupName)); scheduler.deleteJob(new JobKey(jobName, jobGroupName)); } catch (Exception e) { e.printStackTrace(); } } /** * 暂停一个job * * @param jobName * @param jobGroupName */ public void pauseJob(String jobName, String jobGroupName) { try { JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); scheduler.pauseJob(jobKey); } catch (SchedulerException e) { e.printStackTrace(); } } /** * 恢复一个job * * @param jobName * @param jobGroupName */ public void resumeJob(String jobName, String jobGroupName) { try { JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); scheduler.resumeJob(jobKey); } catch (SchedulerException e) { e.printStackTrace(); } } /** * 立即执行一个job * * @param jobName * @param jobGroupName */ public void runJobNow(String jobName, String jobGroupName) { try { JobKey jobKey = JobKey.jobKey(jobName, jobGroupName); scheduler.triggerJob(jobKey); } catch (SchedulerException e) { e.printStackTrace(); } } }

2)新建任务类-MyJob1

这是我们自己需要实现的定时任务类,需要继承QuartzJobBean,相关的核心业务代码都写在executeInternal方法中。

package com.panziye.quartztest.quartz;

import org.quartz.*;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @author panziye
 * @description 

* @date 2022-10-25 15:02 **/ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class MyJob1 extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { try { System.out.println("调度id="+context.getScheduler().getSchedulerInstanceId()); System.out.println("任务名="+context.getJobDetail().getKey()+",执行时间="+new Date()); } catch (Exception e) { e.printStackTrace(); } } }

3)新建项目启动监听类-StartApplicationListerner

该类在项目启动时会自动执行,我们将对应的Job任务交给QuartzManager类去进行管理,比如这里在项目启动时就去新增或更新该定时调度任务。

package com.panziye.quartztest.quartz;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * @author panziye
 * @description 

* @date 2022-10-25 15:16 **/ @Component public class StartApplicationListerner implements ApplicationListener { @Autowired private QuartzManager quartzManager; @Override public void onApplicationEvent(ContextRefreshedEvent event) { quartzManager.addOrUpdateJob(MyJob1.class,"job1","group1","0/10 * * * * ?"); } }

测试

接下来我们开始测试多节点定时任务执行情况,先直接启动Application类,然后在EditConfiguration中新增第二个Application:
SpringBoot+Quartz框架实现分布式集群定时任务实操案例
然后再去把application.yml中的端口号改为8081,再去启动Application2,我们发现定时任务只会在一个节点执行:
SpringBoot+Quartz框架实现分布式集群定时任务实操案例

SpringBoot+Quartz框架实现分布式集群定时任务实操案例
至此,全部完成。

扩展:

如果我们需要将配置任务界面化操作,可以将新增如下代码:

1、新增查询结果的实体类

@Data
public class JobDetails{
	private String cronExpression;
	private String jobClassName;
	private String triggerGroupName;
	private String triggerName;
	private String jobGroupName;
	private String jobName;
	private Date nextFireTime;
	private Date previousFireTime;
	private Date startTime;
	private String timeZone;
	private String status;
}

2、在QuartzManager中新增对外查询的方法

public PageInfo queryAllJobBean(int pageNum, int pageSize) {
        PageHelper.startPage(pageNum, pageSize);
        List jobList = null;
        try {
            GroupMatcher matcher = GroupMatcher.anyJobGroup();
            Set jobKeys = sched.getJobKeys(matcher);
            jobList = new ArrayList<>();
            for (JobKey jobKey : jobKeys) {
                List triggers = sched.getTriggersOfJob(jobKey);
                for (Trigger trigger : triggers) {
                    JobDetails jobDetails = new JobDetails();
                    if (trigger instanceof CronTrigger) {
                        CronTrigger cronTrigger = (CronTrigger) trigger;
                        jobDetails.setCronExpression(cronTrigger.getCronExpression());
                        jobDetails.setTimeZone(cronTrigger.getTimeZone().getDisplayName());
                    }
                    jobDetails.setTriggerGroupName(trigger.getKey().getName());
                    jobDetails.setTriggerName(trigger.getKey().getGroup());
                    jobDetails.setJobGroupName(jobKey.getGroup());
                    jobDetails.setJobName(jobKey.getName());
                    jobDetails.setStartTime(trigger.getStartTime());
                    jobDetails.setJobClassName(sched.getJobDetail(jobKey).getJobClass().getName());
                    jobDetails.setNextFireTime(trigger.getNextFireTime());
                    jobDetails.setPreviousFireTime(trigger.getPreviousFireTime());
                    jobDetails.setStatus(sched.getTriggerState(trigger.getKey()).name());
                    jobList.add(jobDetails);
                }
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return new PageInfo<>(jobList);
    }

    /**
     * 获取所有计划中的任务列表
     *
     * @return
     */
    public List> queryAllJob() {
        List> jobList = null;
        try {
            GroupMatcher matcher = GroupMatcher.anyJobGroup();
            Set jobKeys = sched.getJobKeys(matcher);
            jobList = new ArrayList<>();
            for (JobKey jobKey : jobKeys) {
                List triggers = sched.getTriggersOfJob(jobKey);
                for (Trigger trigger : triggers) {
                    Map map = new HashMap<>();
                    map.put("jobName", jobKey.getName());
                    map.put("jobGroupName", jobKey.getGroup());
                    map.put("description", "trigger:" + trigger.getKey());
                    Trigger.TriggerState triggerState = sched.getTriggerState(trigger.getKey());
                    map.put("jobStatus", triggerState.name());
                    if (trigger instanceof CronTrigger) {
                        CronTrigger cronTrigger = (CronTrigger) trigger;
                        String cronExpression = cronTrigger.getCronExpression();
                        map.put("jobTime", cronExpression);
                    }
                    jobList.add(map);
                }
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return jobList;
    }

    /**
     * 获取所有正在运行的job
     *
     * @return
     */
    public List> queryRunJon() {
        List> jobList = null;
        try {
            List executingJobs = sched.getCurrentlyExecutingJobs();
            jobList = new ArrayList<>(executingJobs.size());
            for (JobExecutionContext executingJob : executingJobs) {
                Map map = new HashMap<>();
                JobDetail jobDetail = executingJob.getJobDetail();
                JobKey jobKey = jobDetail.getKey();
                Trigger trigger = executingJob.getTrigger();
                map.put("jobName", jobKey.getName());
                map.put("jobGroupName", jobKey.getGroup());
                map.put("description", "trigger:" + trigger.getKey());
                Trigger.TriggerState triggerState = sched.getTriggerState(trigger.getKey());
                map.put("jobStatus", triggerState.name());
                if (trigger instanceof CronTrigger) {
                    CronTrigger cronTrigger = (CronTrigger) trigger;
                    String cronExpression = cronTrigger.getCronExpression();
                    map.put("jobTime", cronExpression);
                }
                jobList.add(map);
            }
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
        return jobList;
    }

3、controller控制层代码

import java.util.HashMap;
import java.util.Map;

import com.github.pagehelper.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import tech.pdai.springboot.quartz.cluster.entity.JobDetails;
import tech.pdai.springboot.quartz.cluster.manager.QuartzManager;

@RestController
@RequestMapping(value = "/job")
public class JobController {

    @Autowired
    private QuartzManager qtzManager;

    @SuppressWarnings("unchecked")
    private static Class getClass(String classname) throws Exception {
        Class class1 = Class.forName(classname);
        return (Class) class1;
    }

    /**
     * @param jobClassName
     * @param jobGroupName
     * @param cronExpression
     * @throws Exception
     */
    @PostMapping(value = "/addjob")
    public void addjob(@RequestParam(value = "jobClassName") String jobClassName,
                       @RequestParam(value = "jobGroupName") String jobGroupName,
                       @RequestParam(value = "cronExpression") String cronExpression) throws Exception {
        qtzManager.addOrUpdateJob(getClass(jobClassName), jobClassName, jobGroupName, cronExpression);
    }

    /**
     * @param jobClassName
     * @param jobGroupName
     * @throws Exception
     */
    @PostMapping(value = "/pausejob")
    public void pausejob(@RequestParam(value = "jobClassName") String jobClassName,
                         @RequestParam(value = "jobGroupName") String jobGroupName) throws Exception {
        qtzManager.pauseJob(jobClassName, jobGroupName);
    }

    /**
     * @param jobClassName
     * @param jobGroupName
     * @throws Exception
     */
    @PostMapping(value = "/resumejob")
    public void resumejob(@RequestParam(value = "jobClassName") String jobClassName,
                          @RequestParam(value = "jobGroupName") String jobGroupName) throws Exception {
        qtzManager.resumeJob(jobClassName, jobGroupName);
    }

    /**
     * @param jobClassName
     * @param jobGroupName
     * @param cronExpression
     * @throws Exception
     */
    @PostMapping(value = "/reschedulejob")
    public void rescheduleJob(@RequestParam(value = "jobClassName") String jobClassName,
                              @RequestParam(value = "jobGroupName") String jobGroupName,
                              @RequestParam(value = "cronExpression") String cronExpression) throws Exception {
        qtzManager.addOrUpdateJob(getClass(jobClassName), jobClassName, jobGroupName, cronExpression);
    }

    /**
     * @param jobClassName
     * @param jobGroupName
     * @throws Exception
     */
    @PostMapping(value = "/deletejob")
    public void deletejob(@RequestParam(value = "jobClassName") String jobClassName,
                          @RequestParam(value = "jobGroupName") String jobGroupName) throws Exception {
        qtzManager.deleteJob(jobClassName, jobGroupName);
    }

    /**
     * @param pageNum
     * @param pageSize
     * @return
     */
    @GetMapping(value = "/queryjob")
    public Map queryjob(@RequestParam(value = "pageNum") Integer pageNum,
                                        @RequestParam(value = "pageSize") Integer pageSize) {
        PageInfo jobAndTrigger = qtzManager.queryAllJobBean(pageNum, pageSize);
        Map map = new HashMap();
        map.put("JobAndTrigger", jobAndTrigger);
        map.put("number", jobAndTrigger.getTotal());
        return map;
    }
}

5、前端实现

简单用VueJS 写个页面测试





QuartzDemo







查询 添加

© Quartz 任务管理

6、测试效果

(PS: 这里的任务名称需要改成你自己的完整类名称)

展示正在运行的Jobs:
SpringBoot+Quartz框架实现分布式集群定时任务实操案例

© 版权声明

相关文章

暂无评论

暂无评论...