mirror of https://github.com/apache/activemq.git
add experimental appender that takes the buffering burden from the writer thread, it and some trace enabled via -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true. Additional accessors on KahaDb to further configure index for the fast but may need recovery case
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1222471 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cb1b92bf58
commit
bb4a2f73f4
|
@ -537,6 +537,30 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
|
||||
}
|
||||
|
||||
public void setEnableIndexDiskSyncs(boolean diskSyncs) {
|
||||
letter.setEnableIndexDiskSyncs(diskSyncs);
|
||||
}
|
||||
|
||||
public boolean isEnableIndexDiskSyncs() {
|
||||
return letter.isEnableIndexDiskSyncs();
|
||||
}
|
||||
|
||||
public void setEnableIndexRecoveryFile(boolean enable) {
|
||||
letter.setEnableIndexRecoveryFile(enable);
|
||||
}
|
||||
|
||||
public boolean isEnableIndexRecoveryFile() {
|
||||
return letter.isEnableIndexRecoveryFile();
|
||||
}
|
||||
|
||||
public void setEnableIndexPageCaching(boolean enable) {
|
||||
letter.setEnableIndexPageCaching(enable);
|
||||
}
|
||||
|
||||
public boolean isEnableIndexPageCaching() {
|
||||
return isEnableIndexPageCaching();
|
||||
}
|
||||
|
||||
public KahaDBStore getStore() {
|
||||
return letter;
|
||||
}
|
||||
|
|
|
@ -184,6 +184,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
private boolean archiveCorruptedIndex = false;
|
||||
private boolean useIndexLFRUEviction = false;
|
||||
private float indexLFUEvictionFactor = 0.2f;
|
||||
private boolean enableIndexDiskSyncs = true;
|
||||
private boolean enableIndexRecoveryFile = true;
|
||||
private boolean enableIndexPageCaching = true;
|
||||
|
||||
public MessageDatabase() {
|
||||
}
|
||||
|
@ -2058,6 +2061,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
index.setPageCacheSize(indexCacheSize);
|
||||
index.setUseLFRUEviction(isUseIndexLFRUEviction());
|
||||
index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
|
||||
index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
|
||||
index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
|
||||
index.setEnablePageCaching(isEnableIndexPageCaching());
|
||||
return index;
|
||||
}
|
||||
|
||||
|
@ -2297,6 +2303,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
this.useIndexLFRUEviction = useIndexLFRUEviction;
|
||||
}
|
||||
|
||||
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
|
||||
this.enableIndexDiskSyncs = enableIndexDiskSyncs;
|
||||
}
|
||||
|
||||
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
|
||||
this.enableIndexRecoveryFile = enableIndexRecoveryFile;
|
||||
}
|
||||
|
||||
public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
|
||||
this.enableIndexPageCaching = enableIndexPageCaching;
|
||||
}
|
||||
|
||||
public boolean isEnableIndexDiskSyncs() {
|
||||
return enableIndexDiskSyncs;
|
||||
}
|
||||
|
||||
public boolean isEnableIndexRecoveryFile() {
|
||||
return enableIndexRecoveryFile;
|
||||
}
|
||||
|
||||
public boolean isEnableIndexPageCaching() {
|
||||
return enableIndexPageCaching;
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////////////////////////////
|
||||
// Internal conversion methods.
|
||||
// /////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.command.ActiveMQQueue;
|
|||
import org.apache.activemq.command.ConnectionControl;
|
||||
import org.junit.After;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -51,7 +52,7 @@ public class KahaDBFastEnqueueTest {
|
|||
private boolean useBytesMessage= true;
|
||||
private final int parallelProducer = 2;
|
||||
private Vector<Exception> exceptions = new Vector<Exception>();
|
||||
final long toSend = 500000;
|
||||
final long toSend = 1000;//500000;
|
||||
|
||||
@Ignore("not ready yet, exploring getting broker disk bound")
|
||||
public void testPublishNoConsumer() throws Exception {
|
||||
|
@ -91,8 +92,8 @@ public class KahaDBFastEnqueueTest {
|
|||
System.out.println("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%");
|
||||
//System.out.println("Index writes %: " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten / (double)totalSent * 100 + "%");
|
||||
|
||||
//restartBroker(0);
|
||||
//consumeMessages(toSend);
|
||||
restartBroker(0);
|
||||
consumeMessages(toSend);
|
||||
}
|
||||
|
||||
private void consumeMessages(long count) throws Exception {
|
||||
|
@ -158,11 +159,13 @@ public class KahaDBFastEnqueueTest {
|
|||
kahaDBPersistenceAdapter.setCheckpointInterval(20 * 60 * 1000);
|
||||
|
||||
// optimise for disk best batch rate
|
||||
//kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(128*1024); //4mb default
|
||||
kahaDBPersistenceAdapter.setJournalMaxFileLength(1024*1024*1024); // 32mb default
|
||||
kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24*1024*1024); //4mb default
|
||||
kahaDBPersistenceAdapter.setJournalMaxFileLength(128*1024*1024); // 32mb default
|
||||
// keep index in memory
|
||||
kahaDBPersistenceAdapter.setIndexCacheSize(500000);
|
||||
kahaDBPersistenceAdapter.setIndexWriteBatchSize(500000);
|
||||
kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
|
||||
kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
|
||||
|
||||
broker.setUseJmx(false);
|
||||
broker.addConnector("tcp://0.0.0.0:0");
|
||||
|
|
|
@ -0,0 +1,393 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kahadb.journal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.zip.Adler32;
|
||||
import java.util.zip.Checksum;
|
||||
import org.apache.kahadb.util.ByteSequence;
|
||||
import org.apache.kahadb.util.DataByteArrayOutputStream;
|
||||
import org.apache.kahadb.util.LinkedNodeList;
|
||||
|
||||
/**
|
||||
* An optimized writer to do batch appends to a data file. This object is thread
|
||||
* safe and gains throughput as you increase the number of concurrent writes it
|
||||
* does.
|
||||
* The thread calling enqueue does the file open and buffering of the data, which
|
||||
* reduces the round trip of the write thread.
|
||||
*
|
||||
*/
|
||||
class CallerBufferingDataFileAppender implements FileAppender {
|
||||
|
||||
protected final Journal journal;
|
||||
protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
|
||||
protected final Object enqueueMutex = new Object() {
|
||||
};
|
||||
protected WriteBatch nextWriteBatch;
|
||||
|
||||
protected boolean shutdown;
|
||||
protected IOException firstAsyncException;
|
||||
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
|
||||
protected int maxWriteBatchSize;
|
||||
|
||||
private boolean running;
|
||||
private Thread thread;
|
||||
|
||||
final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] {
|
||||
new DataByteArrayOutputStream(maxWriteBatchSize),
|
||||
new DataByteArrayOutputStream(maxWriteBatchSize)
|
||||
};
|
||||
AtomicInteger writeBatchInstanceCount = new AtomicInteger();
|
||||
public class WriteBatch {
|
||||
|
||||
DataByteArrayOutputStream buff = cachedBuffers[writeBatchInstanceCount.getAndIncrement()%2];
|
||||
public final DataFile dataFile;
|
||||
|
||||
public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
|
||||
public final CountDownLatch latch = new CountDownLatch(1);
|
||||
private final int offset;
|
||||
public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
|
||||
public AtomicReference<IOException> exception = new AtomicReference<IOException>();
|
||||
public boolean forceToDisk;
|
||||
|
||||
public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
|
||||
this.dataFile = dataFile;
|
||||
this.offset = offset;
|
||||
this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
|
||||
this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
|
||||
journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
|
||||
initBuffer(buff);
|
||||
append(write);
|
||||
}
|
||||
|
||||
public boolean canAppend(Journal.WriteCommand write) {
|
||||
int newSize = size + write.location.getSize();
|
||||
if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public void append(Journal.WriteCommand write) throws IOException {
|
||||
this.writes.addLast(write);
|
||||
write.location.setDataFileId(dataFile.getDataFileId());
|
||||
write.location.setOffset(offset+size);
|
||||
int s = write.location.getSize();
|
||||
size += s;
|
||||
dataFile.incrementLength(s);
|
||||
journal.addToTotalLength(s);
|
||||
forceToDisk |= appendToBuffer(write, buff);
|
||||
}
|
||||
}
|
||||
|
||||
private void initBuffer(DataByteArrayOutputStream buff) throws IOException {
|
||||
// Write an empty batch control record.
|
||||
buff.reset();
|
||||
buff.write(Journal.BATCH_CONTROL_RECORD_HEADER);
|
||||
buff.writeInt(0);
|
||||
buff.writeLong(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a Store writer
|
||||
*/
|
||||
public CallerBufferingDataFileAppender(Journal dataManager) {
|
||||
this.journal = dataManager;
|
||||
this.inflightWrites = this.journal.getInflightWrites();
|
||||
this.maxWriteBatchSize = this.journal.getWriteBatchSize();
|
||||
}
|
||||
|
||||
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
|
||||
|
||||
// Write the packet our internal buffer.
|
||||
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
|
||||
|
||||
final Location location = new Location();
|
||||
location.setSize(size);
|
||||
location.setType(type);
|
||||
|
||||
Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
|
||||
|
||||
WriteBatch batch = enqueue(write);
|
||||
location.setLatch(batch.latch);
|
||||
if (sync) {
|
||||
try {
|
||||
batch.latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
IOException exception = batch.exception.get();
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
return location;
|
||||
}
|
||||
|
||||
public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
|
||||
// Write the packet our internal buffer.
|
||||
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
|
||||
|
||||
final Location location = new Location();
|
||||
location.setSize(size);
|
||||
location.setType(type);
|
||||
|
||||
Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
|
||||
|
||||
WriteBatch batch = enqueue(write);
|
||||
|
||||
location.setLatch(batch.latch);
|
||||
return location;
|
||||
}
|
||||
|
||||
private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
|
||||
synchronized (enqueueMutex) {
|
||||
if (shutdown) {
|
||||
throw new IOException("Async Writter Thread Shutdown");
|
||||
}
|
||||
|
||||
if (!running) {
|
||||
running = true;
|
||||
thread = new Thread() {
|
||||
public void run() {
|
||||
processQueue();
|
||||
}
|
||||
};
|
||||
thread.setPriority(Thread.MAX_PRIORITY);
|
||||
thread.setDaemon(true);
|
||||
thread.setName("ActiveMQ Data File Writer");
|
||||
thread.start();
|
||||
firstAsyncException = null;
|
||||
}
|
||||
|
||||
if (firstAsyncException != null) {
|
||||
throw firstAsyncException;
|
||||
}
|
||||
|
||||
while ( true ) {
|
||||
if (nextWriteBatch == null) {
|
||||
DataFile file = journal.getCurrentWriteFile();
|
||||
if( file.getLength() > journal.getMaxFileLength() ) {
|
||||
file = journal.rotateWriteFile();
|
||||
}
|
||||
|
||||
nextWriteBatch = new WriteBatch(file, file.getLength(), write);
|
||||
enqueueMutex.notifyAll();
|
||||
break;
|
||||
} else {
|
||||
// Append to current batch if possible..
|
||||
if (nextWriteBatch.canAppend(write)) {
|
||||
nextWriteBatch.append(write);
|
||||
break;
|
||||
} else {
|
||||
// Otherwise wait for the queuedCommand to be null
|
||||
try {
|
||||
while (nextWriteBatch != null) {
|
||||
final long start = System.currentTimeMillis();
|
||||
enqueueMutex.wait();
|
||||
if (maxStat > 0) {
|
||||
System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
if (shutdown) {
|
||||
throw new IOException("Async Writter Thread Shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!write.sync) {
|
||||
inflightWrites.put(new Journal.WriteKey(write.location), write);
|
||||
}
|
||||
return nextWriteBatch;
|
||||
}
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
synchronized (enqueueMutex) {
|
||||
if (!shutdown) {
|
||||
shutdown = true;
|
||||
if (running) {
|
||||
enqueueMutex.notifyAll();
|
||||
} else {
|
||||
shutdownDone.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
shutdownDone.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
|
||||
public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
|
||||
int statIdx = 0;
|
||||
int[] stats = new int[maxStat];
|
||||
/**
|
||||
* The async processing loop that writes to the data files and does the
|
||||
* force calls. Since the file sync() call is the slowest of all the
|
||||
* operations, this algorithm tries to 'batch' or group together several
|
||||
* file sync() requests into a single file sync() call. The batching is
|
||||
* accomplished attaching the same CountDownLatch instance to every force
|
||||
* request in a group.
|
||||
*/
|
||||
protected void processQueue() {
|
||||
DataFile dataFile = null;
|
||||
RandomAccessFile file = null;
|
||||
WriteBatch wb = null;
|
||||
try {
|
||||
|
||||
while (true) {
|
||||
|
||||
Object o = null;
|
||||
|
||||
// Block till we get a command.
|
||||
synchronized (enqueueMutex) {
|
||||
while (true) {
|
||||
if (nextWriteBatch != null) {
|
||||
o = nextWriteBatch;
|
||||
nextWriteBatch = null;
|
||||
break;
|
||||
}
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
enqueueMutex.wait();
|
||||
}
|
||||
enqueueMutex.notifyAll();
|
||||
}
|
||||
|
||||
wb = (WriteBatch)o;
|
||||
if (dataFile != wb.dataFile) {
|
||||
if (file != null) {
|
||||
file.setLength(dataFile.getLength());
|
||||
dataFile.closeRandomAccessFile(file);
|
||||
}
|
||||
dataFile = wb.dataFile;
|
||||
file = dataFile.openRandomAccessFile();
|
||||
if( file.length() < journal.preferedFileLength ) {
|
||||
file.setLength(journal.preferedFileLength);
|
||||
}
|
||||
}
|
||||
|
||||
final DataByteArrayOutputStream buff = wb.buff;
|
||||
final boolean forceToDisk = wb.forceToDisk;
|
||||
|
||||
ByteSequence sequence = buff.toByteSequence();
|
||||
|
||||
// Now we can fill in the batch control record properly.
|
||||
buff.reset();
|
||||
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
|
||||
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
|
||||
if( journal.isChecksum() ) {
|
||||
Checksum checksum = new Adler32();
|
||||
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
|
||||
buff.writeLong(checksum.getValue());
|
||||
}
|
||||
|
||||
// Now do the 1 big write.
|
||||
file.seek(wb.offset);
|
||||
if (maxStat > 0) {
|
||||
if (statIdx < maxStat) {
|
||||
stats[statIdx++] = sequence.getLength();
|
||||
} else {
|
||||
long all = 0;
|
||||
for (;statIdx > 0;) {
|
||||
all+= stats[--statIdx];
|
||||
}
|
||||
System.err.println("Ave writeSize: " + all/maxStat);
|
||||
}
|
||||
}
|
||||
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
|
||||
ReplicationTarget replicationTarget = journal.getReplicationTarget();
|
||||
if( replicationTarget!=null ) {
|
||||
replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
|
||||
}
|
||||
|
||||
if (forceToDisk) {
|
||||
file.getFD().sync();
|
||||
}
|
||||
|
||||
Journal.WriteCommand lastWrite = wb.writes.getTail();
|
||||
journal.setLastAppendLocation(lastWrite.location);
|
||||
|
||||
// Now that the data is on disk, remove the writes from the in
|
||||
// flight
|
||||
// cache.
|
||||
Journal.WriteCommand write = wb.writes.getHead();
|
||||
while (write != null) {
|
||||
if (!write.sync) {
|
||||
inflightWrites.remove(new Journal.WriteKey(write.location));
|
||||
}
|
||||
if (write.onComplete != null) {
|
||||
try {
|
||||
write.onComplete.run();
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
write = write.getNext();
|
||||
}
|
||||
|
||||
// Signal any waiting threads that the write is on disk.
|
||||
wb.latch.countDown();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
synchronized (enqueueMutex) {
|
||||
firstAsyncException = e;
|
||||
if (wb != null) {
|
||||
wb.exception.set(e);
|
||||
wb.latch.countDown();
|
||||
}
|
||||
if (nextWriteBatch != null) {
|
||||
nextWriteBatch.exception.set(e);
|
||||
nextWriteBatch.latch.countDown();
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
} finally {
|
||||
try {
|
||||
if (file != null) {
|
||||
dataFile.closeRandomAccessFile(file);
|
||||
}
|
||||
} catch (Throwable ignore) {
|
||||
}
|
||||
shutdownDone.countDown();
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean appendToBuffer(Journal.WriteCommand write, DataByteArrayOutputStream buff) throws IOException {
|
||||
buff.writeInt(write.location.getSize());
|
||||
buff.writeByte(write.location.getType());
|
||||
buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
|
||||
return write.sync | write.onComplete != null;
|
||||
}
|
||||
}
|
|
@ -33,7 +33,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
|
|||
|
||||
protected final File file;
|
||||
protected final Integer dataFileId;
|
||||
protected int length;
|
||||
protected volatile int length;
|
||||
protected final SequenceSet corruptedBlocks = new SequenceSet();
|
||||
|
||||
DataFile(File file, int number, int preferedSize) {
|
||||
|
|
|
@ -20,8 +20,6 @@ import java.io.IOException;
|
|||
import java.io.RandomAccessFile;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
|
||||
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
|
||||
import org.apache.kahadb.util.ByteSequence;
|
||||
|
||||
/**
|
||||
|
@ -33,7 +31,7 @@ import org.apache.kahadb.util.ByteSequence;
|
|||
final class DataFileAccessor {
|
||||
|
||||
private final DataFile dataFile;
|
||||
private final Map<WriteKey, WriteCommand> inflightWrites;
|
||||
private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
|
||||
private final RandomAccessFile file;
|
||||
private boolean disposed;
|
||||
|
||||
|
@ -71,7 +69,7 @@ final class DataFileAccessor {
|
|||
throw new IOException("Invalid location: " + location);
|
||||
}
|
||||
|
||||
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
|
||||
Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
|
||||
if (asyncWrite != null) {
|
||||
return asyncWrite.data;
|
||||
}
|
||||
|
@ -106,7 +104,7 @@ final class DataFileAccessor {
|
|||
}
|
||||
|
||||
public void readLocationDetails(Location location) throws IOException {
|
||||
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
|
||||
Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
|
||||
if (asyncWrite != null) {
|
||||
location.setSize(asyncWrite.location.getSize());
|
||||
location.setType(asyncWrite.location.getType());
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.zip.Checksum;
|
|||
|
||||
import org.apache.kahadb.util.ByteSequence;
|
||||
import org.apache.kahadb.util.DataByteArrayOutputStream;
|
||||
import org.apache.kahadb.util.LinkedNode;
|
||||
import org.apache.kahadb.util.LinkedNodeList;
|
||||
|
||||
/**
|
||||
|
@ -37,10 +36,10 @@ import org.apache.kahadb.util.LinkedNodeList;
|
|||
*
|
||||
*
|
||||
*/
|
||||
class DataFileAppender {
|
||||
class DataFileAppender implements FileAppender {
|
||||
|
||||
protected final Journal journal;
|
||||
protected final Map<WriteKey, WriteCommand> inflightWrites;
|
||||
protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
|
||||
protected final Object enqueueMutex = new Object() {
|
||||
};
|
||||
protected WriteBatch nextWriteBatch;
|
||||
|
@ -82,13 +81,13 @@ class DataFileAppender {
|
|||
|
||||
public final DataFile dataFile;
|
||||
|
||||
public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
|
||||
public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
|
||||
public final CountDownLatch latch = new CountDownLatch(1);
|
||||
private final int offset;
|
||||
public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
|
||||
public AtomicReference<IOException> exception = new AtomicReference<IOException>();
|
||||
|
||||
public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
|
||||
public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
|
||||
this.dataFile = dataFile;
|
||||
this.offset = offset;
|
||||
this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
|
||||
|
@ -97,7 +96,7 @@ class DataFileAppender {
|
|||
append(write);
|
||||
}
|
||||
|
||||
public boolean canAppend(WriteCommand write) {
|
||||
public boolean canAppend(Journal.WriteCommand write) {
|
||||
int newSize = size + write.location.getSize();
|
||||
if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
|
||||
return false;
|
||||
|
@ -105,7 +104,7 @@ class DataFileAppender {
|
|||
return true;
|
||||
}
|
||||
|
||||
public void append(WriteCommand write) throws IOException {
|
||||
public void append(Journal.WriteCommand write) throws IOException {
|
||||
this.writes.addLast(write);
|
||||
write.location.setDataFileId(dataFile.getDataFileId());
|
||||
write.location.setOffset(offset+size);
|
||||
|
@ -116,27 +115,6 @@ class DataFileAppender {
|
|||
}
|
||||
}
|
||||
|
||||
public static class WriteCommand extends LinkedNode<WriteCommand> {
|
||||
public final Location location;
|
||||
public final ByteSequence data;
|
||||
final boolean sync;
|
||||
public final Runnable onComplete;
|
||||
|
||||
public WriteCommand(Location location, ByteSequence data, boolean sync) {
|
||||
this.location = location;
|
||||
this.data = data;
|
||||
this.sync = sync;
|
||||
this.onComplete = null;
|
||||
}
|
||||
|
||||
public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
|
||||
this.location = location;
|
||||
this.data = data;
|
||||
this.onComplete = onComplete;
|
||||
this.sync = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a Store writer
|
||||
*/
|
||||
|
@ -155,7 +133,7 @@ class DataFileAppender {
|
|||
location.setSize(size);
|
||||
location.setType(type);
|
||||
|
||||
WriteCommand write = new WriteCommand(location, data, sync);
|
||||
Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
|
||||
|
||||
WriteBatch batch = enqueue(write);
|
||||
location.setLatch(batch.latch);
|
||||
|
@ -182,7 +160,7 @@ class DataFileAppender {
|
|||
location.setSize(size);
|
||||
location.setType(type);
|
||||
|
||||
WriteCommand write = new WriteCommand(location, data, onComplete);
|
||||
Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
|
||||
|
||||
WriteBatch batch = enqueue(write);
|
||||
|
||||
|
@ -190,7 +168,7 @@ class DataFileAppender {
|
|||
return location;
|
||||
}
|
||||
|
||||
private WriteBatch enqueue(WriteCommand write) throws IOException {
|
||||
private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
|
||||
synchronized (enqueueMutex) {
|
||||
if (shutdown) {
|
||||
throw new IOException("Async Writter Thread Shutdown");
|
||||
|
@ -220,7 +198,7 @@ class DataFileAppender {
|
|||
if( file.getLength() > journal.getMaxFileLength() ) {
|
||||
file = journal.rotateWriteFile();
|
||||
}
|
||||
|
||||
|
||||
nextWriteBatch = new WriteBatch(file, file.getLength(), write);
|
||||
enqueueMutex.notifyAll();
|
||||
break;
|
||||
|
@ -249,7 +227,7 @@ class DataFileAppender {
|
|||
}
|
||||
}
|
||||
if (!write.sync) {
|
||||
inflightWrites.put(new WriteKey(write.location), write);
|
||||
inflightWrites.put(new Journal.WriteKey(write.location), write);
|
||||
}
|
||||
return nextWriteBatch;
|
||||
}
|
||||
|
@ -327,7 +305,7 @@ class DataFileAppender {
|
|||
}
|
||||
}
|
||||
|
||||
WriteCommand write = wb.writes.getHead();
|
||||
Journal.WriteCommand write = wb.writes.getHead();
|
||||
|
||||
// Write an empty batch control record.
|
||||
buff.reset();
|
||||
|
@ -336,7 +314,7 @@ class DataFileAppender {
|
|||
buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
|
||||
buff.writeInt(0);
|
||||
buff.writeLong(0);
|
||||
|
||||
|
||||
boolean forceToDisk = false;
|
||||
while (write != null) {
|
||||
forceToDisk |= write.sync | write.onComplete != null;
|
||||
|
@ -382,7 +360,7 @@ class DataFileAppender {
|
|||
file.getFD().sync();
|
||||
}
|
||||
|
||||
WriteCommand lastWrite = wb.writes.getTail();
|
||||
Journal.WriteCommand lastWrite = wb.writes.getTail();
|
||||
journal.setLastAppendLocation(lastWrite.location);
|
||||
|
||||
// Now that the data is on disk, remove the writes from the in
|
||||
|
@ -391,7 +369,7 @@ class DataFileAppender {
|
|||
write = wb.writes.getHead();
|
||||
while (write != null) {
|
||||
if (!write.sync) {
|
||||
inflightWrites.remove(new WriteKey(write.location));
|
||||
inflightWrites.remove(new Journal.WriteKey(write.location));
|
||||
}
|
||||
if (write.onComplete != null) {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
package org.apache.kahadb.journal;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.kahadb.util.ByteSequence;
|
||||
|
||||
/**
|
||||
* User: gtully
|
||||
*/
|
||||
public interface FileAppender {
|
||||
Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException;
|
||||
|
||||
Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException;
|
||||
|
||||
void close() throws IOException;
|
||||
}
|
|
@ -36,10 +36,9 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.zip.Adler32;
|
||||
import java.util.zip.Checksum;
|
||||
import org.apache.kahadb.util.LinkedNode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
|
||||
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
|
||||
import org.apache.kahadb.util.ByteSequence;
|
||||
import org.apache.kahadb.util.DataByteArrayInputStream;
|
||||
import org.apache.kahadb.util.DataByteArrayOutputStream;
|
||||
|
@ -53,6 +52,8 @@ import org.apache.kahadb.util.Sequence;
|
|||
*
|
||||
*/
|
||||
public class Journal {
|
||||
public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
|
||||
public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
|
||||
|
||||
private static final int MAX_BATCH_SIZE = 32*1024*1024;
|
||||
|
||||
|
@ -103,7 +104,7 @@ public class Journal {
|
|||
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
|
||||
protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
|
||||
|
||||
protected DataFileAppender appender;
|
||||
protected FileAppender appender;
|
||||
protected DataFileAccessorPool accessorPool;
|
||||
|
||||
protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
|
||||
|
@ -130,7 +131,7 @@ public class Journal {
|
|||
started = true;
|
||||
preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
|
||||
|
||||
appender = new DataFileAppender(this);
|
||||
appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
|
||||
|
||||
File[] files = directory.listFiles(new FilenameFilter() {
|
||||
public boolean accept(File dir, String n) {
|
||||
|
@ -751,4 +752,50 @@ public class Journal {
|
|||
public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
|
||||
this.totalLength = storeSizeAccumulator;
|
||||
}
|
||||
|
||||
public static class WriteCommand extends LinkedNode<WriteCommand> {
|
||||
public final Location location;
|
||||
public final ByteSequence data;
|
||||
final boolean sync;
|
||||
public final Runnable onComplete;
|
||||
|
||||
public WriteCommand(Location location, ByteSequence data, boolean sync) {
|
||||
this.location = location;
|
||||
this.data = data;
|
||||
this.sync = sync;
|
||||
this.onComplete = null;
|
||||
}
|
||||
|
||||
public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
|
||||
this.location = location;
|
||||
this.data = data;
|
||||
this.onComplete = onComplete;
|
||||
this.sync = false;
|
||||
}
|
||||
}
|
||||
|
||||
public static class WriteKey {
|
||||
private final int file;
|
||||
private final long offset;
|
||||
private final int hash;
|
||||
|
||||
public WriteKey(Location item) {
|
||||
file = item.getDataFileId();
|
||||
offset = item.getOffset();
|
||||
// TODO: see if we can build a better hash
|
||||
hash = (int)(file ^ offset);
|
||||
}
|
||||
|
||||
public int hashCode() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof WriteKey) {
|
||||
WriteKey di = (WriteKey)obj;
|
||||
return di.file == file && di.offset == offset;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1039,7 +1039,9 @@ public class PageFile {
|
|||
}
|
||||
|
||||
Checksum checksum = new Adler32();
|
||||
recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
|
||||
if (enableRecoveryFile) {
|
||||
recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
|
||||
}
|
||||
for (PageWrite w : batch) {
|
||||
if (enableRecoveryFile) {
|
||||
try {
|
||||
|
@ -1078,7 +1080,9 @@ public class PageFile {
|
|||
|
||||
if (enableDiskSyncs) {
|
||||
// Sync to make sure recovery buffer writes land on disk..
|
||||
recoveryFile.getFD().sync();
|
||||
if (enableRecoveryFile) {
|
||||
recoveryFile.getFD().sync();
|
||||
}
|
||||
writeFile.getFD().sync();
|
||||
}
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue