From 60859b0b7fa93b001ca2b0ac2f2385386cb2f47d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20T=C3=A9treault?= Date: Wed, 24 Nov 2021 01:06:47 -0800 Subject: [PATCH] Use B+ Tree iterator instead of DFS to find scheduled jobs to be executed --- .../kahadb/scheduler/JobSchedulerImpl.java | 163 +++++++++--------- 1 file changed, 83 insertions(+), 80 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 5889c6ab95..3e535d52b8 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -155,19 +155,6 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch return result; } - private Map.Entry> getNextToSchedule() throws IOException { - this.store.readLockIndex(); - try { - if (!this.store.isStopped() && !this.store.isStopping()) { - Map.Entry> first = this.index.getFirst(this.store.getPageFile().tx()); - return first; - } - } finally { - this.store.readUnlockIndex(); - } - return null; - } - @Override public List getAllJobs() throws IOException { final List result = new ArrayList<>(); @@ -265,6 +252,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch this.store.store(newJob); } + private void doReschedule(List toReschedule) throws IOException { + for (Closure closure : toReschedule) { + closure.run(); + } + } + private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException { KahaRescheduleJobCommand update = new KahaRescheduleJobCommand(); update.setScheduler(name); @@ -275,9 +268,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch this.store.store(update); } - private void doRemove(final long executionTime, final List jobs) throws IOException { - for (JobLocation job : jobs) { - doRemove(executionTime, job.getJobId()); + private void doRemove(final List toRemove) throws IOException { + for (Closure closure : toRemove) { + closure.run(); } } @@ -732,77 +725,83 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch // Read the list of scheduled events and fire the jobs, reschedule repeating jobs as // needed before firing the job event. - Map.Entry> first = getNextToSchedule(); - if (first != null) { - List list = new ArrayList<>(first.getValue()); - List toRemove = new ArrayList<>(list.size()); - final long executionTime = first.getKey(); - long nextExecutionTime = 0; - if (executionTime <= currentTime) { - for (final JobLocation job : list) { + List toRemove = new ArrayList<>(); + List toReschedule = new ArrayList<>(); + try { + this.store.readLockIndex(); - if (!running.get()) { - break; - } + Iterator>> iterator = this.index.iterator(this.store.getPageFile().tx()); + while (iterator.hasNext()) { + Map.Entry> next = iterator.next(); + if (next != null) { + List list = new ArrayList<>(next.getValue()); + final long executionTime = next.getKey(); + long nextExecutionTime = 0; - int repeat = job.getRepeat(); - nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); - long waitTime = nextExecutionTime - currentTime; - this.scheduleTime.setWaitTime(waitTime); - if (!job.isCron()) { - fireJob(job); - if (repeat != 0) { - // Reschedule for the next time, the scheduler will take care of - // updating the repeat counter on the update. - doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); - } else { - toRemove.add(job); + if (executionTime <= currentTime) { + for (final JobLocation job : list) { + + if (!running.get()) { + break; + } + + int repeat = job.getRepeat(); + nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); + long waitTime = nextExecutionTime - currentTime; + this.scheduleTime.setWaitTime(waitTime); + if (!job.isCron()) { + fireJob(job); + if (repeat != 0) { + // Reschedule for the next time, the scheduler will take care of + // updating the repeat counter on the update. + final long finalNextExecutionTime = nextExecutionTime; + toReschedule.add(() -> doReschedule(job.getJobId(), executionTime, finalNextExecutionTime, job.getRescheduledCount() + 1)); + } else { + toRemove.add(() -> doRemove(executionTime, job.getJobId())); + } + } else { + if (repeat == 0) { + // This is a non-repeating Cron entry so we can fire and forget it. + fireJob(job); + } + + if (nextExecutionTime > currentTime) { + // Reschedule the cron job as a new event, if the cron entry signals + // a repeat then it will be stored separately and fired as a normal + // event with decrementing repeat. + final long finalNextExecutionTime = nextExecutionTime; + toReschedule.add(() -> doReschedule(job.getJobId(), executionTime, finalNextExecutionTime, job.getRescheduledCount() + 1)); + + if (repeat != 0) { + // we have a separate schedule to run at this time + // so the cron job is used to set of a separate schedule + // hence we won't fire the original cron job to the + // listeners but we do need to start a separate schedule + String jobId = ID_GENERATOR.generateId(); + ByteSequence payload = getPayload(job.getLocation()); + schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); + waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod(); + this.scheduleTime.setWaitTime(waitTime); + } + } else { + toRemove.add(() -> doRemove(executionTime, job.getJobId())); + } + } } } else { - if (repeat == 0) { - // This is a non-repeating Cron entry so we can fire and forget it. - fireJob(job); - } - - if (nextExecutionTime > currentTime) { - // Reschedule the cron job as a new event, if the cron entry signals - // a repeat then it will be stored separately and fired as a normal - // event with decrementing repeat. - doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); - - if (repeat != 0) { - // we have a separate schedule to run at this time - // so the cron job is used to set of a separate schedule - // hence we won't fire the original cron job to the - // listeners but we do need to start a separate schedule - String jobId = ID_GENERATOR.generateId(); - ByteSequence payload = getPayload(job.getLocation()); - schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); - waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod(); - this.scheduleTime.setWaitTime(waitTime); - } - } else { - toRemove.add(job); - } + this.scheduleTime.setWaitTime(executionTime - currentTime); + break; } } - - // now remove all jobs that have not been rescheduled from this execution - // time, if there are no more entries in that time it will be removed. - doRemove(executionTime, toRemove); - - // If there is a job that should fire before the currently set wait time - // we need to reset wait time otherwise we'll miss it. - Map.Entry> nextUp = getNextToSchedule(); - if (nextUp != null) { - final long timeUntilNextScheduled = nextUp.getKey() - currentTime; - if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) { - this.scheduleTime.setWaitTime(timeUntilNextScheduled); - } - } - } else { - this.scheduleTime.setWaitTime(executionTime - currentTime); } + } finally { + this.store.readUnlockIndex(); + + doReschedule(toReschedule); + + // now remove all jobs that have not been rescheduled, + // if there are no more entries in that time it will be removed. + doRemove(toRemove); } this.scheduleTime.pause(); @@ -898,6 +897,10 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch out.writeLong(this.index.getPageId()); } + private interface Closure { + void run() throws IOException; + } + static class ScheduleTime { private final int DEFAULT_WAIT = 500; private final int DEFAULT_NEW_JOB_WAIT = 100;