Updated fix for this issue: allow non-forced metadata updates on the
file channel via FileChannel#force(false).

Enable this by setting the system property
"org.apache.activemq.kahaDB.files.skipMetadataUpdate=true"
Timothy Bish 2014-01-09 17:21:56 -05:00
parent efb988655c
commit ef619b6a9b
6 changed files with 134 additions and 112 deletions
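For context, the JDK already distinguishes the two sync modes this commit switches between. The sketch below is illustrative only (the class name and file path are made up, not part of the commit); it shows the flag being read the same way the store does and the resulting force call:

    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;

    public class ForceModes {
        // Read the new flag the same way the store does; Boolean.getBoolean
        // defaults to false, so behaviour is unchanged unless the property
        // is explicitly set to "true".
        static final boolean SKIP_METADATA_UPDATE =
            Boolean.getBoolean("org.apache.activemq.kahaDB.files.skipMetadataUpdate");

        public static void main(String[] args) throws Exception {
            RandomAccessFile raf = new RandomAccessFile("force-demo.dat", "rw");
            FileChannel channel = raf.getChannel();
            raf.write(new byte[4 * 1024]);

            // force(true): flush file content *and* metadata (size, mtime) --
            // equivalent to the old raf.getFD().sync() behaviour.
            // force(false): flush content only, which can avoid an extra
            // metadata write per sync on some file systems.
            channel.force(!SKIP_METADATA_UPDATE);
            raf.close();
        }
    }

Because the flag is captured in a static final field when the class initializes, it has to be supplied as a -D JVM argument at startup rather than set programmatically afterwards.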

View File: CallerBufferingDataFileAppender.java

@ -17,11 +17,11 @@
package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
/**
@ -30,7 +30,7 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
* does.
* The thread calling enqueue does the file open and buffering of the data, which
* reduces the round trip of the write thread.
*
*
*/
class CallerBufferingDataFileAppender extends DataFileAppender {
@ -49,6 +49,7 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
append(write);
}
@Override
public void append(Journal.WriteCommand write) throws IOException {
super.append(write);
forceToDisk |= appendToBuffer(write, buff);
@ -124,15 +125,15 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
final boolean forceToDisk = wb.forceToDisk;
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
if( journal.isChecksum() ) {
Checksum checksum = new Adler32();
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeLong(checksum.getValue());
Checksum checksum = new Adler32();
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeLong(checksum.getValue());
}
// Now do the 1 big write.
@ -151,11 +152,11 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
ReplicationTarget replicationTarget = journal.getReplicationTarget();
if( replicationTarget!=null ) {
replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
}
if (forceToDisk) {
file.getFD().sync();
file.sync();
}
Journal.WriteCommand lastWrite = wb.writes.getTail();
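The checksum block in the hunk above computes an Adler32 over the batch payload, skipping the fixed-size batch control record at the front of the buffer, and stores the result in that record. A standalone sketch of the same pattern (the control-record size and buffer contents are made up for illustration):

    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    public class BatchChecksum {
        public static void main(String[] args) {
            // Pretend batch: a fixed-size control record followed by the payload.
            int controlRecordSize = 24;                      // illustrative only
            byte[] batch = new byte[controlRecordSize + 100];
            for (int i = controlRecordSize; i < batch.length; i++) {
                batch[i] = (byte) i;
            }

            // The checksum covers everything *after* the control record, as in
            // checksum.update(data, offset + BATCH_CONTROL_RECORD_SIZE,
            //                 length - BATCH_CONTROL_RECORD_SIZE) above.
            Checksum checksum = new Adler32();
            checksum.update(batch, controlRecordSize, batch.length - controlRecordSize);
            System.out.println("Adler32 = " + checksum.getValue());
        }
    }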

View File: DataFileAccessor.java

@ -17,7 +17,6 @@
package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import org.apache.activemq.util.ByteSequence;
@ -26,8 +25,8 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
/**
* Optimized Store reader and updater. Single threaded and synchronous. Use in
* conjunction with the DataFileAccessorPool of concurrent use.
*
*
*
*
*/
final class DataFileAccessor {
@ -38,7 +37,7 @@ final class DataFileAccessor {
/**
* Construct a Store reader
*
*
* @param fileId
* @throws IOException
*/
@ -70,7 +69,7 @@ final class DataFileAccessor {
throw new IOException("Invalid location: " + location);
}
Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
Journal.WriteCommand asyncWrite = inflightWrites.get(new Journal.WriteKey(location));
if (asyncWrite != null) {
return asyncWrite.data;
}
@ -93,7 +92,7 @@ final class DataFileAccessor {
throw new IOException("Invalid location: " + location + ", : " + e, e);
}
}
public void readFully(long offset, byte data[]) throws IOException {
file.seek(offset);
file.readFully(data);
@ -105,7 +104,7 @@ final class DataFileAccessor {
}
public void readLocationDetails(Location location) throws IOException {
Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
Journal.WriteCommand asyncWrite = inflightWrites.get(new Journal.WriteKey(location));
if (asyncWrite != null) {
location.setSize(asyncWrite.location.getSize());
location.setType(asyncWrite.location.getType());
@ -155,9 +154,7 @@ final class DataFileAccessor {
int size = Math.min(data.getLength(), location.getSize());
file.write(data.getData(), data.getOffset(), size);
if (sync) {
file.getFD().sync();
file.sync();
}
}
}

View File: DataFileAppender.java

@ -18,16 +18,15 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,10 +66,12 @@ class DataFileAppender implements FileAppender {
hash = (int)(file ^ offset);
}
@Override
public int hashCode() {
return hash;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof WriteKey) {
WriteKey di = (WriteKey)obj;
@ -132,6 +133,7 @@ class DataFileAppender implements FileAppender {
this.syncOnComplete = this.journal.isEnableAsyncDiskSync();
}
@Override
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
// Write the packet our internal buffer.
@ -160,6 +162,7 @@ class DataFileAppender implements FileAppender {
return location;
}
@Override
public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
@ -185,6 +188,7 @@ class DataFileAppender implements FileAppender {
if (!running) {
running = true;
thread = new Thread() {
@Override
public void run() {
processQueue();
}
@ -246,6 +250,7 @@ class DataFileAppender implements FileAppender {
return new WriteBatch(file, file.getLength(), write);
}
@Override
public void close() throws IOException {
synchronized (enqueueMutex) {
if (!shutdown) {
@ -365,7 +370,7 @@ class DataFileAppender implements FileAppender {
}
if (forceToDisk) {
file.getFD().sync();
file.sync();
}
Journal.WriteCommand lastWrite = wb.writes.getTail();
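The reads in DataFileAccessor above consult the in-flight write map before touching the file, keyed by the WriteKey shown in this hunk (hash = file ^ offset). A minimal sketch of that lookup pattern; the map type and value layout are simplified assumptions, the real journal uses its own WriteCommand/WriteKey classes:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class InflightLookup {
        // Identifies a record by data file id and offset, mirroring Journal.WriteKey.
        static final class Key {
            final int file;
            final long offset;
            final int hash;

            Key(int file, long offset) {
                this.file = file;
                this.offset = offset;
                this.hash = (int) (file ^ offset);
            }

            @Override
            public int hashCode() {
                return hash;
            }

            @Override
            public boolean equals(Object obj) {
                if (obj instanceof Key) {
                    Key other = (Key) obj;
                    return other.file == file && other.offset == offset;
                }
                return false;
            }
        }

        public static void main(String[] args) {
            // Writes queued for the async appender but not yet on disk.
            ConcurrentMap<Key, byte[]> inflightWrites = new ConcurrentHashMap<Key, byte[]>();
            inflightWrites.put(new Key(1, 4096), "pending record".getBytes());

            // A reader first checks the in-flight map; only on a miss does it
            // seek and read from the data file itself.
            byte[] pending = inflightWrites.get(new Key(1, 4096));
            System.out.println(pending != null ? "served from memory" : "read from disk");
        }
    }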

View File: PageFile.java

@ -42,9 +42,15 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.util.*;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,7 +81,7 @@ public class PageFile {
private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
// A PageFile will use a couple of files in this directory
private File directory;
private final File directory;
// And the file names in that directory will be based on this name.
private final String name;
@ -97,7 +103,7 @@ public class PageFile {
// The number of pages in the current recovery buffer
private int recoveryPageCount;
private AtomicBoolean loaded = new AtomicBoolean();
private final AtomicBoolean loaded = new AtomicBoolean();
// The number of pages we are aiming to write every time we
// write to disk.
int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
@ -118,23 +124,23 @@ public class PageFile {
private boolean enabledWriteThread = false;
// These are used if enableAsyncWrites==true
private AtomicBoolean stopWriter = new AtomicBoolean();
private final AtomicBoolean stopWriter = new AtomicBoolean();
private Thread writerThread;
private CountDownLatch checkpointLatch;
// Keeps track of writes that are being written to disk.
private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
// Keeps track of free pages.
private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet();
private AtomicLong nextTxid = new AtomicLong();
private final AtomicLong nextTxid = new AtomicLong();
// Persistent settings stored in the page file.
private MetaData metaData;
private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
private boolean useLFRUEviction = false;
private float LFUEvictionFactor = 0.2f;
@ -521,6 +527,7 @@ public class PageFile {
}
@Override
public String toString() {
return "Page File: " + getMainPageFile();
}
@ -610,10 +617,10 @@ public class PageFile {
// So we don't loose it.. write it 2 times...
writeFile.seek(0);
writeFile.write(d);
writeFile.getFD().sync();
writeFile.sync();
writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
writeFile.write(d);
writeFile.getFD().sync();
writeFile.sync();
}
private void storeFreeList() throws IOException {
@ -880,14 +887,17 @@ public class PageFile {
private <T> void write(Page<T> page, byte[] data) throws IOException {
final PageWrite write = new PageWrite(page, data);
Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
@Override
public Long getKey() {
return write.getPage().getPageId();
}
@Override
public PageWrite getValue() {
return write;
}
@Override
public PageWrite setValue(PageWrite value) {
return null;
}
@ -1081,9 +1091,9 @@ public class PageFile {
if (enableDiskSyncs) {
// Sync to make sure recovery buffer writes land on disk..
if (enableRecoveryFile) {
recoveryFile.getFD().sync();
writeFile.sync();
}
writeFile.getFD().sync();
writeFile.sync();
}
} finally {
synchronized (writes) {
@ -1185,7 +1195,7 @@ public class PageFile {
}
// And sync it to disk
writeFile.getFD().sync();
writeFile.sync();
return nextTxId;
}
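The metadata hunk above writes the page file header twice, at offset 0 and at half the header size, syncing after each copy so a crash mid-write still leaves one intact copy for recovery. A standalone sketch of that redundancy pattern, calling FileChannel.force directly in place of the wrapped sync(); the header size and contents are illustrative:

    import java.io.RandomAccessFile;

    public class RedundantHeader {
        static final int PAGE_FILE_HEADER_SIZE = 4 * 1024;   // illustrative only

        public static void main(String[] args) throws Exception {
            byte[] header = new byte[PAGE_FILE_HEADER_SIZE / 2];
            header[0] = 1;                                    // pretend metadata

            RandomAccessFile writeFile = new RandomAccessFile("pagefile-demo.db", "rw");
            // First copy at the start of the file.
            writeFile.seek(0);
            writeFile.write(header);
            writeFile.getChannel().force(false);              // content only, like sync() with skipMetadataUpdate

            // Second copy at the halfway point of the header region, so a torn
            // write of one copy cannot corrupt the other.
            writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            writeFile.write(header);
            writeFile.getChannel().force(false);
            writeFile.close();
        }
    }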

View File: DiskBenchmark.java

@ -27,13 +27,16 @@ import java.util.Arrays;
*/
public class DiskBenchmark {
private static final boolean SKIP_METADATA_UPDATE =
Boolean.getBoolean("org.apache.activemq.file.skipMetadataUpdate");
boolean verbose;
// reads and writes work with 4k of data at a time.
int bs=1024*4;
int bs = 1024 * 4;
// Work with 100 meg file.
long size=1024*1024*500;
long sampleInterval = 10*1000;
long size = 1024 * 1024 * 500;
long sampleInterval = 10 * 1000;
public static void main(String[] args) {
DiskBenchmark benchmark = new DiskBenchmark();
@ -67,79 +70,69 @@ public class DiskBenchmark {
}
}
public static class Report {
public int size;
public int writes;
public long writeDuration;
public int syncWrites;
public long syncWriteDuration;
public int reads;
public long readDuration;
@Override
public String toString() {
return
"Writes: \n" +
" "+writes+" writes of size "+size+" written in "+(writeDuration/1000.0)+" seconds.\n"+
" "+getWriteRate()+" writes/second.\n"+
" "+getWriteSizeRate()+" megs/second.\n"+
"\n"+
"Sync Writes: \n" +
" "+syncWrites+" writes of size "+size+" written in "+(syncWriteDuration/1000.0)+" seconds.\n"+
" "+getSyncWriteRate()+" writes/second.\n"+
" "+getSyncWriteSizeRate()+" megs/second.\n"+
"\n"+
"Reads: \n" +
" "+reads+" reads of size "+size+" read in "+(readDuration/1000.0)+" seconds.\n"+
" "+getReadRate()+" writes/second.\n"+
" "+getReadSizeRate()+" megs/second.\n"+
"\n"+
"";
return "Writes: \n" + " " + writes + " writes of size " + size + " written in " + (writeDuration / 1000.0) + " seconds.\n" + " " + getWriteRate()
+ " writes/second.\n" + " " + getWriteSizeRate() + " megs/second.\n" + "\n" + "Sync Writes: \n" + " " + syncWrites + " writes of size "
+ size + " written in " + (syncWriteDuration / 1000.0) + " seconds.\n" + " " + getSyncWriteRate() + " writes/second.\n" + " "
+ getSyncWriteSizeRate() + " megs/second.\n" + "\n" + "Reads: \n" + " " + reads + " reads of size " + size + " read in "
+ (readDuration / 1000.0) + " seconds.\n" + " " + getReadRate() + " writes/second.\n" + " " + getReadSizeRate() + " megs/second.\n" + "\n"
+ "";
}
private float getWriteSizeRate() {
float rc = writes;
rc *= size;
rc /= (1024*1024); // put it in megs
rc /= (writeDuration/1000.0); // get rate.
rc /= (1024 * 1024); // put it in megs
rc /= (writeDuration / 1000.0); // get rate.
return rc;
}
private float getWriteRate() {
float rc = writes;
rc /= (writeDuration/1000.0); // get rate.
rc /= (writeDuration / 1000.0); // get rate.
return rc;
}
private float getSyncWriteSizeRate() {
float rc = syncWrites;
rc *= size;
rc /= (1024*1024); // put it in megs
rc /= (syncWriteDuration/1000.0); // get rate.
rc /= (1024 * 1024); // put it in megs
rc /= (syncWriteDuration / 1000.0); // get rate.
return rc;
}
private float getSyncWriteRate() {
float rc = syncWrites;
rc /= (syncWriteDuration/1000.0); // get rate.
rc /= (syncWriteDuration / 1000.0); // get rate.
return rc;
}
private float getReadSizeRate() {
float rc = reads;
rc *= size;
rc /= (1024*1024); // put it in megs
rc /= (readDuration/1000.0); // get rate.
rc /= (1024 * 1024); // put it in megs
rc /= (readDuration / 1000.0); // get rate.
return rc;
}
private float getReadRate() {
float rc = reads;
rc /= (readDuration/1000.0); // get rate.
rc /= (readDuration / 1000.0); // get rate.
return rc;
}
@ -200,64 +193,63 @@ public class DiskBenchmark {
}
}
public Report benchmark(File file) throws IOException {
Report rc = new Report();
// Initialize the block we will be writing to disk.
byte []data = new byte[bs];
byte[] data = new byte[bs];
for (int i = 0; i < data.length; i++) {
data[i] = (byte)('a'+(i%26));
data[i] = (byte) ('a' + (i % 26));
}
rc.size = data.length;
RandomAccessFile raf = new RandomAccessFile(file, "rw");
raf.setLength(size);
// Figure out how many writes we can do in the sample interval.
long start = System.currentTimeMillis();
long now = System.currentTimeMillis();
int ioCount=0;
while( true ) {
if( (now-start)>sampleInterval ) {
int ioCount = 0;
while (true) {
if ((now - start) > sampleInterval) {
break;
}
raf.seek(0);
for( long i=0; i+data.length < size; i+=data.length) {
for (long i = 0; i + data.length < size; i += data.length) {
raf.write(data);
ioCount++;
now = System.currentTimeMillis();
if( (now-start)>sampleInterval ) {
if ((now - start) > sampleInterval) {
break;
}
}
// Sync to disk so that the we actually write the data to disk.. otherwise
// OS buffering might not really do the write.
raf.getFD().sync();
// Sync to disk so that the we actually write the data to disk..
// otherwise OS buffering might not really do the write.
raf.getChannel().force(!SKIP_METADATA_UPDATE);
}
raf.getFD().sync();
raf.getChannel().force(!SKIP_METADATA_UPDATE);
raf.close();
now = System.currentTimeMillis();
rc.size = data.length;
rc.writes = ioCount;
rc.writeDuration = (now-start);
rc.writeDuration = (now - start);
raf = new RandomAccessFile(file, "rw");
start = System.currentTimeMillis();
now = System.currentTimeMillis();
ioCount=0;
while( true ) {
if( (now-start)>sampleInterval ) {
ioCount = 0;
while (true) {
if ((now - start) > sampleInterval) {
break;
}
for( long i=0; i+data.length < size; i+=data.length) {
for (long i = 0; i + data.length < size; i += data.length) {
raf.seek(i);
raf.write(data);
raf.getFD().sync();
raf.getChannel().force(false);
ioCount++;
now = System.currentTimeMillis();
if( (now-start)>sampleInterval ) {
if ((now - start) > sampleInterval) {
break;
}
}
@ -265,72 +257,63 @@ public class DiskBenchmark {
raf.close();
now = System.currentTimeMillis();
rc.syncWrites = ioCount;
rc.syncWriteDuration = (now-start);
rc.syncWriteDuration = (now - start);
raf = new RandomAccessFile(file, "rw");
start = System.currentTimeMillis();
now = System.currentTimeMillis();
ioCount=0;
while( true ) {
if( (now-start)>sampleInterval ) {
ioCount = 0;
while (true) {
if ((now - start) > sampleInterval) {
break;
}
raf.seek(0);
for( long i=0; i+data.length < size; i+=data.length) {
for (long i = 0; i + data.length < size; i += data.length) {
raf.seek(i);
raf.readFully(data);
ioCount++;
now = System.currentTimeMillis();
if( (now-start)>sampleInterval ) {
if ((now - start) > sampleInterval) {
break;
}
}
}
raf.close();
rc.reads = ioCount;
rc.readDuration = (now-start);
rc.readDuration = (now - start);
return rc;
}
public boolean isVerbose() {
return verbose;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
public int getBs() {
return bs;
}
public void setBs(int bs) {
this.bs = bs;
}
public long getSize() {
return size;
}
public void setSize(long size) {
this.size = size;
}
public long getSampleInterval() {
return sampleInterval;
}
public void setSampleInterval(long sampleInterval) {
this.sampleInterval = sampleInterval;
}
}
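A hedged usage sketch of the benchmark above: the surrounding class and target path are assumptions, but the setters and the Report returned by benchmark(File) are as shown in this diff. Run the JVM with -Dorg.apache.activemq.file.skipMetadataUpdate=true to exercise the non-forced metadata path.

    import java.io.File;

    public class DiskBenchmarkExample {
        public static void main(String[] args) throws Exception {
            DiskBenchmark benchmark = new DiskBenchmark();   // assumes same package or an import for the real class
            benchmark.setBs(4 * 1024);                       // 4k blocks, the default shown above
            benchmark.setSize(1024L * 1024 * 500);           // 500 meg test file
            benchmark.setSampleInterval(10 * 1000);          // sample each phase for 10 seconds
            benchmark.setVerbose(true);

            File target = new File("disk-benchmark.dat");    // hypothetical path
            DiskBenchmark.Report report = benchmark.benchmark(target);
            System.out.println(report);
        }
    }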

View File: RecoverableRandomAccessFile.java

@ -16,10 +16,18 @@
*/
package org.apache.activemq.util;
import java.io.*;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.DataInput, java.io.Closeable {
private static final boolean SKIP_METADATA_UPDATE =
Boolean.getBoolean("org.apache.activemq.kahaDB.files.skipMetadataUpdate");
RandomAccessFile raf;
File file;
String mode;
@ -389,6 +397,24 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
}
}
public void sync() throws IOException {
try {
getRaf().getChannel().force(!SKIP_METADATA_UPDATE);
} catch (IOException ioe) {
handleException();
throw ioe;
}
}
public FileChannel getChannel() throws IOException {
try {
return getRaf().getChannel();
} catch (IOException ioe) {
handleException();
throw ioe;
}
}
public int read(byte[] b, int off, int len) throws IOException {
try {
return getRaf().read(b, off, len);