Ensure that when add / remove commands are colocated they don't prevent
the log from being GC'd once it is unreferenced.
This commit is contained in:
Timothy Bish 2016-02-02 20:28:24 -05:00
parent 5f7a81f928
commit 8c4b5f485d
3 changed files with 57 additions and 9 deletions

View File

@ -505,8 +505,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// now that the job is removed from the index we can store the remove info and
// then dereference the log files that hold the initial add command and the most
// recent update command.
this.store.referenceRemovedLocation(tx, location, removed);
// recent update command. If the remove and the add that created the job are in
// the same file we don't need to track it and just let a normal GC of the logs
// remove it when the log is unreferenced.
if (removed.getLocation().getDataFileId() != location.getDataFileId()) {
this.store.referenceRemovedLocation(tx, location, removed);
}
}
}
@ -589,8 +593,12 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// now that the job is removed from the index we can store the remove info and
// then dereference the log files that hold the initial add command and the most
// recent update command.
this.store.referenceRemovedLocation(tx, location, job);
// recent update command. If the remove and the add that created the job are in
// the same file we don't need to track it and just let a normal GC of the logs
// remove it when the log is unreferenced.
if (job.getLocation().getDataFileId() != location.getDataFileId()) {
this.store.referenceRemovedLocation(tx, location, job);
}
}
}
}

View File

@ -397,22 +397,22 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
List<Integer> orphans = new ArrayList<Integer>();
while (removals.hasNext()) {
boolean orphanedRemve = true;
boolean orphanedRemove = true;
Entry<Integer, List<Integer>> entry = removals.next();
// If this log is not a GC candidate then there's no need to do a check to rule it out
if (gcCandidateSet.contains(entry.getKey())) {
for (Integer addLocation : entry.getValue()) {
if (completeFileSet.contains(addLocation)) {
orphanedRemve = false;
LOG.trace("A remove in log {} has an add still in existance in {}.", entry.getKey(), addLocation);
orphanedRemove = false;
break;
}
}
// If it's not orphaned than we can't remove it, otherwise we
// stop tracking it it's log will get deleted on the next check.
if (!orphanedRemve) {
LOG.trace("A remove in log {} has an add still in existance.", entry.getKey());
if (!orphanedRemove) {
gcCandidateSet.remove(entry.getKey());
} else {
LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());

View File

@ -29,6 +29,7 @@ import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -45,6 +46,10 @@ public class JobSchedulerStoreCheckpointTest {
@Before
public void setUp() throws Exception {
// investigate gc issue - store usage not getting released
org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class).setLevel(Level.TRACE);
File directory = new File("target/test/ScheduledJobsDB");
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
@ -80,7 +85,7 @@ public class JobSchedulerStoreCheckpointTest {
}
@Test
public void test() throws Exception {
public void testStoreCleanupLinear() throws Exception {
final int COUNT = 10;
final CountDownLatch latch = new CountDownLatch(COUNT);
scheduler.addListener(new JobListener() {
@ -122,4 +127,39 @@ public class JobSchedulerStoreCheckpointTest {
LOG.info("Number of journal log files: {}", getNumJournalFiles());
}
@Test
public void testColocatedAddRemoveCleanup() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
scheduler.addListener(new JobListener() {
@Override
public void scheduledJob(String id, ByteSequence job) {
latch.countDown();
}
});
byte[] data = new byte[1024];
for (int i = 0; i < data.length; ++i) {
data[i] = (byte) (i % 256);
}
long time = TimeUnit.SECONDS.toMillis(2);
scheduler.schedule("Message-1", new ByteSequence(data), "", time, 0, 0);
assertTrue(latch.await(70, TimeUnit.SECONDS));
assertEquals(0, latch.getCount());
scheduler.schedule("Message-2", payload, "", time, 0, 0);
scheduler.schedule("Message-3", payload, "", time, 0, 0);
assertTrue("Should be only one log left: " + getNumJournalFiles(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getNumJournalFiles() == 1;
}
}, TimeUnit.MINUTES.toMillis(2)));
LOG.info("Number of journal log files: {}", getNumJournalFiles());
}
}