Load quartz job in batch style when starting the scheduler.

Original commit: elastic/x-pack-elasticsearch@9bcf84092a
This commit is contained in:
Martijn van Groningen 2014-11-25 21:18:02 +01:00
parent c9e181e597
commit 470fb053fd
1 changed files with 17 additions and 13 deletions

View File

@ -19,10 +19,7 @@ import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory; import org.quartz.simpl.SimpleJobFactory;
import java.util.HashSet; import java.util.*;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class AlertScheduler extends AbstractComponent { public class AlertScheduler extends AbstractComponent {
@ -60,9 +57,11 @@ public class AlertScheduler extends AbstractComponent {
SchedulerFactory schFactory = new StdSchedulerFactory(properties); SchedulerFactory schFactory = new StdSchedulerFactory(properties);
scheduler = schFactory.getScheduler(); scheduler = schFactory.getScheduler();
scheduler.setJobFactory(new SimpleJobFactory()); scheduler.setJobFactory(new SimpleJobFactory());
Map<JobDetail, Set<? extends Trigger>> jobs = new HashMap<>();
for (Map.Entry<String, Alert> entry : alerts.entrySet()) { for (Map.Entry<String, Alert> entry : alerts.entrySet()) {
schedule(entry.getKey(), entry.getValue().schedule()); jobs.put(createJobDetail(entry.getKey()), createTrigger(entry.getValue().schedule()));
} }
scheduler.scheduleJobs(jobs, false);
scheduler.start(); scheduler.start();
} catch (SchedulerException se){ } catch (SchedulerException se){
logger.error("Failed to start quartz scheduler", se); logger.error("Failed to start quartz scheduler", se);
@ -104,22 +103,27 @@ public class AlertScheduler extends AbstractComponent {
* Schedules the alert with the specified name to be fired according to the specified cron expression. * Schedules the alert with the specified name to be fired according to the specified cron expression.
*/ */
public void schedule(String alertName, String cronExpression) { public void schedule(String alertName, String cronExpression) {
JobDetail job = JobBuilder.newJob(AlertExecutorJob.class).withIdentity(alertName).build();
job.getJobDataMap().put("manager", this);
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
try { try {
logger.trace("Scheduling [{}] with schedule [{}]", alertName, cronExpression); logger.trace("Scheduling [{}] with schedule [{}]", alertName, cronExpression);
Set<CronTrigger> triggers = new HashSet<>(); scheduler.scheduleJob(createJobDetail(alertName), createTrigger(cronExpression), true);
triggers.add(cronTrigger);
scheduler.scheduleJob(job, triggers, true);
} catch (SchedulerException se) { } catch (SchedulerException se) {
logger.error("Failed to schedule job",se); logger.error("Failed to schedule job",se);
throw new ElasticsearchException("Failed to schedule job", se); throw new ElasticsearchException("Failed to schedule job", se);
} }
} }
private Set<CronTrigger> createTrigger(String cronExpression) {
return new HashSet<>(Arrays.asList(
TriggerBuilder.newTrigger().withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).build()
));
}
private JobDetail createJobDetail(String alertName) {
JobDetail job = JobBuilder.newJob(AlertExecutorJob.class).withIdentity(alertName).build();
job.getJobDataMap().put("manager", this);
return job;
}
// This Quartz thread pool will always accept. On this thread we will only index an alert action and add it to the work queue // This Quartz thread pool will always accept. On this thread we will only index an alert action and add it to the work queue
public static final class AlertQuartzThreadPool implements org.quartz.spi.ThreadPool { public static final class AlertQuartzThreadPool implements org.quartz.spi.ThreadPool {