From 45e59e6e839ae89ffc099d32a4180ee307543aae Mon Sep 17 00:00:00 2001 From: Christian Posta Date: Tue, 17 Feb 2015 08:02:37 -0700 Subject: [PATCH] adding options for https://issues.apache.org/jira/browse/AMQ-5578 to allow configuring the allocation strategy at finer grained controls including zeroing out, OS copying, or sparse file --- .../kahadb/KahaDBPersistenceAdapter.java | 16 +++ .../store/kahadb/MessageDatabase.java | 31 ++++ .../store/kahadb/disk/journal/DataFile.java | 44 ++++++ .../kahadb/disk/journal/DataFileAppender.java | 8 +- .../store/kahadb/disk/journal/Journal.java | 132 ++++++++++++++++-- .../store/kahadb/disk/util/DiskBenchmark.java | 32 ++++- 6 files changed, 248 insertions(+), 15 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 9b83a0e808..ebe12f38e3 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -511,6 +511,22 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements letter.setBrokerService(brokerService); } + public String getPreallocationScope() { + return letter.getPreallocationScope(); + } + + public void setPreallocationScope(String preallocationScope) { + this.letter.setPreallocationScope(preallocationScope); + } + + public String getPreallocationStrategy() { + return letter.getPreallocationStrategy(); + } + + public void setPreallocationStrategy(String preallocationStrategy) { + this.letter.setPreallocationStrategy(preallocationStrategy); + } + public boolean isArchiveDataLogs() { return letter.isArchiveDataLogs(); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 477f42c711..9fc29f4aab 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -237,8 +237,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe long cleanupInterval = 30*1000; int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; + int preallocationBatchSize = Journal.DEFAULT_PREALLOCATION_BATCH_SIZE; boolean enableIndexWriteAsync = false; int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; + private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name(); + private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name(); protected AtomicBoolean opened = new AtomicBoolean(); private boolean ignoreMissingJournalfiles = false; @@ -2487,6 +2490,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe manager.setArchiveDataLogs(isArchiveDataLogs()); manager.setSizeAccumulator(journalSize); manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); + manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); + manager.setPreallocationStrategy( + Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); + manager.setPreallocationBatchSize(preallocationBatchSize); if (getDirectoryArchive() != null) { IOHelper.mkdirs(getDirectoryArchive()); manager.setDirectoryArchive(getDirectoryArchive()); @@ -3175,4 +3182,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe interface IndexAware { public void sequenceAssignedWithIndexLocked(long index); } + + public String getPreallocationScope() { + return preallocationScope; + } + + public void setPreallocationScope(String preallocationScope) { + this.preallocationScope = preallocationScope; + } + + public String getPreallocationStrategy() { + return preallocationStrategy; + } + + public void setPreallocationStrategy(String preallocationStrategy) { + this.preallocationStrategy = preallocationStrategy; + } + + public int getPreallocationBatchSize() { + return preallocationBatchSize; + } + + public void setPreallocationBatchSize(int preallocationBatchSize) { + this.preallocationBatchSize = preallocationBatchSize; + } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java index 1c5ee3a89d..ac358665ca 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java @@ -18,11 +18,16 @@ package org.apache.activemq.store.kahadb.disk.journal; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import org.apache.activemq.store.kahadb.disk.util.LinkedNode; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.RecoverableRandomAccessFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * DataFile @@ -31,10 +36,13 @@ import org.apache.activemq.util.RecoverableRandomAccessFile; */ public class DataFile extends LinkedNode implements Comparable { + private static final Logger LOG = LoggerFactory.getLogger(DataFile.class); + protected final File file; protected final Integer dataFileId; protected volatile int length; protected final SequenceSet corruptedBlocks = new SequenceSet(); + protected long preallocationBatchWindow = 0L; DataFile(File file, int number) { this.file = file; @@ -60,6 +68,7 @@ public class DataFile extends LinkedNode implements Comparable implements Comparable