Merge pull request #728 from lucastetreault/scheduler-performance

[AMQ-7340] Improve scheduled message performance
This commit is contained in:
Jean-Baptiste Onofré 2022-01-19 19:01:26 +01:00 committed by GitHub
commit 70e0b3babf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 83 additions and 80 deletions

View File

@ -155,19 +155,6 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
return result; return result;
} }
private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
this.store.readLockIndex();
try {
if (!this.store.isStopped() && !this.store.isStopping()) {
Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
return first;
}
} finally {
this.store.readUnlockIndex();
}
return null;
}
@Override @Override
public List<Job> getAllJobs() throws IOException { public List<Job> getAllJobs() throws IOException {
final List<Job> result = new ArrayList<>(); final List<Job> result = new ArrayList<>();
@ -265,6 +252,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.store.store(newJob); this.store.store(newJob);
} }
private void doReschedule(List<Closure> toReschedule) throws IOException {
for (Closure closure : toReschedule) {
closure.run();
}
}
private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException { private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException {
KahaRescheduleJobCommand update = new KahaRescheduleJobCommand(); KahaRescheduleJobCommand update = new KahaRescheduleJobCommand();
update.setScheduler(name); update.setScheduler(name);
@ -275,9 +268,9 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.store.store(update); this.store.store(update);
} }
private void doRemove(final long executionTime, final List<JobLocation> jobs) throws IOException { private void doRemove(final List<Closure> toRemove) throws IOException {
for (JobLocation job : jobs) { for (Closure closure : toRemove) {
doRemove(executionTime, job.getJobId()); closure.run();
} }
} }
@ -732,12 +725,19 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// Read the list of scheduled events and fire the jobs, reschedule repeating jobs as // Read the list of scheduled events and fire the jobs, reschedule repeating jobs as
// needed before firing the job event. // needed before firing the job event.
Map.Entry<Long, List<JobLocation>> first = getNextToSchedule(); List<Closure> toRemove = new ArrayList<>();
if (first != null) { List<Closure> toReschedule = new ArrayList<>();
List<JobLocation> list = new ArrayList<>(first.getValue()); try {
List<JobLocation> toRemove = new ArrayList<>(list.size()); this.store.readLockIndex();
final long executionTime = first.getKey();
Iterator<Map.Entry<Long, List<JobLocation>>> iterator = this.index.iterator(this.store.getPageFile().tx());
while (iterator.hasNext()) {
Map.Entry<Long, List<JobLocation>> next = iterator.next();
if (next != null) {
List<JobLocation> list = new ArrayList<>(next.getValue());
final long executionTime = next.getKey();
long nextExecutionTime = 0; long nextExecutionTime = 0;
if (executionTime <= currentTime) { if (executionTime <= currentTime) {
for (final JobLocation job : list) { for (final JobLocation job : list) {
@ -754,9 +754,10 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
if (repeat != 0) { if (repeat != 0) {
// Reschedule for the next time, the scheduler will take care of // Reschedule for the next time, the scheduler will take care of
// updating the repeat counter on the update. // updating the repeat counter on the update.
doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); final long finalNextExecutionTime = nextExecutionTime;
toReschedule.add(() -> doReschedule(job.getJobId(), executionTime, finalNextExecutionTime, job.getRescheduledCount() + 1));
} else { } else {
toRemove.add(job); toRemove.add(() -> doRemove(executionTime, job.getJobId()));
} }
} else { } else {
if (repeat == 0) { if (repeat == 0) {
@ -768,7 +769,8 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// Reschedule the cron job as a new event, if the cron entry signals // Reschedule the cron job as a new event, if the cron entry signals
// a repeat then it will be stored separately and fired as a normal // a repeat then it will be stored separately and fired as a normal
// event with decrementing repeat. // event with decrementing repeat.
doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1); final long finalNextExecutionTime = nextExecutionTime;
toReschedule.add(() -> doReschedule(job.getJobId(), executionTime, finalNextExecutionTime, job.getRescheduledCount() + 1));
if (repeat != 0) { if (repeat != 0) {
// we have a separate schedule to run at this time // we have a separate schedule to run at this time
@ -782,28 +784,25 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.scheduleTime.setWaitTime(waitTime); this.scheduleTime.setWaitTime(waitTime);
} }
} else { } else {
toRemove.add(job); toRemove.add(() -> doRemove(executionTime, job.getJobId()));
} }
} }
} }
// now remove all jobs that have not been rescheduled from this execution
// time, if there are no more entries in that time it will be removed.
doRemove(executionTime, toRemove);
// If there is a job that should fire before the currently set wait time
// we need to reset wait time otherwise we'll miss it.
Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule();
if (nextUp != null) {
final long timeUntilNextScheduled = nextUp.getKey() - currentTime;
if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) {
this.scheduleTime.setWaitTime(timeUntilNextScheduled);
}
}
} else { } else {
this.scheduleTime.setWaitTime(executionTime - currentTime); this.scheduleTime.setWaitTime(executionTime - currentTime);
break;
} }
} }
}
} finally {
this.store.readUnlockIndex();
doReschedule(toReschedule);
// now remove all jobs that have not been rescheduled,
// if there are no more entries in that time it will be removed.
doRemove(toRemove);
}
this.scheduleTime.pause(); this.scheduleTime.pause();
} catch (Exception ioe) { } catch (Exception ioe) {
@ -898,6 +897,10 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
out.writeLong(this.index.getPageId()); out.writeLong(this.index.getPageId());
} }
private interface Closure {
void run() throws IOException;
}
static class ScheduleTime { static class ScheduleTime {
private final int DEFAULT_WAIT = 500; private final int DEFAULT_WAIT = 500;
private final int DEFAULT_NEW_JOB_WAIT = 100; private final int DEFAULT_NEW_JOB_WAIT = 100;