AMQ-9448 Fix persistent scheduler deadlock

Do not fire or schedule jobs while holding read lock on store.
This commit is contained in:
Albertas Vyšniauskas 2024-03-19 21:17:31 +02:00
parent e896f2688e
commit 0448cf25db
2 changed files with 59 additions and 11 deletions

View File

@ -290,6 +290,18 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.store.store(destroy);
}
private void doSchedule(List<Closure> toSchedule) throws IOException {
for (Closure closure : toSchedule) {
closure.run();
}
}
private void doFire(List<JobWithPayload> toFire) throws IOException {
for (final JobWithPayload jobWithPayload : toFire) {
jobWithPayload.fire(jobListeners);
}
}
/**
* Adds a new Scheduled job to the index. Must be called under index lock.
*
@ -727,6 +739,8 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// needed before firing the job event.
List<Closure> toRemove = new ArrayList<>();
List<Closure> toReschedule = new ArrayList<>();
List<Closure> toSchedule = new ArrayList<>();
List<JobWithPayload> toFire = new ArrayList<>();
try {
this.store.readLockIndex();
@ -750,7 +764,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
long waitTime = nextExecutionTime - currentTime;
this.scheduleTime.setWaitTime(waitTime);
if (!job.isCron()) {
fireJob(job);
toFire.add(new JobWithPayload(job, this.store.getPayload(job.getLocation())));
if (repeat != 0) {
// Reschedule for the next time, the scheduler will take care of
// updating the repeat counter on the update.
@ -762,7 +776,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
} else {
if (repeat == 0) {
// This is a non-repeating Cron entry so we can fire and forget it.
fireJob(job);
toFire.add(new JobWithPayload(job, this.store.getPayload(job.getLocation())));
}
if (nextExecutionTime > currentTime) {
@ -779,7 +793,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// listeners but we do need to start a separate schedule
String jobId = ID_GENERATOR.generateId();
ByteSequence payload = getPayload(job.getLocation());
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
toSchedule.add(() -> schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()));
waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
this.scheduleTime.setWaitTime(waitTime);
}
@ -797,6 +811,8 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
} finally {
this.store.readUnlockIndex();
doFire(toFire);
doSchedule(toSchedule);
doReschedule(toReschedule);
// now remove all jobs that have not been rescheduled,
@ -816,14 +832,6 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
}
}
void fireJob(JobLocation job) throws IllegalStateException, IOException {
LOG.debug("Firing: {}", job);
ByteSequence bs = this.store.getPayload(job.getLocation());
for (JobListener l : jobListeners) {
l.scheduledJob(job.getJobId(), bs);
}
}
@Override
public void startDispatching() throws Exception {
if (!this.running.get()) {
@ -950,4 +958,21 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
}
}
}
static class JobWithPayload {
private final JobLocation job;
private final ByteSequence payload;
public JobWithPayload(JobLocation job, ByteSequence payload) {
this.job = job;
this.payload = payload;
}
void fire(List<JobListener> jobListeners) throws IllegalStateException, IOException {
LOG.debug("Firing: {}", job);
for (JobListener l : jobListeners) {
l.scheduledJob(job.getJobId(), payload);
}
}
}
}

View File

@ -119,4 +119,27 @@ public class JmsCronSchedulerTest extends JobSchedulerTestSupport {
connection.close();
}
@Test(timeout = 90000)
public void testRepeatedCronDoesNotDeadlock() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
producer.send(message);
producer.close();
assertNotNull(consumer.receive());
assertNotNull(consumer.receive());
connection.close();
}
}