diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 800b7a729f..8ab950bc7c 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -537,6 +537,30 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
         letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
     }
 
+    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
+        letter.setEnableIndexDiskSyncs(diskSyncs);
+    }
+
+    public boolean isEnableIndexDiskSyncs() {
+        return letter.isEnableIndexDiskSyncs();
+    }
+
+    public void setEnableIndexRecoveryFile(boolean enable) {
+        letter.setEnableIndexRecoveryFile(enable);
+    }
+
+    public boolean isEnableIndexRecoveryFile() {
+        return letter.isEnableIndexRecoveryFile();
+    }
+
+    public void setEnableIndexPageCaching(boolean enable) {
+        letter.setEnableIndexPageCaching(enable);
+    }
+
+    public boolean isEnableIndexPageCaching() {
+        return letter.isEnableIndexPageCaching();
+    }
+
     public KahaDBStore getStore() {
         return letter;
     }
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 5e8eeee99e..9ea92e70cf 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -184,6 +184,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean archiveCorruptedIndex = false;
     private boolean useIndexLFRUEviction = false;
     private float indexLFUEvictionFactor = 0.2f;
+    private boolean enableIndexDiskSyncs = true;
+    private boolean enableIndexRecoveryFile = true;
+    private boolean enableIndexPageCaching = true;
 
     public MessageDatabase() {
     }
@@ -2058,6 +2061,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         index.setPageCacheSize(indexCacheSize);
         index.setUseLFRUEviction(isUseIndexLFRUEviction());
         index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
+        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
+        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
+        index.setEnablePageCaching(isEnableIndexPageCaching());
         return index;
     }
 
@@ -2297,6 +2303,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         this.useIndexLFRUEviction = useIndexLFRUEviction;
     }
 
+    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
+        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
+    }
+
+    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
+        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
+    }
+
+    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
+        this.enableIndexPageCaching = enableIndexPageCaching;
+    }
+
+    public boolean isEnableIndexDiskSyncs() {
+        return enableIndexDiskSyncs;
+    }
+
+    public boolean isEnableIndexRecoveryFile() {
+        return enableIndexRecoveryFile;
+    }
+
+    public boolean isEnableIndexPageCaching() {
+        return enableIndexPageCaching;
+    }
+
     // /////////////////////////////////////////////////////////////////
     // Internal conversion methods.
     // /////////////////////////////////////////////////////////////////
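Note: the three new flags are plain bean properties that the adapter delegates to the underlying store, so they can be set wherever the broker is assembled. A minimal sketch of the throughput-oriented combination exercised by the test below (the BrokerService wiring around the three setters is illustrative, not part of this change):

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

public class FastIndexBroker {
    public static void main(String[] args) throws Exception {
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setEnableIndexDiskSyncs(false);    // no fsync of the index on checkpoint
        adapter.setEnableIndexRecoveryFile(false); // no double-write redo file for index pages
        adapter.setEnableIndexPageCaching(true);   // unchanged default

        BrokerService broker = new BrokerService();
        broker.setPersistenceAdapter(adapter);
        broker.start();
    }
}
```

With the first two flags off, an unclean shutdown can leave the index stale, so it may have to be rebuilt from the journal on restart; the journal itself stays as durable as before.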
diff --git a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
index 81a04a9acd..9fc9fbcee4 100644
--- a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConnectionControl;
 import org.junit.After;
 import org.junit.Ignore;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +52,7 @@ public class KahaDBFastEnqueueTest {
     private boolean useBytesMessage= true;
     private final int parallelProducer = 2;
     private Vector<Exception> exceptions = new Vector<Exception>();
-    final long toSend = 500000;
+    final long toSend = 1000;//500000;
 
     @Ignore("not ready yet, exploring getting broker disk bound")
     public void testPublishNoConsumer() throws Exception {
@@ -91,8 +92,8 @@ public class KahaDBFastEnqueueTest {
         System.out.println("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent * 100 + "%");
         //System.out.println("Index writes %: " + kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten / (double)totalSent * 100 + "%");
 
-        //restartBroker(0);
-        //consumeMessages(toSend);
+        restartBroker(0);
+        consumeMessages(toSend);
     }
 
     private void consumeMessages(long count) throws Exception {
@@ -158,11 +159,13 @@ public class KahaDBFastEnqueueTest {
         kahaDBPersistenceAdapter.setCheckpointInterval(20 * 60 * 1000);
 
         // optimise for disk best batch rate
-        //kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(128*1024); //4mb default
-        kahaDBPersistenceAdapter.setJournalMaxFileLength(1024*1024*1024); // 32mb default
+        kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24*1024*1024); //4mb default
+        kahaDBPersistenceAdapter.setJournalMaxFileLength(128*1024*1024); // 32mb default
         // keep index in memory
         kahaDBPersistenceAdapter.setIndexCacheSize(500000);
         kahaDBPersistenceAdapter.setIndexWriteBatchSize(500000);
+        kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
+        kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
 
         broker.setUseJmx(false);
         broker.addConnector("tcp://0.0.0.0:0");
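The new appender that follows moves record serialization into the calling thread: each batch is staged in a caller-side buffer so the write thread only has to seek, write and sync. It is opt-in; Journal.start() selects it through the CALLER_BUFFER_APPENDER system property added in the Journal.java hunk near the end of this patch. A hedged sketch of exercising it through the public Journal API (directory and payload are illustrative):

```java
import java.io.File;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.util.ByteSequence;

public class CallerBufferJournalDemo {
    public static void main(String[] args) throws Exception {
        // Property name matches Journal.CALLER_BUFFER_APPENDER below.
        System.setProperty("org.apache.kahadb.journal.CALLER_BUFFER_APPENDER", "true");

        Journal journal = new Journal();
        journal.setDirectory(new File("target/journal-demo"));
        journal.start(); // picks CallerBufferingDataFileAppender when the property is set
        try {
            journal.write(new ByteSequence("payload".getBytes()), true); // synchronous write
        } finally {
            journal.close();
        }
    }
}
```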
diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java b/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
new file mode 100644
index 0000000000..65a8967a73
--- /dev/null
+++ b/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LinkedNodeList;
+
+/**
+ * An optimized writer to do batch appends to a data file. This object is thread
+ * safe and gains throughput as you increase the number of concurrent writes it
+ * does.
+ * The thread calling enqueue does the file open and buffering of the data, which
+ * reduces the round trip of the write thread.
+ *
+ */
+class CallerBufferingDataFileAppender implements FileAppender {
+
+    protected final Journal journal;
+    protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
+    protected final Object enqueueMutex = new Object() {
+    };
+    protected WriteBatch nextWriteBatch;
+
+    protected boolean shutdown;
+    protected IOException firstAsyncException;
+    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
+    protected int maxWriteBatchSize;
+
+    private boolean running;
+    private Thread thread;
+
+    final DataByteArrayOutputStream cachedBuffers[] = new DataByteArrayOutputStream[] {
+            new DataByteArrayOutputStream(maxWriteBatchSize),
+            new DataByteArrayOutputStream(maxWriteBatchSize)
+    };
+    AtomicInteger writeBatchInstanceCount = new AtomicInteger();
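+
+    /*
+     * A new WriteBatch takes one of the two cached buffers in turn
+     * (writeBatchInstanceCount % 2), so the caller thread can be filling
+     * one buffer while the write thread is flushing the other to disk.
+     */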
+    public class WriteBatch {
+
+        DataByteArrayOutputStream buff = cachedBuffers[writeBatchInstanceCount.getAndIncrement()%2];
+        public final DataFile dataFile;
+
+        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
+        public final CountDownLatch latch = new CountDownLatch(1);
+        private final int offset;
+        public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+        public AtomicReference<IOException> exception = new AtomicReference<IOException>();
+        public boolean forceToDisk;
+
+        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
+            this.dataFile = dataFile;
+            this.offset = offset;
+            this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
+            journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            initBuffer(buff);
+            append(write);
+        }
+
+        public boolean canAppend(Journal.WriteCommand write) {
+            int newSize = size + write.location.getSize();
+            if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
+                return false;
+            }
+            return true;
+        }
+
+        public void append(Journal.WriteCommand write) throws IOException {
+            this.writes.addLast(write);
+            write.location.setDataFileId(dataFile.getDataFileId());
+            write.location.setOffset(offset+size);
+            int s = write.location.getSize();
+            size += s;
+            dataFile.incrementLength(s);
+            journal.addToTotalLength(s);
+            forceToDisk |= appendToBuffer(write, buff);
+        }
+    }
+
+    private void initBuffer(DataByteArrayOutputStream buff) throws IOException {
+        // Write an empty batch control record.
+        buff.reset();
+        buff.write(Journal.BATCH_CONTROL_RECORD_HEADER);
+        buff.writeInt(0);
+        buff.writeLong(0);
+    }
+
+    /**
+     * Construct a Store writer
+     */
+    public CallerBufferingDataFileAppender(Journal dataManager) {
+        this.journal = dataManager;
+        this.inflightWrites = this.journal.getInflightWrites();
+        this.maxWriteBatchSize = this.journal.getWriteBatchSize();
+    }
+
+    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
+
+        // Write the packet to our internal buffer.
+        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
+
+        WriteBatch batch = enqueue(write);
+        location.setLatch(batch.latch);
+        if (sync) {
+            try {
+                batch.latch.await();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+            IOException exception = batch.exception.get();
+            if (exception != null) {
+                throw exception;
+            }
+        }
+
+        return location;
+    }
+
+    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+        // Write the packet to our internal buffer.
+        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
+
+        WriteBatch batch = enqueue(write);
+
+        location.setLatch(batch.latch);
+        return location;
+    }
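+
+    /*
+     * Adds the write to the open batch, creating a new batch when none is
+     * pending. Once the open batch is full, the caller blocks until the
+     * write thread has taken it, which gives natural back-pressure.
+     */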
+    private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
+        synchronized (enqueueMutex) {
+            if (shutdown) {
+                throw new IOException("Async Writer Thread Shutdown");
+            }
+
+            if (!running) {
+                running = true;
+                thread = new Thread() {
+                    public void run() {
+                        processQueue();
+                    }
+                };
+                thread.setPriority(Thread.MAX_PRIORITY);
+                thread.setDaemon(true);
+                thread.setName("ActiveMQ Data File Writer");
+                thread.start();
+                firstAsyncException = null;
+            }
+
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
+            }
+
+            while ( true ) {
+                if (nextWriteBatch == null) {
+                    DataFile file = journal.getCurrentWriteFile();
+                    if( file.getLength() > journal.getMaxFileLength() ) {
+                        file = journal.rotateWriteFile();
+                    }
+
+                    nextWriteBatch = new WriteBatch(file, file.getLength(), write);
+                    enqueueMutex.notifyAll();
+                    break;
+                } else {
+                    // Append to current batch if possible..
+                    if (nextWriteBatch.canAppend(write)) {
+                        nextWriteBatch.append(write);
+                        break;
+                    } else {
+                        // Otherwise wait for the queuedCommand to be null
+                        try {
+                            while (nextWriteBatch != null) {
+                                final long start = System.currentTimeMillis();
+                                enqueueMutex.wait();
+                                if (maxStat > 0) {
+                                    System.err.println("Waiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
+                                }
+                            }
+                        } catch (InterruptedException e) {
+                            throw new InterruptedIOException();
+                        }
+                        if (shutdown) {
+                            throw new IOException("Async Writer Thread Shutdown");
+                        }
+                    }
+                }
+            }
+            if (!write.sync) {
+                inflightWrites.put(new Journal.WriteKey(write.location), write);
+            }
+            return nextWriteBatch;
+        }
+    }
+
+    public void close() throws IOException {
+        synchronized (enqueueMutex) {
+            if (!shutdown) {
+                shutdown = true;
+                if (running) {
+                    enqueueMutex.notifyAll();
+                } else {
+                    shutdownDone.countDown();
+                }
+            }
+        }
+
+        try {
+            shutdownDone.await();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
+
+    }
+
+    public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
+    public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
+    int statIdx = 0;
+    int[] stats = new int[maxStat];
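+
+    /*
+     * When the WRITE_STAT_WINDOW system property is set to a positive value,
+     * the write thread samples that many batch sizes and logs their average,
+     * which can help when sizing maxWriteBatchSize.
+     */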
+    /**
+     * The async processing loop that writes to the data files and does the
+     * force calls. Since the file sync() call is the slowest of all the
+     * operations, this algorithm tries to 'batch' or group together several
+     * file sync() requests into a single file sync() call. The batching is
+     * accomplished by attaching the same CountDownLatch instance to every force
+     * request in a group.
+     */
+    protected void processQueue() {
+        DataFile dataFile = null;
+        RandomAccessFile file = null;
+        WriteBatch wb = null;
+        try {
+
+            while (true) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized (enqueueMutex) {
+                    while (true) {
+                        if (nextWriteBatch != null) {
+                            o = nextWriteBatch;
+                            nextWriteBatch = null;
+                            break;
+                        }
+                        if (shutdown) {
+                            return;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    enqueueMutex.notifyAll();
+                }
+
+                wb = (WriteBatch)o;
+                if (dataFile != wb.dataFile) {
+                    if (file != null) {
+                        file.setLength(dataFile.getLength());
+                        dataFile.closeRandomAccessFile(file);
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile();
+                    if( file.length() < journal.preferedFileLength ) {
+                        file.setLength(journal.preferedFileLength);
+                    }
+                }
+
+                final DataByteArrayOutputStream buff = wb.buff;
+                final boolean forceToDisk = wb.forceToDisk;
+
+                ByteSequence sequence = buff.toByteSequence();
+
+                // Now we can fill in the batch control record properly.
+                buff.reset();
+                buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+                buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+                if( journal.isChecksum() ) {
+                    Checksum checksum = new Adler32();
+                    checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+                    buff.writeLong(checksum.getValue());
+                }
+
+                // Now do the 1 big write.
+                file.seek(wb.offset);
+                if (maxStat > 0) {
+                    if (statIdx < maxStat) {
+                        stats[statIdx++] = sequence.getLength();
+                    } else {
+                        long all = 0;
+                        for (;statIdx > 0;) {
+                            all+= stats[--statIdx];
+                        }
+                        System.err.println("Ave writeSize: " + all/maxStat);
+                    }
+                }
+                file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                ReplicationTarget replicationTarget = journal.getReplicationTarget();
+                if( replicationTarget!=null ) {
+                    replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+                }
+
+                if (forceToDisk) {
+                    file.getFD().sync();
+                }
+
+                Journal.WriteCommand lastWrite = wb.writes.getTail();
+                journal.setLastAppendLocation(lastWrite.location);
+
+                // Now that the data is on disk, remove the writes from the in
+                // flight
+                // cache.
+                Journal.WriteCommand write = wb.writes.getHead();
+                while (write != null) {
+                    if (!write.sync) {
+                        inflightWrites.remove(new Journal.WriteKey(write.location));
+                    }
+                    if (write.onComplete != null) {
+                        try {
+                            write.onComplete.run();
+                        } catch (Throwable e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    write = write.getNext();
+                }
+
+                // Signal any waiting threads that the write is on disk.
+                wb.latch.countDown();
+            }
+        } catch (IOException e) {
+            synchronized (enqueueMutex) {
+                firstAsyncException = e;
+                if (wb != null) {
+                    wb.exception.set(e);
+                    wb.latch.countDown();
+                }
+                if (nextWriteBatch != null) {
+                    nextWriteBatch.exception.set(e);
+                    nextWriteBatch.latch.countDown();
+                }
+            }
+        } catch (InterruptedException e) {
+        } finally {
+            try {
+                if (file != null) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (Throwable ignore) {
+            }
+            shutdownDone.countDown();
+            running = false;
+        }
+    }
+
+    private boolean appendToBuffer(Journal.WriteCommand write, DataByteArrayOutputStream buff) throws IOException {
+        buff.writeInt(write.location.getSize());
+        buff.writeByte(write.location.getType());
+        buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+        return write.sync | write.onComplete != null;
+    }
+}
diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java b/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
index f5d9b120db..c966027306 100644
--- a/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
+++ b/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
@@ -33,7 +33,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFi
diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java b/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
--- a/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
+++ b/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
-    private final Map<WriteKey, WriteCommand> inflightWrites;
+    private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
     private final RandomAccessFile file;
     private boolean disposed;
@@ -71,7 +69,7 @@ final class DataFileAccessor {
             throw new IOException("Invalid location: " + location);
         }
 
-        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+        Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
         if (asyncWrite != null) {
             return asyncWrite.data;
         }
@@ -106,7 +104,7 @@ final class DataFileAccessor {
     }
 
     public void readLocationDetails(Location location) throws IOException {
-        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+        Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
         if (asyncWrite != null) {
            location.setSize(asyncWrite.location.getSize());
            location.setType(asyncWrite.location.getType());
diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java b/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
index 63cce00dfd..4fa2d4cd5a 100644
--- a/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
+++ b/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
@@ -27,7 +27,6 @@ import java.util.zip.Checksum;
 
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
 /**
@@ -37,10 +36,10 @@ import org.apache.kahadb.util.LinkedNodeList;
  *
  *
  */
-class DataFileAppender {
+class DataFileAppender implements FileAppender {
 
     protected final Journal journal;
-    protected final Map<WriteKey, WriteCommand> inflightWrites;
+    protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
     protected final Object enqueueMutex = new Object() {
     };
     protected WriteBatch nextWriteBatch;
@@ -82,13 +81,13 @@ class DataFileAppender {
 
         public final DataFile dataFile;
 
-        public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
+        public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
         public final CountDownLatch latch = new CountDownLatch(1);
         private final int offset;
         public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
         public AtomicReference<IOException> exception = new AtomicReference<IOException>();
 
-        public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
+        public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
             this.dataFile = dataFile;
             this.offset = offset;
             this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
@@ -97,7 +96,7 @@ class DataFileAppender {
             append(write);
         }
 
-        public boolean canAppend(WriteCommand write) {
+        public boolean canAppend(Journal.WriteCommand write) {
             int newSize = size + write.location.getSize();
             if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
                 return false;
@@ -105,7 +104,7 @@ class DataFileAppender {
             return true;
         }
 
-        public void append(WriteCommand write) throws IOException {
+        public void append(Journal.WriteCommand write) throws IOException {
             this.writes.addLast(write);
             write.location.setDataFileId(dataFile.getDataFileId());
             write.location.setOffset(offset+size);
@@ -116,27 +115,6 @@ class DataFileAppender {
         }
     }
 
-    public static class WriteCommand extends LinkedNode<WriteCommand> {
-        public final Location location;
-        public final ByteSequence data;
-        final boolean sync;
-        public final Runnable onComplete;
-
-        public WriteCommand(Location location, ByteSequence data, boolean sync) {
-            this.location = location;
-            this.data = data;
-            this.sync = sync;
-            this.onComplete = null;
-        }
-
-        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
-            this.location = location;
-            this.data = data;
-            this.onComplete = onComplete;
-            this.sync = false;
-        }
-    }
-
     /**
      * Construct a Store writer
      */
@@ -155,7 +133,7 @@ class DataFileAppender {
         location.setSize(size);
         location.setType(type);
 
-        WriteCommand write = new WriteCommand(location, data, sync);
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
 
         WriteBatch batch = enqueue(write);
         location.setLatch(batch.latch);
@@ -182,7 +160,7 @@ class DataFileAppender {
         location.setSize(size);
         location.setType(type);
 
-        WriteCommand write = new WriteCommand(location, data, onComplete);
+        Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
 
         WriteBatch batch = enqueue(write);
 
@@ -190,7 +168,7 @@ class DataFileAppender {
         return location;
     }
 
-    private WriteBatch enqueue(WriteCommand write) throws IOException {
+    private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
         synchronized (enqueueMutex) {
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
new IOException("Async Writter Thread Shutdown"); @@ -220,7 +198,7 @@ class DataFileAppender { if( file.getLength() > journal.getMaxFileLength() ) { file = journal.rotateWriteFile(); } - + nextWriteBatch = new WriteBatch(file, file.getLength(), write); enqueueMutex.notifyAll(); break; @@ -249,7 +227,7 @@ class DataFileAppender { } } if (!write.sync) { - inflightWrites.put(new WriteKey(write.location), write); + inflightWrites.put(new Journal.WriteKey(write.location), write); } return nextWriteBatch; } @@ -327,7 +305,7 @@ class DataFileAppender { } } - WriteCommand write = wb.writes.getHead(); + Journal.WriteCommand write = wb.writes.getHead(); // Write an empty batch control record. buff.reset(); @@ -336,7 +314,7 @@ class DataFileAppender { buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC); buff.writeInt(0); buff.writeLong(0); - + boolean forceToDisk = false; while (write != null) { forceToDisk |= write.sync | write.onComplete != null; @@ -382,7 +360,7 @@ class DataFileAppender { file.getFD().sync(); } - WriteCommand lastWrite = wb.writes.getTail(); + Journal.WriteCommand lastWrite = wb.writes.getTail(); journal.setLastAppendLocation(lastWrite.location); // Now that the data is on disk, remove the writes from the in @@ -391,7 +369,7 @@ class DataFileAppender { write = wb.writes.getHead(); while (write != null) { if (!write.sync) { - inflightWrites.remove(new WriteKey(write.location)); + inflightWrites.remove(new Journal.WriteKey(write.location)); } if (write.onComplete != null) { try { diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java b/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java new file mode 100644 index 0000000000..50dd45e1f2 --- /dev/null +++ b/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java @@ -0,0 +1,15 @@ +package org.apache.kahadb.journal; + +import java.io.IOException; +import org.apache.kahadb.util.ByteSequence; + +/** + * User: gtully + */ +public interface FileAppender { + Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException; + + Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException; + + void close() throws IOException; +} diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java b/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java index 327d91b892..49a726dce9 100644 --- a/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java +++ b/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java @@ -36,10 +36,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.zip.Adler32; import java.util.zip.Checksum; +import org.apache.kahadb.util.LinkedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kahadb.journal.DataFileAppender.WriteCommand; -import org.apache.kahadb.journal.DataFileAppender.WriteKey; import org.apache.kahadb.util.ByteSequence; import org.apache.kahadb.util.DataByteArrayInputStream; import org.apache.kahadb.util.DataByteArrayOutputStream; @@ -53,6 +52,8 @@ import org.apache.kahadb.util.Sequence; * */ public class Journal { + public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER"; + public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false")); private static final int MAX_BATCH_SIZE = 32*1024*1024; @@ -103,7 +104,7 @@ public class Journal { protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - 
diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java b/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
index 327d91b892..49a726dce9 100644
--- a/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
+++ b/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
@@ -36,10 +36,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
+import org.apache.kahadb.util.LinkedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
-import org.apache.kahadb.journal.DataFileAppender.WriteKey;
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
@@ -53,6 +52,8 @@ import org.apache.kahadb.util.Sequence;
  *
  */
 public class Journal {
+    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
+    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
 
     private static final int MAX_BATCH_SIZE = 32*1024*1024;
 
@@ -103,7 +104,7 @@ public class Journal {
     protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
     protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
 
-    protected DataFileAppender appender;
+    protected FileAppender appender;
     protected DataFileAccessorPool accessorPool;
 
     protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
@@ -130,7 +131,7 @@ public class Journal {
         started = true;
         preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
 
-        appender = new DataFileAppender(this);
+        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
 
         File[] files = directory.listFiles(new FilenameFilter() {
             public boolean accept(File dir, String n) {
@@ -751,4 +752,50 @@ public class Journal {
     public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
         this.totalLength = storeSizeAccumulator;
     }
+
+    public static class WriteCommand extends LinkedNode<WriteCommand> {
+        public final Location location;
+        public final ByteSequence data;
+        final boolean sync;
+        public final Runnable onComplete;
+
+        public WriteCommand(Location location, ByteSequence data, boolean sync) {
+            this.location = location;
+            this.data = data;
+            this.sync = sync;
+            this.onComplete = null;
+        }
+
+        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
+            this.location = location;
+            this.data = data;
+            this.onComplete = onComplete;
+            this.sync = false;
+        }
+    }
+
+    public static class WriteKey {
+        private final int file;
+        private final long offset;
+        private final int hash;
+
+        public WriteKey(Location item) {
+            file = item.getDataFileId();
+            offset = item.getOffset();
+            // TODO: see if we can build a better hash
+            hash = (int)(file ^ offset);
+        }
+
+        public int hashCode() {
+            return hash;
+        }
+
+        public boolean equals(Object obj) {
+            if (obj instanceof WriteKey) {
+                WriteKey di = (WriteKey)obj;
+                return di.file == file && di.offset == offset;
+            }
+            return false;
+        }
+    }
 }
diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
index 167029a245..f266811a65 100644
--- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
+++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
@@ -1039,7 +1039,9 @@ public class PageFile {
         }
 
         Checksum checksum = new Adler32();
-        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+        if (enableRecoveryFile) {
+            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+        }
         for (PageWrite w : batch) {
             if (enableRecoveryFile) {
                 try {
@@ -1078,7 +1080,9 @@ public class PageFile {
 
             if (enableDiskSyncs) {
                 // Sync to make sure recovery buffer writes land on disk..
-                recoveryFile.getFD().sync();
+                if (enableRecoveryFile) {
+                    recoveryFile.getFD().sync();
+                }
                 writeFile.getFD().sync();
             }
         } finally {
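For completeness, the guards added above pair with setters that already exist on PageFile; the adapter-level flags shown earlier reach this class through the page-file factory in MessageDatabase. A standalone hedged sketch of the same knobs on a bare page file (paths illustrative, error handling elided):

```java
import java.io.File;
import org.apache.kahadb.page.PageFile;

public class IndexFlagsDemo {
    public static void main(String[] args) throws Exception {
        PageFile index = new PageFile(new File("target/index-demo"), "db");
        index.setEnableDiskSyncs(false);    // skip fsync when a batch of pages is written
        index.setEnableRecoveryFile(false); // skip the redo/recovery file entirely
        index.setEnablePageCaching(true);
        index.load();
        try {
            // ... read/write pages through transactions here ...
        } finally {
            index.unload();
        }
    }
}
```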