diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 4a0fbc4d4e..4cbcc30aed 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -657,22 +657,35 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch * @param tx * the transaction under which this operation was invoked. * - * @return a list of all referenced Location values for this JobSchedulerImpl + * @return a iterator of all referenced Location values for this JobSchedulerImpl * * @throws IOException if an error occurs walking the scheduler tree. */ - protected List getAllScheduledJobs(Transaction tx) throws IOException { - List references = new ArrayList<>(); + protected Iterator getAllScheduledJobs(Transaction tx) throws IOException { + return new Iterator() { - for (Iterator>> i = this.index.iterator(tx); i.hasNext();) { - Map.Entry> entry = i.next(); - List scheduled = entry.getValue(); - for (JobLocation job : scheduled) { - references.add(job); + final Iterator>> mapIterator = index.iterator(tx); + Iterator iterator; + + @Override + public boolean hasNext() { + + while (iterator == null || !iterator.hasNext()) { + if (!mapIterator.hasNext()) { + break; + } + + iterator = new ArrayList<>(mapIterator.next().getValue()).iterator(); + } + + return iterator != null && iterator.hasNext(); } - } - return references; + @Override + public JobLocation next() { + return iterator.next(); + } + }; } @Override diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java index 79059f1ca4..5cd46291a7 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -826,8 +826,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch Map.Entry entry = i.next(); JobSchedulerImpl scheduler = entry.getValue(); - List jobs = scheduler.getAllScheduledJobs(tx); - for (JobLocation job : jobs) { + for (Iterator jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) { + final JobLocation job = jobLocationIterator.next(); if (job.getLocation().compareTo(lastAppendLocation) >= 0) { if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) { LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId()); @@ -854,8 +854,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch Map.Entry entry = i.next(); JobSchedulerImpl scheduler = entry.getValue(); - List jobs = scheduler.getAllScheduledJobs(tx); - for (JobLocation job : jobs) { + for (Iterator jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) { + final JobLocation job = jobLocationIterator.next(); missingJournalFiles.add(job.getLocation().getDataFileId()); if (job.getLastUpdate() != null) { missingJournalFiles.add(job.getLastUpdate().getDataFileId()); @@ -937,8 +937,8 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch Map.Entry entry = i.next(); JobSchedulerImpl scheduler = entry.getValue(); - List jobs = scheduler.getAllScheduledJobs(tx); - for (JobLocation job : jobs) { + for (Iterator jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) { + final JobLocation job = jobLocationIterator.next(); // Remove all jobs in missing log files. if (missing.contains(job.getLocation().getDataFileId())) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java index 073907009c..6a061c7cfe 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java @@ -35,8 +35,14 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; +import org.apache.activemq.util.DefaultTestAppender; import org.apache.activemq.util.ProducerThread; import org.apache.activemq.util.Wait; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,6 +207,69 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport { @Test public void testScheduleRestart() throws Exception { + testScheduleRestart(RestartType.NORMAL); + } + + @Test + public void testScheduleFullRecoveryRestart() throws Exception { + testScheduleRestart(RestartType.FULL_RECOVERY); + } + + @Test + public void testUpdatesAppliedToIndexBeforeJournalShouldBeDiscarded() throws Exception { + final int NUMBER_OF_MESSAGES = 1000; + final AtomicInteger numberOfDiscardedJobs = new AtomicInteger(); + final JobSchedulerStoreImpl jobSchedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore(); + Location middleLocation = null; + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getMessage().toString().contains("Removed Job past last appened in the journal")) { + numberOfDiscardedJobs.incrementAndGet(); + } + } + }; + + registerLogAppender(appender); + + // send a messages + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + MessageProducer producer = session.createProducer(destination); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + TextMessage message = session.createTextMessage("test msg"); + long time = 5000; + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + producer.send(message); + + if (NUMBER_OF_MESSAGES / 2 == i) { + middleLocation = jobSchedulerStore.getJournal().getLastAppendLocation(); + } + } + + producer.close(); + + broker.stop(); + broker.waitUntilStopped(); + + // Simulating the case here updates got applied on the index before the journal updates + jobSchedulerStore.getJournal().setLastAppendLocation(middleLocation); + jobSchedulerStore.load(); + + assertEquals(numberOfDiscardedJobs.get(), NUMBER_OF_MESSAGES / 2); + } + + private void registerLogAppender(final Appender appender) { + org.apache.log4j.Logger log4jLogger = + org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class); + log4jLogger.addAppender(appender); + log4jLogger.setLevel(Level.TRACE); + } + + private void testScheduleRestart(final RestartType restartType) throws Exception { // send a message Connection connection = createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -213,12 +282,7 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport { producer.close(); //restart broker - broker.stop(); - broker.waitUntilStopped(); - - broker = createBroker(false); - broker.start(); - broker.waitUntilStarted(); + restartBroker(restartType); // consume the message connection = createConnection(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java index 5bf8d8c45a..acfd1ba94c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java @@ -41,6 +41,11 @@ public class JobSchedulerTestSupport { @Rule public TestName name = new TestName(); + enum RestartType { + NORMAL, + FULL_RECOVERY + } + protected String connectionUri; protected BrokerService broker; protected JobScheduler jobScheduler; @@ -113,4 +118,22 @@ public class JobSchedulerTestSupport { answer.setUseJmx(isUseJmx()); return answer; } + + protected void restartBroker(RestartType restartType) throws Exception { + tearDown(); + + if (restartType == RestartType.FULL_RECOVERY) { + File dir = broker.getSchedulerDirectoryFile(); + + if (dir != null) { + IOHelper.deleteFile(new File(dir, "scheduleDB.data")); + IOHelper.deleteFile(new File(dir, "scheduleDB.redo")); + } + } + + broker = createBroker(false); + + broker.start(); + broker.waitUntilStarted(); + } }