allow configuring the journal preallocation strategy with finer-grained controls,
including zeroing out, OS kernel copying, or sparse files
Christian Posta 2015-02-17 08:02:37 -07:00
parent 8858dc294c
commit 45e59e6e83
6 changed files with 248 additions and 15 deletions
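Taken together, the diff exposes three knobs on the KahaDB store: a preallocation scope (ENTIRE_JOURNAL or BATCH), a preallocation strategy (SPARSE_FILE, OS_KERNEL_COPY, or ZEROS), and a batch size. A minimal usage sketch, assuming a standard embedded BrokerService (only the two preallocation setters come from this commit):

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class PreallocationConfigExample {
    public static void main(String[] args) throws Exception {
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        // the values are trimmed and upper-cased before the enum lookup,
        // so lower-case spellings work as well
        adapter.setPreallocationScope("entire_journal"); // or "batch"
        adapter.setPreallocationStrategy("zeros");       // or "sparse_file", "os_kernel_copy"

        BrokerService broker = new BrokerService();
        broker.setPersistenceAdapter(adapter);
        broker.start();
        broker.waitUntilStopped();
    }
}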

KahaDBPersistenceAdapter.java

@@ -511,6 +511,22 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
letter.setBrokerService(brokerService);
}
public String getPreallocationScope() {
return letter.getPreallocationScope();
}
public void setPreallocationScope(String preallocationScope) {
this.letter.setPreallocationScope(preallocationScope);
}
public String getPreallocationStrategy() {
return letter.getPreallocationStrategy();
}
public void setPreallocationStrategy(String preallocationStrategy) {
this.letter.setPreallocationStrategy(preallocationStrategy);
}
public boolean isArchiveDataLogs() {
return letter.isArchiveDataLogs();
}

MessageDatabase.java

@@ -237,8 +237,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
long cleanupInterval = 30*1000;
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
int preallocationBatchSize = Journal.DEFAULT_PREALLOCATION_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
protected AtomicBoolean opened = new AtomicBoolean();
private boolean ignoreMissingJournalfiles = false;
@@ -2487,6 +2490,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
manager.setArchiveDataLogs(isArchiveDataLogs());
manager.setSizeAccumulator(journalSize);
manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
manager.setPreallocationStrategy(
Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
manager.setPreallocationBatchSize(preallocationBatchSize);
if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive());
manager.setDirectoryArchive(getDirectoryArchive());
@@ -3175,4 +3182,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
interface IndexAware {
public void sequenceAssignedWithIndexLocked(long index);
}
public String getPreallocationScope() {
return preallocationScope;
}
public void setPreallocationScope(String preallocationScope) {
this.preallocationScope = preallocationScope;
}
public String getPreallocationStrategy() {
return preallocationStrategy;
}
public void setPreallocationStrategy(String preallocationStrategy) {
this.preallocationStrategy = preallocationStrategy;
}
public int getPreallocationBatchSize() {
return preallocationBatchSize;
}
public void setPreallocationBatchSize(int preallocationBatchSize) {
this.preallocationBatchSize = preallocationBatchSize;
}
}

DataFile.java

@@ -18,11 +18,16 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataFile
@@ -31,10 +36,13 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
*/
public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(DataFile.class);
protected final File file;
protected final Integer dataFileId;
protected volatile int length;
protected final SequenceSet corruptedBlocks = new SequenceSet();
protected long preallocationBatchWindow = 0L;
DataFile(File file, int number) {
this.file = file;
@@ -60,6 +68,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
public synchronized void incrementLength(int size) {
length += size;
preallocationBatchWindow -= size;
}
@Override
@@ -105,4 +114,39 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
public int hashCode() {
return dataFileId;
}
public void preallocateJournalBatch(Journal journal, long newMessageSize) {
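// top up the preallocated window when the incoming write would exhaust it,
// capping the batch so it never runs past the journal's maximum file length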
if (preallocationBatchWindow - newMessageSize <= 0) {
int preallocationBatchSize = Math.min(journal.getPreallocationBatchSize(),
journal.maxFileLength - length);
doPreallocation(preallocationBatchSize);
preallocationBatchWindow = preallocationBatchSize;
}
}
private void doPreallocation(int size) {
try {
RecoverableRandomAccessFile file = openRandomAccessFile();
FileChannel channel = file.getChannel();
channel.position(length); // start the preallocated region at the current end of the file
ByteBuffer buffer = generateAllocation(size);
channel.write(buffer);
channel.force(false);
file.close();
} catch (IOException e) {
LOG.debug("Cannot allocate batch for journal, continue without preallocation of batch...");
}
}
private ByteBuffer generateAllocation(int size) {
// ByteBuffer.allocate() zero-fills the buffer, so it can be written out
// directly without an explicit fill loop
return ByteBuffer.allocate(size);
}
}
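To make the batch window bookkeeping above concrete, a small standalone sketch with made-up write sizes (the numbers are hypothetical; the two checks mirror preallocateJournalBatch() and incrementLength()):

public class BatchWindowDemo {
    public static void main(String[] args) {
        long window = 0;                    // preallocationBatchWindow
        final int batch = 1024 * 1024;      // journal.getPreallocationBatchSize()
        int[] writeSizes = {300_000, 300_000, 300_000, 300_000};
        for (int size : writeSizes) {
            if (window - size <= 0) {       // same test as preallocateJournalBatch()
                System.out.println("preallocating " + batch + " bytes ahead of the write");
                window = batch;
            }
            window -= size;                 // same bookkeeping as incrementLength()
        }
    }
}

With a 1 MiB batch, the first and fourth 300 KB writes trigger preallocation; the two in between fit inside the remaining window.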

DataFileAppender.java

@@ -211,6 +211,11 @@ class DataFileAppender implements FileAppender {
file = journal.rotateWriteFile();
}
// will do batch preallocation on the journal if configured
if (journal.preallocationScope == Journal.PreallocationScope.BATCH) {
file.preallocateJournalBatch(journal, write.location.getSize());
}
nextWriteBatch = newWriteBatch(write, file);
enqueueMutex.notifyAll();
break;
@@ -314,8 +319,7 @@
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
// preallocate the whole journal file on first open
file.seek(journal.maxFileLength-1);
file.write(end);
journal.preallocateEntireJournalDataFile(file);
}
Journal.WriteCommand write = wb.writes.getHead();

Journal.java

@@ -16,10 +16,9 @@
*/
package org.apache.activemq.store.kahadb.disk.journal;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -27,12 +26,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
@@ -58,6 +54,17 @@ public class Journal {
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
public enum PreallocationStrategy {
SPARSE_FILE,
OS_KERNEL_COPY,
ZEROS;
}
public enum PreallocationScope {
BATCH,
ENTIRE_JOURNAL;
}
private static byte[] createBatchControlRecordHeader() {
try {
DataByteArrayOutputStream os = new DataByteArrayOutputStream();
@@ -80,6 +87,7 @@
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
public static final int PREFERED_DIFF = 1024 * 512;
public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
public static final int DEFAULT_PREALLOCATION_BATCH_SIZE = 1024 * 1024 * 1;
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
@@ -113,6 +121,10 @@
protected boolean enableAsyncDiskSync = true;
private Timer timer;
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
protected int preallocationBatchSize = DEFAULT_PREALLOCATION_BATCH_SIZE;
public interface DataFileRemovedListener {
void fileRemoved(DataFile datafile);
}
@@ -181,6 +193,7 @@
totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
}
cleanupTask = new Runnable() {
public void run() {
cleanup();
@@ -193,6 +206,85 @@
LOG.trace("Startup took: "+(end-start)+" ms");
}
public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {
if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
doPreallocationKernelCopy(file);
} else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
doPreallocationZeros(file);
} else {
doPreallocationSparseFile(file);
}
} else {
LOG.info("Using journal preallocation scope of batch allocation");
}
}
private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
LOG.info("Preallocate journal file with sparse file");
try {
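// seeking past the end of the file and writing a single byte extends it
// to maxFileLength without allocating data blocks, on file systems that
// support sparse files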
file.seek(maxFileLength - 1);
file.write((byte)0x00);
} catch (IOException e) {
LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e);
}
}
private void doPreallocationZeros(RecoverableRandomAccessFile file) {
LOG.info("Preallocate journal file with zeros");
ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
for (int i = 0; i < maxFileLength; i++) {
buffer.put((byte) 0x00);
}
buffer.flip();
try {
FileChannel channel = file.getChannel();
channel.write(buffer);
channel.force(false);
channel.position(0);
} catch (IOException e) {
LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
}
}
private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
LOG.info("Preallocating journal file with kernel file copying");
// create a template file that will be used to pre-allocate the journal files
File templateFile = createJournalTemplateFile();
RandomAccessFile templateRaf = null;
try {
templateRaf = new RandomAccessFile(templateFile, "rw");
templateRaf.setLength(maxFileLength);
templateRaf.getChannel().force(true);
templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
} catch (FileNotFoundException e) {
LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e);
} catch (IOException e) {
LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e);
} finally {
// close and remove the template file even if the transfer fails
if (templateRaf != null) {
try {
templateRaf.close();
} catch (IOException ignored) {
}
}
templateFile.delete();
}
}
private File createJournalTemplateFile() {
String fileName = "db-log.template";
File rc = new File(directory, fileName);
if (rc.exists()) {
System.out.println("deleting file because it already exists...");
rc.delete();
}
return rc;
}
private static byte[] bytes(String string) {
try {
return string.getBytes("UTF-8");
@@ -570,6 +662,30 @@
}
}
public PreallocationStrategy getPreallocationStrategy() {
return preallocationStrategy;
}
public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
this.preallocationStrategy = preallocationStrategy;
}
public PreallocationScope getPreallocationScope() {
return preallocationScope;
}
public void setPreallocationScope(PreallocationScope preallocationScope) {
this.preallocationScope = preallocationScope;
}
public int getPreallocationBatchSize() {
return preallocationBatchSize;
}
public void setPreallocationBatchSize(int preallocationBatchSize) {
this.preallocationBatchSize = preallocationBatchSize;
}
public File getDirectory() {
return directory;
}
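For completeness, a hypothetical sketch of driving the Journal directly with batch preallocation (directory, sizes, and payload are illustrative; the preallocation accessors are the ones added above):

import java.io.File;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.ByteSequence;

public class JournalPreallocationExample {
    public static void main(String[] args) throws Exception {
        Journal journal = new Journal();
        journal.setDirectory(new File("target/journal-prealloc"));
        journal.setMaxFileLength(32 * 1024 * 1024);
        journal.setPreallocationScope(Journal.PreallocationScope.BATCH);
        journal.setPreallocationStrategy(Journal.PreallocationStrategy.ZEROS);
        journal.setPreallocationBatchSize(1024 * 1024);
        journal.start();
        try {
            // with BATCH scope, the appender preallocates ahead of this write
            // whenever the current window is exhausted
            journal.write(new ByteSequence("payload".getBytes("UTF-8")), true);
        } finally {
            journal.close();
        }
    }
}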

DiskBenchmark.java

@@ -16,6 +16,8 @@
*/
package org.apache.activemq.store.kahadb.disk.util;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -193,7 +195,7 @@
}
}
public Report benchmark(File file) throws IOException {
public Report benchmark(File file) throws Exception {
Report rc = new Report();
// Initialize the block we will be writing to disk.
@@ -203,8 +205,9 @@
}
rc.size = data.length;
RandomAccessFile raf = new RandomAccessFile(file, "rw");
raf.setLength(size);
RecoverableRandomAccessFile raf = new RecoverableRandomAccessFile(file, "rw");
preallocateDataFile(raf, file.getParentFile());
// Figure out how many writes we can do in the sample interval.
long start = System.currentTimeMillis();
@@ -235,7 +238,7 @@
rc.writes = ioCount;
rc.writeDuration = (now - start);
raf = new RandomAccessFile(file, "rw");
raf = new RecoverableRandomAccessFile(file, "rw");
start = System.currentTimeMillis();
now = System.currentTimeMillis();
ioCount = 0;
@@ -259,7 +262,7 @@
rc.syncWrites = ioCount;
rc.syncWriteDuration = (now - start);
raf = new RandomAccessFile(file, "rw");
raf = new RecoverableRandomAccessFile(file, "rw");
start = System.currentTimeMillis();
now = System.currentTimeMillis();
ioCount = 0;
@@ -285,6 +288,25 @@
return rc;
}
private void preallocateDataFile(RecoverableRandomAccessFile raf, File location) throws Exception {
File tmpFile;
if (location != null && location.isDirectory()) {
tmpFile = new File(location, "template.dat");
} else {
tmpFile = new File("template.dat");
}
if (tmpFile.exists()) {
tmpFile.delete();
}
if (verbose) {
System.out.println("Using a template file: " + tmpFile.getAbsolutePath());
}
RandomAccessFile templateFile = new RandomAccessFile(tmpFile, "rw");
try {
templateFile.setLength(size);
templateFile.getChannel().force(true);
templateFile.getChannel().transferTo(0, size, raf.getChannel());
} finally {
// close and delete the template file even if the transfer fails
templateFile.close();
tmpFile.delete();
}
}
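A quick sketch of exercising the benchmark programmatically (the file name is arbitrary; benchmark(File) is the method shown above, and Report is assumed to render a readable summary via toString()):

import java.io.File;
import org.apache.activemq.store.kahadb.disk.util.DiskBenchmark;

public class BenchmarkExample {
    public static void main(String[] args) throws Exception {
        DiskBenchmark bench = new DiskBenchmark();
        // benchmark(File) now preallocates the data file before timing writes
        DiskBenchmark.Report report = bench.benchmark(new File("disk-benchmark.dat"));
        System.out.println(report);
    }
}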
public boolean isVerbose() {
return verbose;
}