diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 7f398e636f..bbaf88c0c5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -415,6 +415,7 @@ public class MessageDatabase { protected void checkpointCleanup(final boolean cleanup) { try { + long start = System.currentTimeMillis(); synchronized (indexMutex) { if( !opened.get() ) { return; @@ -425,6 +426,10 @@ public class MessageDatabase { } }); } + long end = System.currentTimeMillis(); + if( end-start > 100 ) { + LOG.warn("KahaDB Cleanup took "+(end-start)); + } } catch (IOException e) { e.printStackTrace(); } @@ -457,12 +462,22 @@ public class MessageDatabase { * durring a recovery process. */ public Location store(JournalCommand data, boolean sync) throws IOException { + + int size = data.serializedSizeFramed(); DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); os.writeByte(data.type().getNumber()); data.writeFramed(os); + + long start = System.currentTimeMillis(); Location location = journal.write(os.toByteSequence(), sync); + long start2 = System.currentTimeMillis(); process(data, location); + long end = System.currentTimeMillis(); + if( end-start > 100 ) { + LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms"); + } + metadata.lastUpdate = location; return location; } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java index 90f920efc8..b63cdcd63e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java @@ -145,6 +145,7 @@ public class VerifySteadyEnqueueRate extends TestCase { KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(new File("target/activemq-data/kahadb")); kaha.deleteAllMessages(); + kaha.getPageFile().setWriteBatchSize(10); broker.setPersistenceAdapter(kaha); } diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java index 1ce0f3d468..6782f7c901 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java @@ -104,6 +104,9 @@ public class PageFile { private int recoveryPageCount; private AtomicBoolean loaded = new AtomicBoolean(); + // The number of pages we are aiming to write every time we + // write to disk. + int writeBatchSize = 1000; // We keep a cache of pages recently used? private LRUCache pageCache; @@ -824,7 +827,7 @@ public class PageFile { } private boolean canStartWriteBatch() { - int capacityUsed = ((writes.size() * 100)/1000); + int capacityUsed = ((writes.size() * 100)/writeBatchSize); if( enableAsyncWrites ) { // The constant 10 here controls how soon write batches start going to disk.. // would be nice to figure out how to auto tune that value. Make to small and @@ -1099,4 +1102,12 @@ public class PageFile { return getMainPageFile(); } + public int getWriteBatchSize() { + return writeBatchSize; + } + + public void setWriteBatchSize(int writeBatchSize) { + this.writeBatchSize = writeBatchSize; + } + }