https://issues.apache.org/jira/browse/AMQ-3646 - Allow KahaDB to run without disk syncs, for higher throughput without the JMS persistence guarantee. Allow a 0 period to disable checkpoint/cleanup. Allow the JMX gc operation to invoke cleanup so that store GC can be initiated via JMX, ensuring disk space is reclaimed. Ensure the periodic checkpoint does not sync when enableJournalDiskSyncs=false; it waits for completion but does not force to disk. Fix cached buffer allocation and refactor to reuse more code in CallerBufferingDataFileAppender.
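For context, a minimal sketch of how a broker might be configured to use these changes, based on the setters exercised in the test diff below; the combination of values shown (syncs off, periods 0) is an illustrative assumption, not part of the commit:

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class NoSyncBrokerSketch {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        KahaDBPersistenceAdapter kahaDB =
                (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
        // Trade the JMS persistence guarantee for throughput: journal
        // writes are no longer forced to disk.
        kahaDB.setEnableJournalDiskSyncs(false);
        // A period of 0 disables the periodic checkpoint/cleanup thread;
        // store GC can instead be triggered on demand via the JMX gc() op.
        kahaDB.setCheckpointInterval(0);
        kahaDB.setCleanupInterval(0);
        broker.addConnector("tcp://0.0.0.0:0");
        broker.start();
        broker.waitUntilStopped();
    }
}
```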

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1222705 13f79535-47bb-0310-9956-ffa450edef68
commit 89f22dacac
parent 46e6cc01fc
Gary Tully, 2011-12-23 15:44:52 +00:00

8 changed files with 182 additions and 275 deletions

BrokerView.java

@@ -38,12 +38,14 @@ import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.BrokerSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  *
  */
 public class BrokerView implements BrokerViewMBean {
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
     ManagedRegionBroker broker;
     private final BrokerService brokerService;
     private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
@@ -76,6 +78,11 @@ public class BrokerView implements BrokerViewMBean {
     public void gc() throws Exception {
         brokerService.getBroker().gc();
+        try {
+            brokerService.getPersistenceAdapter().checkpoint(true);
+        } catch (IOException e) {
+            LOG.error("Failed to checkpoint persistence adapter on gc request, reason:" + e, e);
+        }
     }

     public void start() throws Exception {
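With this change, a remote JMX client can force store cleanup on demand. A hypothetical invocation sketch follows; the service URL and ObjectName are assumptions for a default broker named "localhost" of this era and would need adjusting to the actual broker configuration:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class InvokeBrokerGc {
    public static void main(String[] args) throws Exception {
        // Assumed connector URL and ObjectName; both depend on broker config.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName broker = new ObjectName(
                    "org.apache.activemq:BrokerName=localhost,Type=Broker");
            // gc() now also checkpoints the persistence adapter, so journal
            // files can be reclaimed without waiting for a periodic cleanup.
            mbs.invoke(broker, "gc", null, null);
        } finally {
            connector.close();
        }
    }
}
```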

KahaDBStore.java

@@ -980,7 +980,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     }

     public void checkpoint(boolean sync) throws IOException {
-        super.checkpointCleanup(false);
+        super.checkpointCleanup(sync);
     }

     // /////////////////////////////////////////////////////////////////

MessageDatabase.java

@@ -67,7 +67,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     static final int CLOSED_STATE = 1;
     static final int OPEN_STATE = 2;
     static final long NOT_ACKED = -1;
-    static final long UNMATCHED_SEQ = -2;

     static final int VERSION = 4;
@@ -247,6 +246,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     }

     private void startCheckpoint() {
+        if (checkpointInterval == 0 && cleanupInterval == 0) {
+            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean shutdown/restart");
+            return;
+        }
         synchronized (checkpointThreadLock) {
             boolean start = false;
             if (checkpointThread == null) {
@@ -264,15 +267,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     long lastCheckpoint = System.currentTimeMillis();
                     // Sleep for a short time so we can periodically check
                     // to see if we need to exit this thread.
-                    long sleepTime = Math.min(checkpointInterval, 500);
+                    long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
                     while (opened.get()) {
                         Thread.sleep(sleepTime);
                         long now = System.currentTimeMillis();
-                        if( now - lastCleanup >= cleanupInterval ) {
+                        if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
                             checkpointCleanup(true);
                             lastCleanup = now;
                             lastCheckpoint = now;
-                        } else if( now - lastCheckpoint >= checkpointInterval ) {
+                        } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
                             checkpointCleanup(false);
                             lastCheckpoint = now;
                         }
@@ -392,7 +395,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
             journal.close();
             synchronized (checkpointThreadLock) {
-                checkpointThread.join();
+                if (checkpointThread != null) {
+                    checkpointThread.join();
+                }
             }
         } finally {
             lockFile.unlock();
@@ -781,13 +786,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }

-    // /////////////////////////////////////////////////////////////////
-    // Methods call by the broker to update and query the store.
-    // /////////////////////////////////////////////////////////////////
-    public Location store(JournalCommand<?> data) throws IOException {
-        return store(data, false, null,null);
-    }
-
     public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
         int size = data.serializedSizeFramed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -796,20 +794,35 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return os.toByteSequence();
     }

+    // /////////////////////////////////////////////////////////////////
+    // Methods call by the broker to update and query the store.
+    // /////////////////////////////////////////////////////////////////
+    public Location store(JournalCommand<?> data) throws IOException {
+        return store(data, false, null,null);
+    }
+
+    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
+        return store(data, false, null,null, onJournalStoreComplete);
+    }
+
+    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
+        return store(data, sync, before, after, null);
+    }
+
     /**
      * All updated are are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
      * during a recovery process.
      */
-    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
+    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after, Runnable onJournalStoreComplete) throws IOException {
         if (before != null) {
             before.run();
         }
         try {
             ByteSequence sequence = toByteSequence(data);
             long start = System.currentTimeMillis();
-            Location location = journal.write(sequence, sync);
+            Location location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
             long start2 = System.currentTimeMillis();
             process(data, location, after);
             long end = System.currentTimeMillis();
@@ -1408,13 +1421,25 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         LOG.debug("Checkpoint done.");
     }

+    final Runnable nullCompletionCallback = new Runnable() {
+        @Override
+        public void run() {
+        }
+    };
+
     private Location checkpointProducerAudit() throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oout = new ObjectOutputStream(baos);
         oout.writeObject(metadata.producerSequenceIdTracker);
         oout.flush();
         oout.close();
-        return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), true, null, null);
+        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
+        Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
+        try {
+            location.getLatch().await();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+        }
+        return location;
     }

     public HashSet<Integer> getJournalFilesBeingReplicated() {
@@ -2076,6 +2101,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
         manager.setArchiveDataLogs(isArchiveDataLogs());
         manager.setSizeAccumulator(storeSize);
+        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
         if (getDirectoryArchive() != null) {
             IOHelper.mkdirs(getDirectoryArchive());
             manager.setDirectoryArchive(getDirectoryArchive());

KahaDBFastEnqueueTest.java

@@ -33,7 +33,11 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.kaha.impl.async.DataFileAppenderTest;
+import org.apache.kahadb.journal.FileAppender;
+import org.apache.kahadb.journal.Journal;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -52,14 +56,14 @@ public class KahaDBFastEnqueueTest {
     private boolean useBytesMessage= true;
     private final int parallelProducer = 20;
     private Vector<Exception> exceptions = new Vector<Exception>();
-    final long toSend = 500000;
+    long toSend = 10000;

-    @Ignore("too slow, exploring getting broker disk bound")
     // use with:
     // -Xmx4g -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true
+    @Test
     public void testPublishNoConsumer() throws Exception {
-        startBroker(true);
+        startBroker(true, 10);
         final AtomicLong sharedCount = new AtomicLong(toSend);
         long start = System.currentTimeMillis();
@@ -82,19 +86,57 @@ public class KahaDBFastEnqueueTest {
         assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());

         long totalSent = toSend * payloadString.length();
-        //System.out.println("Pre shutdown: Index totalWritten: " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten);
+        double duration = System.currentTimeMillis() - start;
+        stopBroker();
+        LOG.info("Duration: " + duration + "ms");
+        LOG.info("Rate: " + (toSend * 1000/duration) + "m/s");
+        LOG.info("Total send: " + totalSent);
+        LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length());
+        LOG.info("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%");
+
+        restartBroker(0, 1200000);
+        consumeMessages(toSend);
+    }
+
+    @Test
+    public void testPublishNoConsumerNoCheckpoint() throws Exception {
+        toSend = 100;
+        startBroker(true, 0);
+        final AtomicLong sharedCount = new AtomicLong(toSend);
+        long start = System.currentTimeMillis();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        for (int i=0; i< parallelProducer; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        publishMessages(sharedCount, 0);
+                    } catch (Exception e) {
+                        exceptions.add(e);
+                    }
+                }
+            });
+        }
+        executorService.shutdown();
+        executorService.awaitTermination(30, TimeUnit.MINUTES);
+        assertTrue("Producers done in time", executorService.isTerminated());
+        assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
+
+        long totalSent = toSend * payloadString.length();
+        broker.getAdminView().gc();

         double duration = System.currentTimeMillis() - start;
         stopBroker();
-        System.out.println("Duration: " + duration + "ms");
-        System.out.println("Rate: " + (toSend * 1000/duration) + "m/s");
-        System.out.println("Total send: " + totalSent);
-        System.out.println("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length());
-        //System.out.println("Total index write: " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten);
-        System.out.println("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%");
-        //System.out.println("Index writes %: " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten / (double)totalSent * 100 + "%");
+        LOG.info("Duration: " + duration + "ms");
+        LOG.info("Rate: " + (toSend * 1000/duration) + "m/s");
+        LOG.info("Total send: " + totalSent);
+        LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length());
+        LOG.info("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%");

-        restartBroker(0);
+        restartBroker(0, 0);
         consumeMessages(toSend);
     }
@@ -110,10 +152,16 @@ public class KahaDBFastEnqueueTest {
         assertNull("none left over", consumer.receive(2000));
     }

-    private void restartBroker(int restartDelay) throws Exception {
+    private void restartBroker(int restartDelay, int checkpoint) throws Exception {
         stopBroker();
         TimeUnit.MILLISECONDS.sleep(restartDelay);
-        startBroker(false);
+        startBroker(false, checkpoint);
+    }
+
+    @Before
+    public void setProps() {
+        System.setProperty(Journal.CALLER_BUFFER_APPENDER, Boolean.toString(true));
+        System.setProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW, "10000");
     }

     @After
@@ -122,6 +170,8 @@ public class KahaDBFastEnqueueTest {
             broker.stop();
             broker.waitUntilStopped();
         }
+        System.clearProperty(Journal.CALLER_BUFFER_APPENDER);
+        System.clearProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW);
     }

     final double sampleRate = 100000;
@@ -153,14 +203,14 @@ public class KahaDBFastEnqueueTest {
         connection.close();
     }

-    public void startBroker(boolean deleteAllMessages) throws Exception {
+    public void startBroker(boolean deleteAllMessages, int checkPointPeriod) throws Exception {
         broker = new BrokerService();
         broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
         kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
         // defer checkpoints which require a sync
-        kahaDBPersistenceAdapter.setCleanupInterval(20 * 60 * 1000);
-        kahaDBPersistenceAdapter.setCheckpointInterval(20 * 60 * 1000);
+        kahaDBPersistenceAdapter.setCleanupInterval(checkPointPeriod);
+        kahaDBPersistenceAdapter.setCheckpointInterval(checkPointPeriod);
         // optimise for disk best batch rate
         kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24*1024*1024); //4mb default
@@ -171,7 +221,6 @@ public class KahaDBFastEnqueueTest {
         kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
         kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
-        broker.setUseJmx(false);

         broker.addConnector("tcp://0.0.0.0:0");
         broker.start();
@@ -179,6 +228,7 @@ public class KahaDBFastEnqueueTest {
         connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
     }

+    @Test
     public void testRollover() throws Exception {
         byte flip = 0x1;
         for (long i=0; i<Short.MAX_VALUE; i++) {

CallerBufferingDataFileAppender.java

@@ -17,17 +17,11 @@
 package org.apache.kahadb.journal;

 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.io.RandomAccessFile;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LinkedNodeList;

 /**
  * An optimized writer to do batch appends to a data file. This object is thread
@@ -37,69 +31,34 @@ import org.apache.kahadb.util.LinkedNodeList;
  * reduces the round trip of the write thread.
  *
  */
-class CallerBufferingDataFileAppender implements FileAppender {
+class CallerBufferingDataFileAppender extends DataFileAppender {

-    protected final Journal journal;
-    protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
-    protected final Object enqueueMutex = new Object() {
-    };
-    protected WriteBatch nextWriteBatch;
-    protected boolean shutdown;
-    protected IOException firstAsyncException;
-    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
-    protected int maxWriteBatchSize;
-
-    private boolean running;
-    private Thread thread;
-
     final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] {
             new DataByteArrayOutputStream(maxWriteBatchSize),
             new DataByteArrayOutputStream(maxWriteBatchSize)
     };
     volatile byte flip = 0x1;
-    public class WriteBatch {
+    public class WriteBatch extends DataFileAppender.WriteBatch {

         DataByteArrayOutputStream buff = cachedBuffers[flip ^= 1];
-        public final DataFile dataFile;
-        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
-        public final CountDownLatch latch = new CountDownLatch(1);
-        private final int offset;
-        public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
-        public AtomicReference<IOException> exception = new AtomicReference<IOException>();
-        public boolean forceToDisk;
+        private boolean forceToDisk;

         public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
-            this.dataFile = dataFile;
-            this.offset = offset;
-            this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
-            this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
-            journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            super(dataFile, offset);
             initBuffer(buff);
             append(write);
         }

-        public boolean canAppend(Journal.WriteCommand write) {
-            int newSize = size + write.location.getSize();
-            if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
-                return false;
-            }
-            return true;
-        }
-
         public void append(Journal.WriteCommand write) throws IOException {
-            this.writes.addLast(write);
-            write.location.setDataFileId(dataFile.getDataFileId());
-            write.location.setOffset(offset+size);
-            int s = write.location.getSize();
-            size += s;
-            dataFile.incrementLength(s);
-            journal.addToTotalLength(s);
+            super.append(write);
             forceToDisk |= appendToBuffer(write, buff);
         }
     }

+    @Override
+    protected DataFileAppender.WriteBatch newWriteBatch(Journal.WriteCommand write, DataFile file) throws IOException {
+        return new WriteBatch(file, file.getLength(), write);
+    }
+
     private void initBuffer(DataByteArrayOutputStream buff) throws IOException {
         // Write an empty batch control record.
         buff.reset();
@@ -108,148 +67,10 @@ class CallerBufferingDataFileAppender implements FileAppender {
         buff.writeLong(0);
     }

-    /**
-     * Construct a Store writer
-     */
     public CallerBufferingDataFileAppender(Journal dataManager) {
-        this.journal = dataManager;
-        this.inflightWrites = this.journal.getInflightWrites();
-        this.maxWriteBatchSize = this.journal.getWriteBatchSize();
+        super(dataManager);
     }

-    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-        // Write the packet our internal buffer.
-        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
-
-        final Location location = new Location();
-        location.setSize(size);
-        location.setType(type);
-
-        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
-        WriteBatch batch = enqueue(write);
-        location.setLatch(batch.latch);
-        if (sync) {
-            try {
-                batch.latch.await();
-            } catch (InterruptedException e) {
-                throw new InterruptedIOException();
-            }
-            IOException exception = batch.exception.get();
-            if (exception != null) {
-                throw exception;
-            }
-        }
-
-        return location;
-    }
-
-    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
-        // Write the packet our internal buffer.
-        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
-
-        final Location location = new Location();
-        location.setSize(size);
-        location.setType(type);
-
-        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
-        WriteBatch batch = enqueue(write);
-        location.setLatch(batch.latch);
-        return location;
-    }
-
-    private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
-        synchronized (enqueueMutex) {
-            if (shutdown) {
-                throw new IOException("Async Writter Thread Shutdown");
-            }
-
-            if (!running) {
-                running = true;
-                thread = new Thread() {
-                    public void run() {
-                        processQueue();
-                    }
-                };
-                thread.setPriority(Thread.MAX_PRIORITY);
-                thread.setDaemon(true);
-                thread.setName("ActiveMQ Data File Writer");
-                thread.start();
-                firstAsyncException = null;
-            }
-
-            if (firstAsyncException != null) {
-                throw firstAsyncException;
-            }
-
-            while ( true ) {
-                if (nextWriteBatch == null) {
-                    DataFile file = journal.getCurrentWriteFile();
-                    if( file.getLength() > journal.getMaxFileLength() ) {
-                        file = journal.rotateWriteFile();
-                    }
-
-                    nextWriteBatch = new WriteBatch(file, file.getLength(), write);
-                    enqueueMutex.notifyAll();
-                    break;
-                } else {
-                    // Append to current batch if possible..
-                    if (nextWriteBatch.canAppend(write)) {
-                        nextWriteBatch.append(write);
-                        break;
-                    } else {
-                        // Otherwise wait for the queuedCommand to be null
-                        try {
-                            while (nextWriteBatch != null) {
-                                final long start = System.currentTimeMillis();
-                                enqueueMutex.wait();
-                                if (maxStat > 0) {
-                                    System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
-                                }
-                            }
-                        } catch (InterruptedException e) {
-                            throw new InterruptedIOException();
-                        }
-                        if (shutdown) {
-                            throw new IOException("Async Writter Thread Shutdown");
-                        }
-                    }
-                }
-            }
-            if (!write.sync) {
-                inflightWrites.put(new Journal.WriteKey(write.location), write);
-            }
-            return nextWriteBatch;
-        }
-    }
-
-    public void close() throws IOException {
-        synchronized (enqueueMutex) {
-            if (!shutdown) {
-                shutdown = true;
-                if (running) {
-                    enqueueMutex.notifyAll();
-                } else {
-                    shutdownDone.countDown();
-                }
-            }
-        }
-
-        try {
-            shutdownDone.await();
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException();
-        }
-    }
-
-    public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
-    public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
-    int statIdx = 0;
-    int[] stats = new int[maxStat];
-
     /**
      * The async processing loop that writes to the data files and does the
      * force calls. Since the file sync() call is the slowest of all the
@@ -258,6 +79,7 @@ class CallerBufferingDataFileAppender implements FileAppender {
      * accomplished attaching the same CountDownLatch instance to every force
      * request in a group.
      */
+    @Override
     protected void processQueue() {
         DataFile dataFile = null;
         RandomAccessFile file = null;
@@ -337,27 +159,9 @@ class CallerBufferingDataFileAppender implements FileAppender {
                 Journal.WriteCommand lastWrite = wb.writes.getTail();
                 journal.setLastAppendLocation(lastWrite.location);

-                // Now that the data is on disk, remove the writes from the in
-                // flight
-                // cache.
-                Journal.WriteCommand write = wb.writes.getHead();
-                while (write != null) {
-                    if (!write.sync) {
-                        inflightWrites.remove(new Journal.WriteKey(write.location));
-                    }
-                    if (write.onComplete != null) {
-                        try {
-                            write.onComplete.run();
-                        } catch (Throwable e) {
-                            e.printStackTrace();
-                        }
-                    }
-                    write = write.getNext();
-                }
-
-                // Signal any waiting threads that the write is on disk.
-                wb.latch.countDown();
+                signalDone(wb);
             }
         } catch (IOException e) {
             synchronized (enqueueMutex) {
@@ -388,6 +192,6 @@ class CallerBufferingDataFileAppender implements FileAppender {
         buff.writeInt(write.location.getSize());
         buff.writeByte(write.location.getType());
         buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
-        return write.sync | write.onComplete != null;
+        return write.sync | (syncOnComplete && write.onComplete != null);
     }
 }

DataFileAppender.java

@@ -48,8 +48,9 @@ class DataFileAppender implements FileAppender {
     protected IOException firstAsyncException;
     protected final CountDownLatch shutdownDone = new CountDownLatch(1);
     protected int maxWriteBatchSize;
+    protected final boolean syncOnComplete;

-    private boolean running;
+    protected boolean running;
     private Thread thread;

     public static class WriteKey {
@@ -83,16 +84,20 @@ class DataFileAppender implements FileAppender {
         public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
         public final CountDownLatch latch = new CountDownLatch(1);
-        private final int offset;
+        protected final int offset;
         public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
         public AtomicReference<IOException> exception = new AtomicReference<IOException>();

-        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
+        public WriteBatch(DataFile dataFile,int offset) {
             this.dataFile = dataFile;
             this.offset = offset;
             this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
             this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
             journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+        }
+
+        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
+            this(dataFile, offset);
             append(write);
         }
@@ -122,6 +127,7 @@ class DataFileAppender implements FileAppender {
         this.journal = dataManager;
         this.inflightWrites = this.journal.getInflightWrites();
         this.maxWriteBatchSize = this.journal.getWriteBatchSize();
+        this.syncOnComplete = this.journal.isEnableAsyncDiskSync();
     }

     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
@@ -199,7 +205,7 @@ class DataFileAppender implements FileAppender {
                         file = journal.rotateWriteFile();
                     }

-                    nextWriteBatch = new WriteBatch(file, file.getLength(), write);
+                    nextWriteBatch = newWriteBatch(write, file);
                     enqueueMutex.notifyAll();
                     break;
                 } else {
@@ -233,6 +239,10 @@ class DataFileAppender implements FileAppender {
         }
     }

+    protected WriteBatch newWriteBatch(Journal.WriteCommand write, DataFile file) throws IOException {
+        return new WriteBatch(file, file.getLength(), write);
+    }
+
     public void close() throws IOException {
         synchronized (enqueueMutex) {
             if (!shutdown) {
@@ -253,8 +263,6 @@ class DataFileAppender implements FileAppender {
     }

-    public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
-    public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
     int statIdx = 0;
     int[] stats = new int[maxStat];

     /**
@@ -317,7 +325,7 @@ class DataFileAppender implements FileAppender {
             boolean forceToDisk = false;
             while (write != null) {
-                forceToDisk |= write.sync | write.onComplete != null;
+                forceToDisk |= write.sync | (syncOnComplete && write.onComplete != null);
                 buff.writeInt(write.location.getSize());
                 buff.writeByte(write.location.getType());
                 buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
@@ -363,26 +371,7 @@ class DataFileAppender implements FileAppender {
                 Journal.WriteCommand lastWrite = wb.writes.getTail();
                 journal.setLastAppendLocation(lastWrite.location);

-                // Now that the data is on disk, remove the writes from the in
-                // flight
-                // cache.
-                write = wb.writes.getHead();
-                while (write != null) {
-                    if (!write.sync) {
-                        inflightWrites.remove(new Journal.WriteKey(write.location));
-                    }
-                    if (write.onComplete != null) {
-                        try {
-                            write.onComplete.run();
-                        } catch (Throwable e) {
-                            e.printStackTrace();
-                        }
-                    }
-                    write = write.getNext();
-                }
-
-                // Signal any waiting threads that the write is on disk.
-                wb.latch.countDown();
+                signalDone(wb);
             }
         } catch (IOException e) {
             synchronized (enqueueMutex) {
@@ -409,4 +398,26 @@ class DataFileAppender implements FileAppender {
         }
     }

+    protected void signalDone(WriteBatch wb) {
+        // Now that the data is on disk, remove the writes from the in
+        // flight
+        // cache.
+        Journal.WriteCommand write = wb.writes.getHead();
+        while (write != null) {
+            if (!write.sync) {
+                inflightWrites.remove(new Journal.WriteKey(write.location));
+            }
+            if (write.onComplete != null) {
+                try {
+                    write.onComplete.run();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+            write = write.getNext();
+        }
+        // Signal any waiting threads that the write is on disk.
+        wb.latch.countDown();
+    }
 }

FileAppender.java

@@ -3,10 +3,10 @@ package org.apache.kahadb.journal;
 import java.io.IOException;
 import org.apache.kahadb.util.ByteSequence;

-/**
- * User: gtully
- */
 public interface FileAppender {
+    public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
+    public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));

     Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException;

     Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException;

Journal.java

@@ -118,6 +118,7 @@ public class Journal {
     private ReplicationTarget replicationTarget;
     protected boolean checksum;
     protected boolean checkForCorruptionOnStartup;
+    protected boolean enableAsyncDiskSync = true;
     private Timer timer;
@@ -753,6 +754,14 @@ public class Journal {
         this.totalLength = storeSizeAccumulator;
     }

+    public void setEnableAsyncDiskSync(boolean val) {
+        this.enableAsyncDiskSync = val;
+    }
+
+    public boolean isEnableAsyncDiskSync() {
+        return enableAsyncDiskSync;
+    }
+
     public static class WriteCommand extends LinkedNode<WriteCommand> {
         public final Location location;
         public final ByteSequence data;