diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 9a0b6ebb75..806bbeffc9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -151,6 +151,9 @@ public class MessageDatabase { protected boolean enableJournalDiskSyncs=true; long checkpointInterval = 5*1000; long cleanupInterval = 30*1000; + int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + boolean enableIndexWriteAsync = false; + int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; protected AtomicBoolean started = new AtomicBoolean(); protected AtomicBoolean opened = new AtomicBoolean(); @@ -1138,7 +1141,7 @@ public class MessageDatabase { // ///////////////////////////////////////////////////////////////// protected final LinkedHashMap> inflightTransactions = new LinkedHashMap>(); protected final LinkedHashMap> preparedTransactions = new LinkedHashMap>(); - + private ArrayList getInflightTx(KahaTransactionInfo info, Location location) { TransactionId key = key(info); ArrayList tx = inflightTransactions.get(key); @@ -1219,13 +1222,16 @@ public class MessageDatabase { // ///////////////////////////////////////////////////////////////// private PageFile createPageFile() { - return new PageFile(directory, "db"); + PageFile index = new PageFile(directory, "db"); + index.setEnableWriteThread(isEnableIndexWriteAsync()); + index.setWriteBatchSize(getIndexWriteBatchSize()); + return index; } private Journal createJournal() { Journal manager = new Journal(); manager.setDirectory(directory); - manager.setMaxFileLength(1024 * 1024 * 20); + manager.setMaxFileLength(getJournalMaxFileLength()); manager.setUseNio(false); return manager; } @@ -1245,7 +1251,23 @@ public class MessageDatabase { public void setDeleteAllMessages(boolean deleteAllMessages) { this.deleteAllMessages = deleteAllMessages; } + + public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { + this.setIndexWriteBatchSize = setIndexWriteBatchSize; + } + public int getIndexWriteBatchSize() { + return setIndexWriteBatchSize; + } + + public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { + this.enableIndexWriteAsync = enableIndexWriteAsync; + } + + boolean isEnableIndexWriteAsync() { + return enableIndexWriteAsync; + } + public boolean isEnableJournalDiskSyncs() { return enableJournalDiskSyncs; } @@ -1270,6 +1292,14 @@ public class MessageDatabase { this.cleanupInterval = cleanupInterval; } + public void setJournalMaxFileLength(int journalMaxFileLength) { + this.journalMaxFileLength = journalMaxFileLength; + } + + public int getJournalMaxFileLength() { + return journalMaxFileLength; + } + public PageFile getPageFile() { if (pageFile == null) { pageFile = createPageFile(); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java index 339ca98d5f..8241e1e3d9 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java @@ -56,7 +56,7 @@ public class VerifySteadyEnqueueRate extends TestCase { broker.stop(); } - public void testForDataFileNotDeleted() throws Exception { + public void testEnqueueRateCanMeetSLA() throws Exception { if (true) { return; } @@ -68,8 +68,8 @@ public class VerifySteadyEnqueueRate extends TestCase { final AtomicLong total = new AtomicLong(0); final AtomicLong slaViolations = new AtomicLong(0); final AtomicLong max = new AtomicLong(0); - long reportTime = 0; - + final int numThreads = 6; + Runnable runner = new Runnable() { public void run() { @@ -108,7 +108,7 @@ public class VerifySteadyEnqueueRate extends TestCase { } }; ExecutorService executor = Executors.newCachedThreadPool(); - int numThreads = 6; + for (int i = 0; i < numThreads; i++) { executor.execute(runner); } @@ -127,7 +127,7 @@ public class VerifySteadyEnqueueRate extends TestCase { private void startBroker() throws Exception { broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); + //broker.setDeleteAllMessagesOnStartup(true); broker.setPersistent(true); broker.setUseJmx(true); @@ -155,9 +155,13 @@ public class VerifySteadyEnqueueRate extends TestCase { // Index is going to be in consistent, but can it be repaired? kaha.setEnableJournalDiskSyncs(false); // Using a bigger journal file size makes he take fewer spikes as it is not switching files as often. - kaha.getJournal().setMaxFileLength(1024*1024*100); - kaha.getPageFile().setWriteBatchSize(100); - kaha.getPageFile().setEnableWriteThread(true); + kaha.setJournalMaxFileLength(1024*1024*100); + + // small batch means more frequent and smaller writes + kaha.setIndexWriteBatchSize(100); + // do the index write in a separate thread + kaha.setEnableIndexWriteAsync(true); + broker.setPersistenceAdapter(kaha); } diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java index afc3844229..0551a2385f 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java @@ -69,6 +69,7 @@ public class PageFile { // 4k Default page size. public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); + public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000)); private static final int RECOVERY_FILE_HEADER_SIZE=1024*4; private static final int PAGE_FILE_HEADER_SIZE=1024*4; @@ -101,7 +102,7 @@ public class PageFile { private AtomicBoolean loaded = new AtomicBoolean(); // The number of pages we are aiming to write every time we // write to disk. - int writeBatchSize = 1000; + int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE; // We keep a cache of pages recently used? private LRUCache pageCache;