mirror of https://github.com/apache/activemq.git
Add additional preallocation strategy [CHUNKED contributed by user jtahlborn.
This commit is contained in:
parent
be4ad3d1c8
commit
edfc23ee9d
|
@ -62,6 +62,8 @@ public class Journal {
|
||||||
|
|
||||||
private static final int MAX_BATCH_SIZE = 32*1024*1024;
|
private static final int MAX_BATCH_SIZE = 32*1024*1024;
|
||||||
|
|
||||||
|
private static final int PREALLOC_CHUNK_SIZE = 1 << 20;
|
||||||
|
|
||||||
// ITEM_HEAD_SPACE = length + type+ reserved space + SOR
|
// ITEM_HEAD_SPACE = length + type+ reserved space + SOR
|
||||||
public static final int RECORD_HEAD_SPACE = 4 + 1;
|
public static final int RECORD_HEAD_SPACE = 4 + 1;
|
||||||
|
|
||||||
|
@ -69,10 +71,10 @@ public class Journal {
|
||||||
public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
|
public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
|
||||||
// Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
|
// Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
|
||||||
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
|
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
|
||||||
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
|
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
|
||||||
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
|
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
|
||||||
|
|
||||||
// tackle corruption when checksum is disabled or corrupt with zeros, minimise data loss
|
// tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
|
||||||
public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
|
public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
|
||||||
DataFile dataFile = getDataFile(recoveryPosition);
|
DataFile dataFile = getDataFile(recoveryPosition);
|
||||||
// with corruption on recovery we have no faith in the content - slip to the next batch record or eof
|
// with corruption on recovery we have no faith in the content - slip to the next batch record or eof
|
||||||
|
@ -87,7 +89,6 @@ public class Journal {
|
||||||
recoveryPosition.setSize(-1);
|
recoveryPosition.setSize(-1);
|
||||||
|
|
||||||
dataFile.corruptedBlocks.add(sequence);
|
dataFile.corruptedBlocks.add(sequence);
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
} finally {
|
} finally {
|
||||||
accessorPool.closeDataFileAccessor(reader);
|
accessorPool.closeDataFileAccessor(reader);
|
||||||
|
@ -97,7 +98,8 @@ public class Journal {
|
||||||
public enum PreallocationStrategy {
|
public enum PreallocationStrategy {
|
||||||
SPARSE_FILE,
|
SPARSE_FILE,
|
||||||
OS_KERNEL_COPY,
|
OS_KERNEL_COPY,
|
||||||
ZEROS;
|
ZEROS,
|
||||||
|
CHUNKED_ZEROS;
|
||||||
}
|
}
|
||||||
|
|
||||||
public enum PreallocationScope {
|
public enum PreallocationScope {
|
||||||
|
@ -256,6 +258,8 @@ public class Journal {
|
||||||
doPreallocationKernelCopy(file);
|
doPreallocationKernelCopy(file);
|
||||||
} else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
|
} else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
|
||||||
doPreallocationZeros(file);
|
doPreallocationZeros(file);
|
||||||
|
} else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
|
||||||
|
doPreallocationChunkedZeros(file);
|
||||||
} else {
|
} else {
|
||||||
doPreallocationSparseFile(file);
|
doPreallocationSparseFile(file);
|
||||||
}
|
}
|
||||||
|
@ -321,6 +325,31 @@ public class Journal {
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
|
||||||
|
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
|
||||||
|
buffer.position(0);
|
||||||
|
buffer.limit(PREALLOC_CHUNK_SIZE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
FileChannel channel = file.getChannel();
|
||||||
|
|
||||||
|
int remLen = maxFileLength;
|
||||||
|
while (remLen > 0) {
|
||||||
|
if (remLen < buffer.remaining()) {
|
||||||
|
buffer.limit(remLen);
|
||||||
|
}
|
||||||
|
int writeLen = channel.write(buffer);
|
||||||
|
remLen -= writeLen;
|
||||||
|
buffer.rewind();
|
||||||
|
}
|
||||||
|
|
||||||
|
channel.force(false);
|
||||||
|
channel.position(0);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
private static byte[] bytes(String string) {
|
private static byte[] bytes(String string) {
|
||||||
try {
|
try {
|
||||||
return string.getBytes("UTF-8");
|
return string.getBytes("UTF-8");
|
||||||
|
|
Loading…
Reference in New Issue