https://issues.apache.org/jira/browse/AMQ-3310 - IOException PListStore "Could not locate data file" from FilePendingMessageCursor. Issue with reference counting and async location initialsation resulting in inuse data file removal and data file leaking. Pulled cleanup into task that periodically queries lists for references

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1101085 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-05-09 16:11:50 +00:00
parent d5813be360
commit 0619a8757e
6 changed files with 146 additions and 41 deletions

View File

@ -1453,6 +1453,7 @@ public class BrokerService implements Service {
}
this.tempDataStore = new PListStore();
this.tempDataStore.setDirectory(getTmpDataDirectory());
configureService(tempDataStore);
this.tempDataStore.start();
} catch (Exception e) {
throw new RuntimeException(e);
@ -1467,6 +1468,7 @@ public class BrokerService implements Service {
*/
public void setTempDataStore(PListStore tempDataStore) {
this.tempDataStore = tempDataStore;
configureService(tempDataStore);
try {
tempDataStore.start();
} catch (Exception e) {

View File

@ -175,8 +175,6 @@ public class PList {
EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
entry.setLocation(location);
storeEntry(tx, entry);
this.store.incrementJournalCount(tx, location);
EntryLocation last = loadEntry(tx, this.lastId);
last.setNext(entry.getPage().getPageId());
storeEntry(tx, last);
@ -210,7 +208,6 @@ public class PList {
storeEntry(tx, root);
storeEntry(tx, entry);
this.store.incrementJournalCount(tx, location);
this.size++;
}
@ -433,7 +430,6 @@ public class PList {
storeEntry(tx, prev);
}
this.store.decrementJournalCount(tx, entry.getLocation());
entry.reset();
storeEntry(tx, entry);
tx.free(entry.getPage().getPageId());

View File

@ -22,17 +22,18 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
@ -40,15 +41,16 @@ import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.IntegerMarshaller;
import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
*/
public class PListStore extends ServiceSupport {
public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@ -69,11 +71,18 @@ public class PListStore extends ServiceSupport {
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
Map<String, PList> persistentLists = new HashMap<String, PList>();
final Object indexLock = new Object();
private Scheduler scheduler;
private long cleanupInterval = 30000;
public Object getIndexLock() {
return indexLock;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.scheduler = brokerService.getScheduler();
}
protected class MetaData {
protected MetaData(PListStore store) {
this.store = store;
@ -81,21 +90,16 @@ public class PListStore extends ServiceSupport {
private final PListStore store;
Page<MetaData> page;
BTreeIndex<Integer, Integer> journalRC;
BTreeIndex<String, PList> storedSchedulers;
void createIndexes(Transaction tx) throws IOException {
this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.storedSchedulers.load(tx);
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.load(tx);
}
void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
@ -110,15 +114,10 @@ public class PListStore extends ServiceSupport {
this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
}
public void write(DataOutput os) throws IOException {
os.writeLong(this.storedSchedulers.getPageId());
os.writeLong(this.journalRC.getPageId());
}
}
@ -276,8 +275,15 @@ public class PListStore extends ServiceSupport {
metaData.loadLists(tx, persistentLists);
}
});
this.pageFile.flush();
if (cleanupInterval > 0) {
if (scheduler == null) {
scheduler = new Scheduler(PListStore.class.getSimpleName());
scheduler.start();
}
scheduler.executePeriodically(this, cleanupInterval);
}
LOG.info(this + " initialized");
}
}
@ -290,6 +296,12 @@ public class PListStore extends ServiceSupport {
@Override
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
if (scheduler != null) {
if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
scheduler.stop();
scheduler = null;
}
}
for (PList pl : this.persistentLists.values()) {
pl.unload();
}
@ -308,27 +320,29 @@ public class PListStore extends ServiceSupport {
}
synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
int logId = location.getDataFileId();
Integer val = this.metaData.journalRC.get(tx, logId);
int refCount = val != null ? val.intValue() + 1 : 1;
this.metaData.journalRC.put(tx, logId, refCount);
public void run() {
try {
final Set<Integer> candidates = journal.getFileMap().keySet();
LOG.trace("Full gc candidate set:" + candidates);
for (PList list : persistentLists.values()) {
PListEntry entry = list.getFirst();
while (entry != null) {
claimCandidates(entry, candidates);
entry = list.getNext(entry);
}
LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
}
LOG.debug("GC Candidate set:" + candidates);
this.journal.removeDataFiles(candidates);
} catch (IOException e) {
LOG.error("Exception on periodic cleanup: " + e, e);
}
}
synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
int logId = location.getDataFileId();
if (logId != Location.NOT_SET) {
int refCount = this.metaData.journalRC.get(tx, logId);
refCount--;
if (refCount <= 0) {
this.metaData.journalRC.remove(tx, logId);
Set<Integer> set = new HashSet<Integer>();
set.add(logId);
this.journal.removeDataFiles(set);
} else {
this.metaData.journalRC.put(tx, logId, refCount);
}
private void claimCandidates(PListEntry entry, Set<Integer> candidates) {
EntryLocation location = entry.getEntry();
if (location != null) {
candidates.remove(location.getLocation().getDataFileId());
}
}
@ -404,6 +418,14 @@ public class PListStore extends ServiceSupport {
this.enableIndexWriteAsync = enableIndexWriteAsync;
}
public long getCleanupInterval() {
return cleanupInterval;
}
public void setCleanupInterval(long cleanupInterval) {
this.cleanupInterval = cleanupInterval;
}
@Override
public String toString() {
return "PListStore:" + this.directory;

View File

@ -80,4 +80,8 @@ public final class Scheduler extends ServiceSupport {
}
}
public String getName() {
return name;
}
}

View File

@ -163,6 +163,10 @@ public class TempStorageBlockedBrokerTest {
LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+ broker.getSystemUsage().getTempUsage().getUsage());
// do a cleanup
broker.getTempDataStore().run();
LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+ broker.getSystemUsage().getTempUsage().getUsage());
assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT);
assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(),
@ -187,6 +191,7 @@ public class TempStorageBlockedBrokerTest {
IOHelper.deleteChildren(tmpDir);
PListStore tempStore = new PListStore();
tempStore.setDirectory(tmpDir);
tempStore.setJournalMaxFileLength(50*1024);
tempStore.start();
SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore);

View File

@ -25,6 +25,11 @@ import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.IOHelper;
import org.apache.kahadb.util.ByteSequence;
@ -147,7 +152,78 @@ public class PListTest {
assertTrue(plist.remove(0));
assertFalse(plist.remove(3));
}
@Test
public void testConcurrentAddRemove() throws Exception {
File directory = store.getDirectory();
store.stop();
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
store = new PListStore();
store.setDirectory(directory);
store.setJournalMaxFileLength(1024*5);
store.start();
final ByteSequence payload = new ByteSequence(new byte[1024*4]);
final Vector<Throwable> exceptions = new Vector<Throwable>();
final int iterations = 1000;
final int numLists = 10;
final PList[] lists = new PList[numLists];
for (int i=0; i<numLists; i++) {
lists[i] = store.getPList("List" + i);
}
ExecutorService executor = Executors.newFixedThreadPool(100);
class A implements Runnable {
@Override
public void run() {
try {
for (int i=0; i<iterations; i++) {
PList candidate = lists[i%numLists];
candidate.addLast(String.valueOf(i), payload);
PListEntry entry = candidate.getFirst();
assertTrue(candidate.remove(String.valueOf(i)));
}
} catch (Exception error) {
error.printStackTrace();
exceptions.add(error);
}
}
};
class B implements Runnable {
@Override
public void run() {
try {
for (int i=0; i<iterations; i++) {
PList candidate = lists[i%numLists];
candidate.addLast(String.valueOf(i), payload);
PListEntry entry = candidate.getFirst();
assertTrue(candidate.remove(String.valueOf(i)));
}
} catch (Exception error) {
error.printStackTrace();
exceptions.add(error);
}
}
};
executor.execute(new A());
executor.execute(new A());
executor.execute(new A());
executor.execute(new B());
executor.execute(new B());
executor.execute(new B());
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
assertTrue("no exceptions", exceptions.isEmpty());
}
@Before
public void setUp() throws Exception {