diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index bcb819cc93..82b9ff581c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -505,8 +505,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch // now that the job is removed from the index we can store the remove info and // then dereference the log files that hold the initial add command and the most - // recent update command. - this.store.referenceRemovedLocation(tx, location, removed); + // recent update command. If the remove and the add that created the job are in + // the same file we don't need to track it and just let a normal GC of the logs + // remove it when the log is unreferenced. + if (removed.getLocation().getDataFileId() != location.getDataFileId()) { + this.store.referenceRemovedLocation(tx, location, removed); + } } } @@ -589,8 +593,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch // now that the job is removed from the index we can store the remove info and // then dereference the log files that hold the initial add command and the most - // recent update command. - this.store.referenceRemovedLocation(tx, location, job); + // recent update command. If the remove and the add that created the job are in + // the same file we don't need to track it and just let a normal GC of the logs + // remove it when the log is unreferenced. + if (job.getLocation().getDataFileId() != location.getDataFileId()) { + this.store.referenceRemovedLocation(tx, location, job); + } } } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java index 1a089318b9..f73b6a3c48 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java @@ -397,22 +397,22 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch Iterator>> removals = metaData.getRemoveLocationTracker().iterator(tx); List orphans = new ArrayList(); while (removals.hasNext()) { - boolean orphanedRemve = true; + boolean orphanedRemove = true; Entry> entry = removals.next(); // If this log is not a GC candidate then there's no need to do a check to rule it out if (gcCandidateSet.contains(entry.getKey())) { for (Integer addLocation : entry.getValue()) { if (completeFileSet.contains(addLocation)) { - orphanedRemve = false; + LOG.trace("A remove in log {} has an add still in existance in {}.", entry.getKey(), addLocation); + orphanedRemove = false; break; } } // If it's not orphaned than we can't remove it, otherwise we // stop tracking it it's log will get deleted on the next check. - if (!orphanedRemve) { - LOG.trace("A remove in log {} has an add still in existance.", entry.getKey()); + if (!orphanedRemove) { gcCandidateSet.remove(entry.getKey()); } else { LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java index c013a4c5f0..3685f34e28 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreCheckpointTest.java @@ -29,6 +29,7 @@ import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.Wait; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,6 +46,10 @@ public class JobSchedulerStoreCheckpointTest { @Before public void setUp() throws Exception { + + // investigate gc issue - store usage not getting released + org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class).setLevel(Level.TRACE); + File directory = new File("target/test/ScheduledJobsDB"); IOHelper.mkdirs(directory); IOHelper.deleteChildren(directory); @@ -80,7 +85,7 @@ public class JobSchedulerStoreCheckpointTest { } @Test - public void test() throws Exception { + public void testStoreCleanupLinear() throws Exception { final int COUNT = 10; final CountDownLatch latch = new CountDownLatch(COUNT); scheduler.addListener(new JobListener() { @@ -122,4 +127,39 @@ public class JobSchedulerStoreCheckpointTest { LOG.info("Number of journal log files: {}", getNumJournalFiles()); } + + @Test + public void testColocatedAddRemoveCleanup() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + scheduler.addListener(new JobListener() { + @Override + public void scheduledJob(String id, ByteSequence job) { + latch.countDown(); + } + }); + + byte[] data = new byte[1024]; + for (int i = 0; i < data.length; ++i) { + data[i] = (byte) (i % 256); + } + + long time = TimeUnit.SECONDS.toMillis(2); + scheduler.schedule("Message-1", new ByteSequence(data), "", time, 0, 0); + + assertTrue(latch.await(70, TimeUnit.SECONDS)); + assertEquals(0, latch.getCount()); + + scheduler.schedule("Message-2", payload, "", time, 0, 0); + scheduler.schedule("Message-3", payload, "", time, 0, 0); + + assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getNumJournalFiles() == 1; + } + }, TimeUnit.MINUTES.toMillis(2))); + + LOG.info("Number of journal log files: {}", getNumJournalFiles()); + } }