https://issues.apache.org/jira/browse/AMQ-5603 - add preallocationScope=full_journal_async, which preallocates the next journal file ahead of its use to avoid latency jitter on journal rotation. Also added a none option to disable preallocation

gtully 2016-04-29 16:57:03 +01:00
parent 3c342ffce4
commit 62bdbb0db5
14 changed files with 344 additions and 195 deletions
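
For orientation, here is a minimal sketch of how a store might opt into the new scope. It mirrors the setters exercised by the test changes below; the directory and journal length are illustrative, not part of this commit.

import java.io.File;

import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.store.kahadb.disk.journal.Journal;

public class PreallocationScopeSketch {
    public static void main(String[] args) throws Exception {
        KahaDBStore store = new KahaDBStore();
        store.setDirectory(new File("target/activemq-data/prealloc-demo")); // illustrative location
        store.setJournalMaxFileLength(32 * 1024 * 1024);                    // illustrative size
        // New default: the next journal file is preallocated by a background task,
        // so rotation does not stall appenders while the file is being prepared.
        store.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
        // Alternatives: ENTIRE_JOURNAL (synchronous, previous behaviour) or NONE (disabled).
        store.setPreallocationStrategy(Journal.PreallocationStrategy.ZEROS.name());
        store.start();
        store.stop();
    }
}

ENTIRE_JOURNAL_ASYNC becomes the default in this commit; ENTIRE_JOURNAL keeps the earlier synchronous behaviour and NONE switches preallocation off entirely.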


@ -257,7 +257,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name();
private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
protected AtomicBoolean opened = new AtomicBoolean();
@ -1860,33 +1860,38 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@Override
public void run() {
int journalToAdvance = -1;
Set<Integer> journalLogsReferenced = new HashSet<Integer>();
// Lock index to capture the ackMessageFileMap data
indexLock.writeLock().lock();
// Map keys might not be sorted, find the earliest log file to forward acks
// from and move only those, future cycles can chip away at more as needed.
// We won't move files that are themselves rewritten on a previous compaction.
List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
Collections.sort(journalFileIds);
int journalToAdvance = -1;
for (Integer journalFileId : journalFileIds) {
DataFile current = journal.getDataFileById(journalFileId);
if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
journalToAdvance = journalFileId;
break;
try {
// Map keys might not be sorted, find the earliest log file to forward acks
// from and move only those, future cycles can chip away at more as needed.
// We won't move files that are themselves rewritten on a previous compaction.
List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
Collections.sort(journalFileIds);
for (Integer journalFileId : journalFileIds) {
DataFile current = journal.getDataFileById(journalFileId);
if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
journalToAdvance = journalFileId;
break;
}
}
// Check if we found one, or if we only found the current file being written to.
if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
return;
}
journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
} finally {
indexLock.writeLock().unlock();
}
// Check if we found one, or if we only found the current file being written to.
if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
return;
}
Set<Integer> journalLogsReferenced =
new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
indexLock.writeLock().unlock();
try {
// Background rewrite of the old acks
forwardAllAcks(journalToAdvance, journalLogsReferenced);
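
The block above narrows how long the index write lock is held: the candidate journal id and its referenced log files are copied out inside a try/finally, and the slower forwardAllAcks I/O runs only after the lock is released. A rough sketch of that pattern with simplified, hypothetical types (not the actual MessageDatabase fields):

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class AckForwardingSketch {
    private final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    private final Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>();

    void forwardOldestAcks(int currentDataFileId) {
        int journalToAdvance = -1;
        Set<Integer> journalLogsReferenced = new HashSet<>();
        indexLock.writeLock().lock();
        try {
            // Pick the earliest journal file and snapshot its references under the lock.
            List<Integer> ids = new ArrayList<>(ackMessageFileMap.keySet());
            Collections.sort(ids);
            if (ids.isEmpty() || ids.get(0) == currentDataFileId) {
                return; // nothing to do, or only the file currently being written
            }
            journalToAdvance = ids.get(0);
            journalLogsReferenced.addAll(ackMessageFileMap.get(journalToAdvance));
        } finally {
            indexLock.writeLock().unlock();
        }
        // Slow disk work happens with the index lock already released.
        forwardAllAcks(journalToAdvance, journalLogsReferenced);
    }

    void forwardAllAcks(int fileId, Set<Integer> referenced) { /* hypothetical stand-in */ }
}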


@ -36,6 +36,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
protected volatile int length;
protected int typeCode = STANDARD_LOG_FILE;
protected final SequenceSet corruptedBlocks = new SequenceSet();
protected RecoverableRandomAccessFile appendRandomAccessFile;
DataFile(File file, int number) {
this.file = file;
@ -76,12 +77,22 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
return file.getName() + " number = " + dataFileId + " , length = " + length;
}
public synchronized RecoverableRandomAccessFile appendRandomAccessFile() throws IOException {
if (appendRandomAccessFile == null) {
appendRandomAccessFile = new RecoverableRandomAccessFile(file.getCanonicalPath(), "rw");
}
return appendRandomAccessFile;
}
public synchronized RecoverableRandomAccessFile openRandomAccessFile() throws IOException {
return new RecoverableRandomAccessFile(file.getCanonicalPath(), "rw");
}
public synchronized void closeRandomAccessFile(RecoverableRandomAccessFile file) throws IOException {
file.close();
if (file == appendRandomAccessFile) {
appendRandomAccessFile = null;
}
}
public synchronized boolean delete() throws IOException {


@ -31,6 +31,9 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.store.kahadb.disk.journal.Journal.EMPTY_BATCH_CONTROL_RECORD;
import static org.apache.activemq.store.kahadb.disk.journal.Journal.RECORD_HEAD_SPACE;
/**
* An optimized writer to do batch appends to a data file. This object is thread
* safe and gains throughput as you increase the number of concurrent writes it
@ -110,7 +113,7 @@ class DataFileAppender implements FileAppender {
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
int size = data.getLength() + RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
@ -138,7 +141,7 @@ class DataFileAppender implements FileAppender {
@Override
public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
int size = data.getLength() + RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
@ -179,12 +182,7 @@ class DataFileAppender implements FileAppender {
while ( true ) {
if (nextWriteBatch == null) {
DataFile file = journal.getOrCreateCurrentWriteFile();
if( file.getLength() + write.location.getSize() >= journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
DataFile file = journal.getCurrentDataFile(write.location.getSize());
nextWriteBatch = newWriteBatch(write, file);
enqueueMutex.notifyAll();
break;
@ -285,23 +283,14 @@ class DataFileAppender implements FileAppender {
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
// pre allocate on first open of new file (length==0)
// note dataFile.length cannot be used because it is updated in enqueue
if (file.length() == 0l) {
journal.preallocateEntireJournalDataFile(file);
}
file = dataFile.appendRandomAccessFile();
}
Journal.WriteCommand write = wb.writes.getHead();
// Write an empty batch control record.
buff.reset();
buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
buff.writeInt(0);
buff.writeLong(0);
buff.write(EMPTY_BATCH_CONTROL_RECORD);
boolean forceToDisk = false;
while (write != null) {
@ -312,19 +301,18 @@ class DataFileAppender implements FileAppender {
write = write.getNext();
}
// append 'unset' next batch (5 bytes) so read can always find eof
buff.writeInt(0);
buff.writeByte(0);
// append 'unset', zero length next batch so read can always find eof
buff.write(Journal.EOF_RECORD);
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
buff.skip(RECORD_HEAD_SPACE + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
if( journal.isChecksum() ) {
Checksum checksum = new Adler32();
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE-Journal.EOF_RECORD.length);
buff.writeLong(checksum.getValue());
}
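
The constant swap above is size-neutral: RECORD_HEAD_SPACE (a 4-byte length plus a 1-byte type) and the new EOF_RECORD are both 5 bytes wide, so the back-filled batch length is the same value the old literal-5 code produced. A small sanity check of the arithmetic, assuming the constant definitions shown in Journal below:

public class BatchRecordSizeCheck {
    public static void main(String[] args) {
        int recordHeadSpace = 4 + 1;                         // size int + type byte
        int magicLength = "WRITE BATCH".length();            // BATCH_CONTROL_RECORD_MAGIC, 11 bytes
        int batchControlRecordSize = recordHeadSpace + magicLength + 4 + 8; // + length + checksum = 28
        int eofRecordLength = 4 + 1;                         // EOF_INT + EOF_EOT, same width as the old '5'

        // Offset of the length field inside the batch control record (the skip() above).
        System.out.println("skip = " + (recordHeadSpace + magicLength));   // 16
        // Payload length written into the control record and fed to the checksum.
        System.out.println("payload = sequence.getLength() - " + batchControlRecordSize
                + " - " + eofRecordLength);
    }
}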


@ -23,19 +23,16 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
@ -43,13 +40,13 @@ import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,6 +70,12 @@ public class Journal {
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
public static final byte EOF_EOT = '4';
public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
private ScheduledExecutorService scheduler;
// tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
@ -103,7 +106,9 @@ public class Journal {
}
public enum PreallocationScope {
ENTIRE_JOURNAL;
ENTIRE_JOURNAL,
ENTIRE_JOURNAL_ASYNC,
NONE;
}
private static byte[] createBatchControlRecordHeader() {
@ -119,13 +124,39 @@ public class Journal {
}
}
private static byte[] createEmptyBatchControlRecordHeader() {
try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
os.writeInt(BATCH_CONTROL_RECORD_SIZE);
os.writeByte(BATCH_CONTROL_RECORD_TYPE);
os.write(BATCH_CONTROL_RECORD_MAGIC);
os.writeInt(0);
os.writeLong(0l);
ByteSequence sequence = os.toByteSequence();
sequence.compact();
return sequence.getData();
} catch (IOException e) {
throw new RuntimeException("Could not create empty batch control record header.", e);
}
}
private static byte[] createEofBatchAndLocationRecord() {
try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
os.writeInt(EOF_INT);
os.writeByte(EOF_EOT);
ByteSequence sequence = os.toByteSequence();
sequence.compact();
return sequence.getData();
} catch (IOException e) {
throw new RuntimeException("Could not create eof header.", e);
}
}
public static final String DEFAULT_DIRECTORY = ".";
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
public static final String DEFAULT_FILE_PREFIX = "db-";
public static final String DEFAULT_FILE_SUFFIX = ".log";
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
public static final int PREFERED_DIFF = 1024 * 512;
public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
@ -151,18 +182,21 @@ public class Journal {
protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
protected Runnable cleanupTask;
protected ScheduledFuture cleanupTask;
protected AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs;
private ReplicationTarget replicationTarget;
protected boolean checksum;
protected boolean checkForCorruptionOnStartup;
protected boolean enableAsyncDiskSync = true;
private Timer timer;
private int nextDataFileId = 1;
private Object dataFileIdLock = new Object();
private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
private volatile DataFile nextDataFile;
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL_ASYNC;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
private File osKernelCopyTemplateFile = null;
public interface DataFileRemovedListener {
void fileRemoved(DataFile datafile);
@ -204,13 +238,15 @@ public class Journal {
// Sort the list so that we can link the DataFiles together in the
// right order.
List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
Collections.sort(l);
for (DataFile df : l) {
if (df.getLength() == 0) {
// possibly the result of a previous failed write
LOG.info("ignoring zero length, partially initialised journal data file: " + df);
continue;
} else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
continue;
}
dataFiles.addLast(df);
fileByFileMap.put(df.getFile(), df);
@ -221,9 +257,31 @@ public class Journal {
}
}
nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
// create a template file that will be used to pre-allocate the journal files
if (osKernelCopyTemplateFile == null) {
osKernelCopyTemplateFile = createJournalTemplateFile();
}
}
getOrCreateCurrentWriteFile();
scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread schedulerThread = new Thread(r);
schedulerThread.setName("ActiveMQ Journal Scheduled executor");
schedulerThread.setDaemon(true);
return schedulerThread;
}
});
// init current write file
if (dataFiles.isEmpty()) {
nextDataFileId = 1;
rotateWriteFile();
} else {
currentDataFile.set(dataFiles.getTail());
nextDataFileId = currentDataFile.get().dataFileId + 1;
}
if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
@ -239,23 +297,20 @@ public class Journal {
totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
}
cleanupTask = new Runnable() {
cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanup();
}
};
}, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);
this.timer = new Timer("KahaDB Scheduler", true);
TimerTask task = new SchedulerTimerTask(cleanupTask);
this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
long end = System.currentTimeMillis();
LOG.trace("Startup took: "+(end-start)+" ms");
}
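
The dedicated Timer is gone: the journal now owns a single-threaded, daemon ScheduledExecutorService that runs the periodic cleanup and, further down, the asynchronous preallocation task. A minimal sketch of that scheduling pattern, independent of the Journal class itself:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class JournalSchedulerSketch {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "ActiveMQ Journal Scheduled executor");
                t.setDaemon(true); // daemon thread, never blocks JVM shutdown
                return t;
            }
        });
        ScheduledFuture<?> cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("cleanup()"); // stand-in for the journal cleanup work
            }
        }, 30000, 30000, TimeUnit.MILLISECONDS); // DEFAULT_CLEANUP_INTERVAL = 30s
        // On close(): cancel the task and shut the scheduler down.
        cleanupTask.cancel(true);
        scheduler.shutdown();
    }
}
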
public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {
if (PreallocationScope.NONE != preallocationScope) {
if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
doPreallocationKernelCopy(file);
@ -266,58 +321,68 @@ public class Journal {
} else {
doPreallocationSparseFile(file);
}
} else {
LOG.info("Using journal preallocation scope of batch allocation");
}
}
private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
try {
file.seek(maxFileLength - 1);
file.write((byte)0x00);
FileChannel channel = file.getChannel();
channel.position(0);
channel.write(journalEof);
channel.position(maxFileLength - 5);
journalEof.rewind();
channel.write(journalEof);
channel.force(false);
channel.position(0);
} catch (ClosedByInterruptException ignored) {
LOG.trace("Could not preallocate journal file with sparse file", ignored);
} catch (IOException e) {
LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e);
LOG.error("Could not preallocate journal file with sparse file", e);
}
}
private void doPreallocationZeros(RecoverableRandomAccessFile file) {
ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
buffer.put(EOF_RECORD);
buffer.rewind();
try {
FileChannel channel = file.getChannel();
channel.write(buffer);
channel.force(false);
channel.position(0);
} catch (ClosedByInterruptException ignored) {
LOG.trace("Could not preallocate journal file with zeros", ignored);
} catch (IOException e) {
LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
LOG.error("Could not preallocate journal file with zeros", e);
}
}
private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
// create a template file that will be used to pre-allocate the journal files
File templateFile = createJournalTemplateFile();
RandomAccessFile templateRaf = null;
try {
templateRaf = new RandomAccessFile(templateFile, "rw");
templateRaf.setLength(maxFileLength);
templateRaf.getChannel().force(true);
RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw");
templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
templateRaf.close();
templateFile.delete();
} catch (ClosedByInterruptException ignored) {
LOG.trace("Could not preallocate journal file with kernel copy", ignored);
} catch (FileNotFoundException e) {
LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e);
LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
} catch (IOException e) {
LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e);
LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
}
}
private File createJournalTemplateFile() {
String fileName = "db-log.template";
File rc = new File(directory, fileName);
if (rc.exists()) {
LOG.trace("deleting journal template file because it already exists...");
rc.delete();
try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw");) {
templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
templateRaf.setLength(maxFileLength);
templateRaf.getChannel().force(true);
} catch (FileNotFoundException e) {
LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
} catch (IOException e) {
LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
}
return rc;
}
@ -325,6 +390,8 @@ public class Journal {
private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
buffer.put(EOF_RECORD);
buffer.rewind();
try {
FileChannel channel = file.getChannel();
@ -354,6 +421,24 @@ public class Journal {
}
}
public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
int firstBatchRecordSize = -1;
if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
Location location = new Location();
location.setDataFileId(dataFile.getDataFileId());
location.setOffset(0);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
} catch (Exception ignored) {
} finally {
accessorPool.closeDataFileAccessor(reader);
}
}
return firstBatchRecordSize == 0;
}
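
isUnusedPreallocated above lets startup skip the tail file that the async task created but nothing has written to yet: its first "batch" parses to size 0 because the file begins with the 5-byte EOF record. A rough equivalent check under that assumption, reading the file directly rather than through a DataFileAccessor:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

import org.apache.activemq.store.kahadb.disk.journal.Journal;

public class UnusedPreallocationCheck {
    // True when the candidate journal file starts with the EOF marker, i.e. it was
    // preallocated ahead of time but never received a write batch.
    static boolean looksUnusedPreallocated(File candidate) throws IOException {
        byte[] head = new byte[Journal.EOF_RECORD.length];
        try (RandomAccessFile raf = new RandomAccessFile(candidate, "r")) {
            if (raf.length() < head.length) {
                return false;
            }
            raf.readFully(head);
        }
        return Arrays.equals(Journal.EOF_RECORD, head);
    }
}
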
protected Location recoveryCheck(DataFile dataFile) throws IOException {
Location location = new Location();
location.setDataFileId(dataFile.getDataFileId());
@ -364,6 +449,10 @@ public class Journal {
while (true) {
int size = checkBatchRecord(reader, location.getOffset());
if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
if (size == 0) {
// eof batch record
break;
}
location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
} else {
@ -433,6 +522,12 @@ public class Journal {
reader.readFully(offset, controlRecord);
// check for journal eof
if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
// eof batch
return 0;
}
// Assert that it's a batch record.
for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
@ -476,42 +571,67 @@ public class Journal {
return totalLength.get();
}
synchronized DataFile getOrCreateCurrentWriteFile() throws IOException {
if (dataFiles.isEmpty()) {
rotateWriteFile();
private void rotateWriteFile() throws IOException {
synchronized (dataFileIdLock) {
DataFile dataFile = nextDataFile;
if (dataFile == null) {
dataFile = newDataFile();
}
synchronized (currentDataFile) {
fileMap.put(dataFile.getDataFileId(), dataFile);
fileByFileMap.put(dataFile.getFile(), dataFile);
dataFiles.addLast(dataFile);
currentDataFile.set(dataFile);
}
nextDataFile = null;
}
DataFile current = dataFiles.getTail();
if (current != null) {
return current;
} else {
return rotateWriteFile();
if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
}
}
synchronized DataFile rotateWriteFile() {
private Runnable preAllocateNextDataFileTask = new Runnable() {
@Override
public void run() {
if (nextDataFile == null) {
synchronized (dataFileIdLock){
try {
nextDataFile = newDataFile();
} catch (IOException e) {
LOG.warn("Failed to proactively allocate data file", e);
}
}
}
}
};
private volatile Future preAllocateNextDataFileFuture;
private DataFile newDataFile() throws IOException {
int nextNum = nextDataFileId++;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum);
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
dataFiles.addLast(nextWriteFile);
preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
return nextWriteFile;
}
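
rotateWriteFile, the task above and newDataFile together make up the asynchronous scope: rotation consumes a nextDataFile that the background task already preallocated (falling back to allocating one synchronously), publishes it as currentDataFile, and immediately schedules preallocation of the file after that. A condensed sketch of the hand-off with simplified types, not the real Journal state:

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class RotationSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Object dataFileIdLock = new Object();
    private volatile File nextDataFile;      // preallocated ahead of need, may be null
    private volatile File currentDataFile;
    private int nextDataFileId = 1;

    void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            File dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();    // first start-up, or the task has not run yet
            }
            currentDataFile = dataFile;
            nextDataFile = null;
        }
        // Prepare the file after this one off the appender's critical path.
        scheduler.submit(new Runnable() {
            @Override
            public void run() {
                synchronized (dataFileIdLock) {
                    if (nextDataFile == null) {
                        try {
                            nextDataFile = newDataFile();
                        } catch (IOException e) {
                            // fall back to synchronous allocation at the next rotation
                        }
                    }
                }
            }
        });
    }

    private File newDataFile() throws IOException {
        File file = new File("db-" + (nextDataFileId++) + ".log"); // hypothetical naming
        preallocate(file);                   // strategy-specific: zeros, sparse file, or kernel copy
        return file;
    }

    private void preallocate(File file) throws IOException { /* write EOF marker / fill to max length */ }
}
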
public synchronized DataFile reserveDataFile() {
int nextNum = nextDataFileId++;
File file = getFile(nextNum);
DataFile reservedDataFile = new DataFile(file, nextNum);
fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
fileByFileMap.put(file, reservedDataFile);
if (dataFiles.isEmpty()) {
dataFiles.addLast(reservedDataFile);
} else {
dataFiles.getTail().linkBefore(reservedDataFile);
public DataFile reserveDataFile() {
synchronized (dataFileIdLock) {
int nextNum = nextDataFileId++;
File file = getFile(nextNum);
DataFile reservedDataFile = new DataFile(file, nextNum);
synchronized (currentDataFile) {
fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
fileByFileMap.put(file, reservedDataFile);
if (dataFiles.isEmpty()) {
dataFiles.addLast(reservedDataFile);
} else {
dataFiles.getTail().linkBefore(reservedDataFile);
}
}
return reservedDataFile;
}
return reservedDataFile;
}
public File getFile(int nextNum) {
@ -520,9 +640,12 @@ public class Journal {
return file;
}
synchronized DataFile getDataFile(Location item) throws IOException {
DataFile getDataFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
DataFile dataFile = fileMap.get(key);
DataFile dataFile = null;
synchronized (currentDataFile) {
dataFile = fileMap.get(key);
}
if (dataFile == null) {
LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
@ -530,29 +653,21 @@ public class Journal {
return dataFile;
}
synchronized File getFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
DataFile dataFile = fileMap.get(key);
if (dataFile == null) {
LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
}
return dataFile.getFile();
}
public void close() throws IOException {
synchronized (this) {
if (!started) {
return;
}
if (this.timer != null) {
this.timer.cancel();
cleanupTask.cancel(true);
if (preAllocateNextDataFileFuture != null) {
preAllocateNextDataFileFuture.cancel(true);
}
ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
accessorPool.close();
}
// the appender can be calling back to to the journal blocking a close AMQ-5620
appender.close();
synchronized (this) {
synchronized (currentDataFile) {
fileMap.clear();
fileByFileMap.clear();
dataFiles.clear();
@ -579,37 +694,52 @@ public class Journal {
result &= dataFile.delete();
}
totalLength.set(0);
fileMap.clear();
fileByFileMap.clear();
lastAppendLocation.set(null);
dataFiles = new LinkedNodeList<DataFile>();
if (preAllocateNextDataFileFuture != null) {
preAllocateNextDataFileFuture.cancel(true);
}
synchronized (dataFileIdLock) {
if (nextDataFile != null) {
nextDataFile.delete();
nextDataFile = null;
}
}
totalLength.set(0);
synchronized (currentDataFile) {
fileMap.clear();
fileByFileMap.clear();
lastAppendLocation.set(null);
dataFiles = new LinkedNodeList<DataFile>();
}
// reopen open file handles...
accessorPool = new DataFileAccessorPool(this);
appender = new DataFileAppender(this);
return result;
}
public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
public void removeDataFiles(Set<Integer> files) throws IOException {
for (Integer key : files) {
// Can't remove the data file (or subsequent files) that is currently being written to.
if (key >= lastAppendLocation.get().getDataFileId()) {
continue;
}
DataFile dataFile = fileMap.get(key);
DataFile dataFile = null;
synchronized (currentDataFile) {
dataFile = fileMap.remove(key);
if (dataFile != null) {
fileByFileMap.remove(dataFile.getFile());
dataFile.unlink();
}
}
if (dataFile != null) {
forceRemoveDataFile(dataFile);
}
}
}
private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
private void forceRemoveDataFile(DataFile dataFile) throws IOException {
accessorPool.disposeDataFileAccessors(dataFile);
fileByFileMap.remove(dataFile.getFile());
fileMap.remove(dataFile.getDataFileId());
totalLength.addAndGet(-dataFile.getLength());
dataFile.unlink();
if (archiveDataLogs) {
File directoryArchive = getDirectoryArchive();
if (directoryArchive.exists()) {
@ -657,13 +787,15 @@ public class Journal {
return directory.toString();
}
public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
public Location getNextLocation(Location location) throws IOException, IllegalStateException {
Location cur = null;
while (true) {
if (cur == null) {
if (location == null) {
DataFile head = dataFiles.getHead();
DataFile head = null;
synchronized (currentDataFile) {
head = dataFiles.getHead();
}
if (head == null) {
return null;
}
@ -687,7 +819,9 @@ public class Journal {
// Did it go into the next file??
if (dataFile.getLength() <= cur.getOffset()) {
dataFile = dataFile.getNext();
synchronized (currentDataFile) {
dataFile = dataFile.getNext();
}
if (dataFile == null) {
return null;
} else {
@ -708,9 +842,14 @@ public class Journal {
if (corruptedRange != null) {
// skip corruption
cur.setSize((int) corruptedRange.range());
} else if (cur.getType() == 0) {
} else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT ||
(cur.getType() == 0 && cur.getSize() == 0)) {
// eof - jump to next datafile
cur.setOffset(maxFileLength);
// EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
// replay of existing journals
// possibly journal is larger than maxFileLength after config change
cur.setSize(EOF_RECORD.length);
cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
} else if (cur.getType() == USER_RECORD_TYPE) {
// Only return user records.
return cur;
@ -718,7 +857,7 @@ public class Journal {
}
}
public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
public ByteSequence read(Location location) throws IOException, IllegalStateException {
DataFile dataFile = getDataFile(location);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
ByteSequence rc = null;
@ -816,34 +955,24 @@ public class Journal {
this.archiveDataLogs = archiveDataLogs;
}
public synchronized DataFile getDataFileById(int dataFileId) {
if (dataFiles.isEmpty()) {
return null;
}
return fileMap.get(Integer.valueOf(dataFileId));
}
public synchronized DataFile getCurrentDataFile() {
if (dataFiles.isEmpty()) {
return null;
}
DataFile current = dataFiles.getTail();
if (current != null) {
return current;
} else {
return null;
public DataFile getDataFileById(int dataFileId) {
synchronized (currentDataFile) {
return fileMap.get(Integer.valueOf(dataFileId));
}
}
public synchronized Integer getCurrentDataFileId() {
DataFile current = getCurrentDataFile();
if (current != null) {
return current.getDataFileId();
} else {
return null;
public DataFile getCurrentDataFile(int capacity) throws IOException {
synchronized (currentDataFile) {
if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
rotateWriteFile();
}
return currentDataFile.get();
}
}
public Integer getCurrentDataFileId() {
synchronized (currentDataFile) {
return currentDataFile.get().getDataFileId();
}
}
@ -853,11 +982,15 @@ public class Journal {
* @return files currently being used
*/
public Set<File> getFiles() {
return fileByFileMap.keySet();
synchronized (currentDataFile) {
return fileByFileMap.keySet();
}
}
public synchronized Map<Integer, DataFile> getFileMap() {
return new TreeMap<Integer, DataFile>(fileMap);
public Map<Integer, DataFile> getFileMap() {
synchronized (currentDataFile) {
return new TreeMap<Integer, DataFile>(fileMap);
}
}
public long getDiskSize() {


@ -227,20 +227,18 @@ public class TargetedDataFileAppender implements FileAppender {
}
// append 'unset' next batch (5 bytes) so read can always find eof
buff.writeInt(0);
buff.writeByte(0);
buff.write(Journal.EOF_RECORD);
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
if (journal.isChecksum()) {
Checksum checksum = new Adler32();
checksum.update(sequence.getData(),
sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE,
sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - Journal.EOF_RECORD.length);
buff.writeLong(checksum.getValue());
}


@ -128,8 +128,8 @@ public class JournalCorruptionEofIndexRecoveryTest {
adapter.setCheckForCorruptJournalFiles(true);
adapter.setIgnoreMissingJournalfiles(ignoreMissingJournalFiles);
adapter.setPreallocationStrategy("zeros");
adapter.setPreallocationScope("entire_journal");
adapter.setPreallocationStrategy(Journal.PreallocationStrategy.ZEROS.name());
adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
}
@After
@ -259,6 +259,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
corruptOrderIndex(id, size);
randomAccessFile.getChannel().force(true);
dataFile.closeRandomAccessFile(randomAccessFile);
}
private void corruptBatchEndEof(int id) throws Exception{


@ -114,6 +114,8 @@ public class JournalCorruptionIndexRecoveryTest {
adapter.setCheckForCorruptJournalFiles(true);
adapter.setIgnoreMissingJournalfiles(true);
adapter.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
}
@After


@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;


@ -39,10 +39,13 @@ public class PreallocationJournalLatencyTest {
TimeStatisticImpl sparse = executeTest(Journal.PreallocationStrategy.SPARSE_FILE.name());
TimeStatisticImpl chunked_zeros = executeTest(Journal.PreallocationStrategy.CHUNKED_ZEROS.name());
TimeStatisticImpl zeros = executeTest(Journal.PreallocationStrategy.ZEROS.name());
//TimeStatisticImpl zeros = executeTest(Journal.PreallocationStrategy.ZEROS.name());
TimeStatisticImpl kernel = executeTest(Journal.PreallocationStrategy.OS_KERNEL_COPY.name());
LOG.info(" sparse: " + sparse);
LOG.info(" chunked: " + chunked_zeros);
LOG.info(" zeros: " + zeros);
//LOG.info(" zeros: " + zeros);
LOG.info(" kernel: " + kernel);
}
@ -50,11 +53,13 @@ public class PreallocationJournalLatencyTest {
int randInt = rand.nextInt(100);
File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
KahaDBStore store = new KahaDBStore();
store.setJournalMaxFileLength(16*1204*1024);
final KahaDBStore store = new KahaDBStore();
store.setCheckpointInterval(5000);
store.setJournalMaxFileLength(32*1204*1024);
store.deleteAllMessages();
store.setDirectory(dataDirectory);
store.setPreallocationStrategy(preallocationStrategy);
store.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL_ASYNC.name());
store.start();
final File journalLog = new File(dataDirectory, "db-1.log");
@ -66,7 +71,7 @@ public class PreallocationJournalLatencyTest {
}));
final Journal journal = store.getJournal();
ByteSequence byteSequence = new ByteSequence(new byte[8*1024]);
ByteSequence byteSequence = new ByteSequence(new byte[16*1024]);
TimeStatisticImpl timeStatistic = new TimeStatisticImpl("append", "duration");
for (int i=0;i<5000; i++) {


@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.ByteSequence;


@ -41,6 +41,7 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -224,6 +225,7 @@ public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport {
properties.put("maxFileLength", maxFileLengthVal);
properties.put("cleanupInterval", "2000");
properties.put("checkpointInterval", "2000");
properties.put("preallocationScope", Journal.PreallocationScope.ENTIRE_JOURNAL.name());
// there are problems with duplicate dispatch in the cursor, which maintain
// a map of messages. A dup dispatch can be dropped.
// see: org.apache.activemq.broker.region.cursors.OrderedPendingList


@ -31,6 +31,7 @@ import org.junit.Test;
import javax.jms.*;
import java.io.File;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
@ -101,6 +102,7 @@ public class AMQ3120Test {
private int getFileCount(File dir){
if (dir.isDirectory()) {
String[] children = dir.list();
LOG.info("Children: " + Arrays.asList(children));
return children.length;
}
@ -112,7 +114,7 @@ public class AMQ3120Test {
final int messageCount = 500;
startBroker(true);
int fileCount = getFileCount(kahaDbDir);
assertEquals(4, fileCount);
assertEquals(5, fileCount);
Connection connection = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();


@ -116,7 +116,7 @@ public class AMQ4323Test {
final int messageCount = 500;
startBroker(true);
int fileCount = getFileCount(kahaDbDir);
assertEquals(4, fileCount);
assertEquals(5, fileCount);
Connection connection = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
@ -149,7 +149,7 @@ public class AMQ4323Test {
public boolean isSatisified() throws Exception {
int fileCount = getFileCount(kahaDbDir);
LOG.info("current filecount:" + fileCount);
return 4 == fileCount;
return 5 == fileCount;
}
}));


@ -135,7 +135,7 @@ public class KahaDBIndexLocationTest {
// Should contain the initial log for the journal and the lock.
assertNotNull(journal);
assertEquals(2, journal.length);
assertEquals(3, journal.length);
}
@Test