diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java index ecbe1dcb8f..1d67ec7c5e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java @@ -151,6 +151,9 @@ public class PList { synchronized public void unload() { if (loaded.compareAndSet(true, false)) { + this.rootId = EntryLocation.NOT_SET; + this.lastId = EntryLocation.NOT_SET; + this.size=0; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java index aa7db136bf..6a63c51173 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,8 +49,8 @@ public class PListStore extends ServiceSupport { static final Log LOG = LogFactory.getLog(PListStore.class); private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; - public static final int CLOSED_STATE = 1; - public static final int OPEN_STATE = 2; + static final int CLOSED_STATE = 1; + static final int OPEN_STATE = 2; private File directory; PageFile pageFile; @@ -61,6 +60,7 @@ public class PListStore extends ServiceSupport { private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; private boolean enableIndexWriteAsync = false; + private boolean initialized = false; // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; MetaData metaData = new MetaData(this); final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); @@ -69,8 +69,8 @@ public class PListStore extends ServiceSupport { protected class MetaData { protected MetaData(PListStore store) { this.store = store; - LinkedList list = new LinkedList(); } + private final PListStore store; Page page; BTreeIndex journalRC; @@ -174,9 +174,9 @@ public class PListStore extends ServiceSupport { public void setDirectory(File directory) { this.directory = directory; } - + public long size() { - if ( !isStarted() ) { + if (!initialized) { return 0; } try { @@ -187,6 +187,10 @@ public class PListStore extends ServiceSupport { } public PList getPList(final String name) throws Exception { + if (!isStarted()) { + throw new IllegalStateException("Not started"); + } + intialize(); PList result = this.persistentLists.get(name); if (result == null) { final PList pl = new PList(this); @@ -207,7 +211,7 @@ public class PListStore extends ServiceSupport { load.load(tx); } }); - + return result; } @@ -226,45 +230,54 @@ public class PListStore extends ServiceSupport { return result; } + protected synchronized void intialize() throws Exception { + if (isStarted()) { + if (this.initialized == false) { + this.initialized = true; + if (this.directory == null) { + this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); + } + IOHelper.mkdirs(this.directory); + lock(); + this.journal = new Journal(); + this.journal.setDirectory(directory); + this.journal.setMaxFileLength(getJournalMaxFileLength()); + this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); + this.journal.start(); + this.pageFile = new PageFile(directory, "scheduleDB"); + this.pageFile.load(); + + this.pageFile.tx().execute(new Transaction.Closure() { + public void execute(Transaction tx) throws IOException { + if (pageFile.getPageCount() == 0) { + Page page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metaData); + metaData.page = page; + metaData.createIndexes(tx); + tx.store(metaData.page, metaDataMarshaller, true); + + } else { + Page page = tx.load(0, metaDataMarshaller); + metaData = page.get(); + metaData.page = page; + } + metaData.load(tx); + metaData.loadLists(tx, persistentLists); + } + }); + + this.pageFile.flush(); + LOG.info(this + " initialized"); + } + } + } + @Override protected synchronized void doStart() throws Exception { - if (this.directory == null) { - this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); - } - IOHelper.mkdirs(this.directory); - lock(); - this.journal = new Journal(); - this.journal.setDirectory(directory); - this.journal.setMaxFileLength(getJournalMaxFileLength()); - this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); - this.journal.start(); - this.pageFile = new PageFile(directory, "scheduleDB"); - this.pageFile.load(); - - this.pageFile.tx().execute(new Transaction.Closure() { - public void execute(Transaction tx) throws IOException { - if (pageFile.getPageCount() == 0) { - Page page = tx.allocate(); - assert page.getPageId() == 0; - page.set(metaData); - metaData.page = page; - metaData.createIndexes(tx); - tx.store(metaData.page, metaDataMarshaller, true); - - } else { - Page page = tx.load(0, metaDataMarshaller); - metaData = page.get(); - metaData.page = page; - } - metaData.load(tx); - metaData.loadLists(tx, persistentLists); - } - }); - - this.pageFile.flush(); LOG.info(this + " started"); } - + @Override protected synchronized void doStop(ServiceStopper stopper) throws Exception { for (PList pl : this.persistentLists.values()) { @@ -280,6 +293,7 @@ public class PListStore extends ServiceSupport { this.lockFile.unlock(); } this.lockFile = null; + this.initialized = false; LOG.info(this + " stopped"); } @@ -381,7 +395,7 @@ public class PListStore extends ServiceSupport { @Override public String toString() { - return "JobSchedulerStore:" + this.directory; + return "PListStore:" + this.directory; } }