AMQ-6451 - catch errors on preallocation. allocate direct buffer on start and reuse to ensure resource availability at runtime

This commit is contained in:
gtully 2016-11-08 14:05:24 +00:00
parent dca066287e
commit 3b7613d930
2 changed files with 80 additions and 26 deletions

View File

@ -215,6 +215,8 @@ public class Journal {
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL; protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE; protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
private File osKernelCopyTemplateFile = null; private File osKernelCopyTemplateFile = null;
private ByteBuffer preAllocateDirectBuffer = null;
protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
public interface DataFileRemovedListener { public interface DataFileRemovedListener {
@ -276,13 +278,24 @@ public class Journal {
} }
} }
if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) { if (preallocationScope != PreallocationScope.NONE) {
// create a template file that will be used to pre-allocate the journal files switch (preallocationStrategy) {
if (osKernelCopyTemplateFile == null) { case SPARSE_FILE:
osKernelCopyTemplateFile = createJournalTemplateFile(); break;
case OS_KERNEL_COPY: {
osKernelCopyTemplateFile = createJournalTemplateFile();
}
break;
case CHUNKED_ZEROS: {
preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
}
break;
case ZEROS: {
preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
}
break;
} }
} }
scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() { scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -323,18 +336,29 @@ public class Journal {
LOG.trace("Startup took: "+(end-start)+" ms"); LOG.trace("Startup took: "+(end-start)+" ms");
} }
private ByteBuffer allocateDirectBuffer(int size) {
ByteBuffer buffer = ByteBuffer.allocateDirect(size);
buffer.put(EOF_RECORD);
return buffer;
}
public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) { public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
if (PreallocationScope.NONE != preallocationScope) { if (PreallocationScope.NONE != preallocationScope) {
if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) { try {
doPreallocationKernelCopy(file); if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
} else if (PreallocationStrategy.ZEROS == preallocationStrategy) { doPreallocationKernelCopy(file);
doPreallocationZeros(file); } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
} else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) { doPreallocationZeros(file);
doPreallocationChunkedZeros(file); } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
} else { doPreallocationChunkedZeros(file);
doPreallocationSparseFile(file); } else {
doPreallocationSparseFile(file);
}
} catch (Throwable continueWithNoPrealloc) {
// error on preallocation is non fatal, and we don't want to leak the journal handle
LOG.error("cound not preallocate journal data file", continueWithNoPrealloc);
} }
} }
} }
@ -358,12 +382,10 @@ public class Journal {
} }
private void doPreallocationZeros(RecoverableRandomAccessFile file) { private void doPreallocationZeros(RecoverableRandomAccessFile file) {
ByteBuffer buffer = ByteBuffer.allocate(maxFileLength); preAllocateDirectBuffer.rewind();
buffer.put(EOF_RECORD);
buffer.rewind();
try { try {
FileChannel channel = file.getChannel(); FileChannel channel = file.getChannel();
channel.write(buffer); channel.write(preAllocateDirectBuffer);
channel.force(false); channel.force(false);
channel.position(0); channel.position(0);
} catch (ClosedByInterruptException ignored) { } catch (ClosedByInterruptException ignored) {
@ -401,22 +423,19 @@ public class Journal {
} }
private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) { private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE); preAllocateDirectBuffer.rewind();
buffer.put(EOF_RECORD);
buffer.rewind();
try { try {
FileChannel channel = file.getChannel(); FileChannel channel = file.getChannel();
int remLen = maxFileLength; int remLen = maxFileLength;
while (remLen > 0) { while (remLen > 0) {
if (remLen < buffer.remaining()) { if (remLen < preAllocateDirectBuffer.remaining()) {
buffer.limit(remLen); preAllocateDirectBuffer.limit(remLen);
} }
int writeLen = channel.write(buffer); int writeLen = channel.write(preAllocateDirectBuffer);
remLen -= writeLen; remLen -= writeLen;
buffer.rewind(); preAllocateDirectBuffer.rewind();
} }
channel.force(false); channel.force(false);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.store.kahadb.disk.journal; package org.apache.activemq.store.kahadb.disk.journal;
import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -55,6 +56,40 @@ public class PreallocationJournalTest {
executeTest("zeros"); executeTest("zeros");
} }
@Test
public void testZerosLoop() throws Exception {
Random rand = new Random();
int randInt = rand.nextInt(100);
File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
KahaDBStore store = new KahaDBStore();
store.setJournalMaxFileLength(5*1024*1024);
store.deleteAllMessages();
store.setDirectory(dataDirectory);
store.setPreallocationStrategy("zeros");
store.start();
final File journalLog = new File(dataDirectory, "db-1.log");
assertTrue("file exists", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return journalLog.exists();
}
}));
KahaTraceCommand traceCommand = new KahaTraceCommand();
traceCommand.setMessage(new String(new byte[2*1024*1024]));
Location location = null;
for (int i=0; i<20; i++) {
location = store.store(traceCommand);
}
LOG.info("Last location:" + location);
LOG.info("Store journal files:" + store.getJournal().getFiles().size());
}
private void executeTest(String preallocationStrategy)throws Exception { private void executeTest(String preallocationStrategy)throws Exception {
Random rand = new Random(); Random rand = new Random();
int randInt = rand.nextInt(100); int randInt = rand.nextInt(100);