diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java index d780b46fbf..65a952bb7c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java @@ -215,6 +215,8 @@ public class Journal { protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL; protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; private File osKernelCopyTemplateFile = null; + private ByteBuffer preAllocateDirectBuffer = null; + protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; public interface DataFileRemovedListener { @@ -276,13 +278,24 @@ public class Journal { } } - if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) { - // create a template file that will be used to pre-allocate the journal files - if (osKernelCopyTemplateFile == null) { - osKernelCopyTemplateFile = createJournalTemplateFile(); + if (preallocationScope != PreallocationScope.NONE) { + switch (preallocationStrategy) { + case SPARSE_FILE: + break; + case OS_KERNEL_COPY: { + osKernelCopyTemplateFile = createJournalTemplateFile(); + } + break; + case CHUNKED_ZEROS: { + preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE); + } + break; + case ZEROS: { + preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength()); + } + break; } } - scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -323,18 +336,29 @@ public class Journal { LOG.trace("Startup took: "+(end-start)+" ms"); } + private ByteBuffer allocateDirectBuffer(int size) { + ByteBuffer buffer = ByteBuffer.allocateDirect(size); + buffer.put(EOF_RECORD); + return buffer; + } + public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) { if (PreallocationScope.NONE != preallocationScope) { - if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) { - doPreallocationKernelCopy(file); - } else if (PreallocationStrategy.ZEROS == preallocationStrategy) { - doPreallocationZeros(file); - } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) { - doPreallocationChunkedZeros(file); - } else { - doPreallocationSparseFile(file); + try { + if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) { + doPreallocationKernelCopy(file); + } else if (PreallocationStrategy.ZEROS == preallocationStrategy) { + doPreallocationZeros(file); + } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) { + doPreallocationChunkedZeros(file); + } else { + doPreallocationSparseFile(file); + } + } catch (Throwable continueWithNoPrealloc) { + // error on preallocation is non fatal, and we don't want to leak the journal handle + LOG.error("cound not preallocate journal data file", continueWithNoPrealloc); } } } @@ -358,12 +382,10 @@ public class Journal { } private void doPreallocationZeros(RecoverableRandomAccessFile file) { - ByteBuffer buffer = ByteBuffer.allocate(maxFileLength); - buffer.put(EOF_RECORD); - buffer.rewind(); + preAllocateDirectBuffer.rewind(); try { FileChannel channel = file.getChannel(); - channel.write(buffer); + channel.write(preAllocateDirectBuffer); channel.force(false); channel.position(0); } catch (ClosedByInterruptException ignored) { @@ -401,22 +423,19 @@ public class Journal { } private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) { - - ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE); - buffer.put(EOF_RECORD); - buffer.rewind(); - + preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity()); + preAllocateDirectBuffer.rewind(); try { FileChannel channel = file.getChannel(); int remLen = maxFileLength; while (remLen > 0) { - if (remLen < buffer.remaining()) { - buffer.limit(remLen); + if (remLen < preAllocateDirectBuffer.remaining()) { + preAllocateDirectBuffer.limit(remLen); } - int writeLen = channel.write(buffer); + int writeLen = channel.write(preAllocateDirectBuffer); remLen -= writeLen; - buffer.rewind(); + preAllocateDirectBuffer.rewind(); } channel.force(false); diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java index d4095f3817..1c98743271 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.store.kahadb.disk.journal; import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.store.kahadb.data.KahaTraceCommand; import org.apache.activemq.util.Wait; import org.junit.Test; import org.slf4j.Logger; @@ -55,6 +56,40 @@ public class PreallocationJournalTest { executeTest("zeros"); } + @Test + public void testZerosLoop() throws Exception { + Random rand = new Random(); + int randInt = rand.nextInt(100); + File dataDirectory = new File("./target/activemq-data/kahadb" + randInt); + + KahaDBStore store = new KahaDBStore(); + store.setJournalMaxFileLength(5*1024*1024); + store.deleteAllMessages(); + store.setDirectory(dataDirectory); + store.setPreallocationStrategy("zeros"); + store.start(); + + final File journalLog = new File(dataDirectory, "db-1.log"); + assertTrue("file exists", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return journalLog.exists(); + } + })); + + + KahaTraceCommand traceCommand = new KahaTraceCommand(); + traceCommand.setMessage(new String(new byte[2*1024*1024])); + Location location = null; + for (int i=0; i<20; i++) { + location = store.store(traceCommand); + } + LOG.info("Last location:" + location); + + LOG.info("Store journal files:" + store.getJournal().getFiles().size()); + + } + private void executeTest(String preallocationStrategy)throws Exception { Random rand = new Random(); int randInt = rand.nextInt(100);