diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 9a0c80270b..5711741b25 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -140,7 +140,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isSyncWrites() && message.isResponseRequired()); + store(command, isEnableJournalDiskSyncs() && message.isResponseRequired()); } @@ -149,7 +149,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { command.setDestination(dest); command.setMessageId(ack.getLastMessageId().toString()); command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) ); - store(command, isSyncWrites() && ack.isResponseRequired()); + store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired()); } public void removeAllMessages(ConnectionContext context) throws IOException { @@ -282,14 +282,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { command.setRetroactive(retroactive); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isSyncWrites() && true); + store(command, isEnableJournalDiskSyncs() && true); } public void deleteSubscription(String clientId, String subscriptionName) throws IOException { KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); - store(command, isSyncWrites() && true); + store(command, isEnableJournalDiskSyncs() && true); } public SubscriptionInfo[] getAllSubscriptions() throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index bbaf88c0c5..0e4a2c6d9e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -146,9 +146,9 @@ public class MessageDatabase { protected boolean deleteAllMessages; protected File directory; protected Thread checkpointThread; - protected boolean syncWrites=true; - int checkpointInterval = 5*1000; - int cleanupInterval = 30*1000; + protected boolean enableJournalDiskSyncs=true; + long checkpointInterval = 5*1000; + long cleanupInterval = 30*1000; protected AtomicBoolean started = new AtomicBoolean(); protected AtomicBoolean opened = new AtomicBoolean(); @@ -1182,9 +1182,7 @@ public class MessageDatabase { // ///////////////////////////////////////////////////////////////// private PageFile createPageFile() { - PageFile pf = new PageFile(directory, "db"); - pf.setEnableAsyncWrites(!isSyncWrites()); - return pf; + return new PageFile(directory, "db"); } private Journal createJournal() { @@ -1211,27 +1209,27 @@ public class MessageDatabase { this.deleteAllMessages = deleteAllMessages; } - public boolean isSyncWrites() { - return syncWrites; + public boolean isEnableJournalDiskSyncs() { + return enableJournalDiskSyncs; } - public void setSyncWrites(boolean syncWrites) { - this.syncWrites = syncWrites; + public void setEnableJournalDiskSyncs(boolean syncWrites) { + this.enableJournalDiskSyncs = syncWrites; } - public int getCheckpointInterval() { + public long getCheckpointInterval() { return checkpointInterval; } - public void setCheckpointInterval(int checkpointInterval) { + public void setCheckpointInterval(long checkpointInterval) { this.checkpointInterval = checkpointInterval; } - public int getCleanupInterval() { + public long getCleanupInterval() { return cleanupInterval; } - public void setCleanupInterval(int cleanupInterval) { + public void setCleanupInterval(long cleanupInterval) { this.cleanupInterval = cleanupInterval; } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java index b63cdcd63e..95ae2598f7 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java @@ -65,6 +65,8 @@ public class VerifySteadyEnqueueRate extends TestCase { private void doTestEnqueue(final boolean transacted) throws Exception { final long min = 100; + final AtomicLong total = new AtomicLong(0); + final AtomicLong slaViolations = new AtomicLong(0); final AtomicLong max = new AtomicLong(0); long reportTime = 0; @@ -81,16 +83,20 @@ public class VerifySteadyEnqueueRate extends TestCase { long endT = System.currentTimeMillis(); long duration = endT - startT; + total.incrementAndGet(); + if (duration > max.get()) { max.set(duration); } if (duration > min) { - System.err.println(Thread.currentThread().getName() + slaViolations.incrementAndGet(); + System.err.println("SLA violation @ "+Thread.currentThread().getName() + " " + DateFormat.getTimeInstance().format( new Date(startT)) + " at message " - + i + " send time=" + duration); + + i + " send time=" + duration + + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)"); } } @@ -145,7 +151,13 @@ public class VerifySteadyEnqueueRate extends TestCase { KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(new File("target/activemq-data/kahadb")); kaha.deleteAllMessages(); - kaha.getPageFile().setWriteBatchSize(10); + kaha.setCleanupInterval(1000 * 60 * 60 * 60); + // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified + // what happens if the index is updated but a journal update is lost. + // Index is going to be in consistent, but can it be repaired? + kaha.setEnableJournalDiskSyncs(false); + kaha.getPageFile().setWriteBatchSize(100); + kaha.getPageFile().setEnableWriteThread(true); broker.setPersistenceAdapter(kaha); } diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java index 6782f7c901..d6b349a49a 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java @@ -26,12 +26,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InterruptedIOException; import java.io.RandomAccessFile; -import java.nio.channels.FileLock; -import java.nio.channels.OverlappingFileLockException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -39,7 +36,6 @@ import java.util.Properties; import java.util.TreeMap; import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; @@ -48,7 +44,6 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kahadb.util.DataByteArrayOutputStream; -import org.apache.kahadb.util.IOExceptionSupport; import org.apache.kahadb.util.IOHelper; import org.apache.kahadb.util.IntrospectionSupport; import org.apache.kahadb.util.LRUCache; @@ -119,9 +114,9 @@ public class PageFile { // page write failures.. private boolean enableRecoveryFile=true; // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint() - private boolean enableSyncedWrites=true; + private boolean enableDiskSyncs=true; // Will writes be done in an async thread? - private boolean enableAsyncWrites=false; + private boolean enabledWriteThread=false; // These are used if enableAsyncWrites==true private AtomicBoolean stopWriter = new AtomicBoolean(); @@ -427,7 +422,7 @@ public class PageFile { */ public void flush() throws IOException { - if( enableAsyncWrites && stopWriter.get() ) { + if( enabledWriteThread && stopWriter.get() ) { throw new IOException("Page file already stopped: checkpointing is not allowed"); } @@ -437,7 +432,7 @@ public class PageFile { if( writes.isEmpty()) { return; } - if( enableAsyncWrites ) { + if( enabledWriteThread ) { if( this.checkpointLatch == null ) { this.checkpointLatch = new CountDownLatch(1); } @@ -591,17 +586,17 @@ public class PageFile { /** * @return Are page writes synced to disk? */ - public boolean isEnableSyncedWrites() { - return enableSyncedWrites; + public boolean isEnableDiskSyncs() { + return enableDiskSyncs; } /** * Allows you enable syncing writes to disk. * @param syncWrites */ - public void setEnableSyncedWrites(boolean syncWrites) { + public void setEnableDiskSyncs(boolean syncWrites) { assertNotLoaded(); - this.enableSyncedWrites = syncWrites; + this.enableDiskSyncs = syncWrites; } /** @@ -662,13 +657,13 @@ public class PageFile { this.pageCacheSize = pageCacheSize; } - public boolean isEnableAsyncWrites() { - return enableAsyncWrites; + public boolean isEnabledWriteThread() { + return enabledWriteThread; } - public void setEnableAsyncWrites(boolean enableAsyncWrites) { + public void setEnableWriteThread(boolean enableAsyncWrites) { assertNotLoaded(); - this.enableAsyncWrites = enableAsyncWrites; + this.enabledWriteThread = enableAsyncWrites; } public long getDiskSize() throws IOException { @@ -700,7 +695,16 @@ public class PageFile { this.recoveryFileMaxPageCount = recoveryFileMaxPageCount; } - /////////////////////////////////////////////////////////////////// + public int getWriteBatchSize() { + return writeBatchSize; + } + + public void setWriteBatchSize(int writeBatchSize) { + assertNotLoaded(); + this.writeBatchSize = writeBatchSize; + } + + /////////////////////////////////////////////////////////////////// // Package Protected Methods exposed to Transaction /////////////////////////////////////////////////////////////////// @@ -817,7 +821,7 @@ public class PageFile { // Once we start approaching capacity, notify the writer to start writing if( canStartWriteBatch() ) { - if( enableAsyncWrites ) { + if( enabledWriteThread ) { writes.notify(); } else { writeBatch(); @@ -828,7 +832,7 @@ public class PageFile { private boolean canStartWriteBatch() { int capacityUsed = ((writes.size() * 100)/writeBatchSize); - if( enableAsyncWrites ) { + if( enabledWriteThread ) { // The constant 10 here controls how soon write batches start going to disk.. // would be nice to figure out how to auto tune that value. Make to small and // we reduce through put because we are locking the write mutex too often doing writes @@ -963,7 +967,7 @@ public class PageFile { recoveryFile.write(w.diskBound, 0, pageSize); } - if (enableSyncedWrites) { + if (enableDiskSyncs) { // Sync to make sure recovery buffer writes land on disk.. recoveryFile.getFD().sync(); } @@ -978,7 +982,7 @@ public class PageFile { } // Sync again - if( enableSyncedWrites ) { + if( enableDiskSyncs ) { writeFile.getFD().sync(); } @@ -1077,7 +1081,7 @@ public class PageFile { private void startWriter() { synchronized( writes ) { - if( enableAsyncWrites ) { + if( enabledWriteThread ) { stopWriter.set(false); writerThread = new Thread("KahaDB Page Writer") { @Override @@ -1092,7 +1096,7 @@ public class PageFile { } private void stopWriter() throws InterruptedException { - if( enableAsyncWrites ) { + if( enabledWriteThread ) { stopWriter.set(true); writerThread.join(); } @@ -1102,12 +1106,4 @@ public class PageFile { return getMainPageFile(); } - public int getWriteBatchSize() { - return writeBatchSize; - } - - public void setWriteBatchSize(int writeBatchSize) { - this.writeBatchSize = writeBatchSize; - } - }