Change the way journal records are validated on recovery: we now checksum each write batch so that partial or corrupt writes are detected consistently.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@741741 13f79535-47bb-0310-9956-ffa450edef68
Hiram R. Chirino 2009-02-06 21:35:39 +00:00
parent d761e80ae4
commit f73b622a60
13 changed files with 260 additions and 511 deletions
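
At a high level, the new recovery validation works as in the sketch below: the appender computes an Adler-32 checksum over each write batch and stores it in the batch control record, and recovery recomputes the checksum and stops at the first batch that does not match. This is a minimal standalone sketch with hypothetical names, not the KahaDB classes changed in this commit.

import java.util.zip.Adler32;
import java.util.zip.Checksum;

// Sketch only: the write side records an Adler-32 over the batch payload; recovery
// recomputes it and treats a mismatch as a partial or corrupt batch.
public class BatchChecksumSketch {

    static long checksumOf(byte[] payload, int offset, int length) {
        Checksum checksum = new Adler32();
        checksum.update(payload, offset, length);
        return checksum.getValue();
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = "journal batch payload".getBytes("UTF-8");

        // Write side: store the checksum in the batch control record.
        long stored = checksumOf(payload, 0, payload.length);

        // Recovery side: recompute and compare before trusting the batch.
        boolean valid = stored == checksumOf(payload, 0, payload.length);
        System.out.println("batch valid: " + valid);
    }
}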

View File

@ -1277,7 +1277,6 @@ public class MessageDatabase {
Journal manager = new Journal();
manager.setDirectory(directory);
manager.setMaxFileLength(getJournalMaxFileLength());
manager.setUseNio(false);
return manager;
}

View File

@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.page.Transaction.Closure;
import org.apache.kahadb.util.Marshaller;
/**

View File

@ -20,7 +20,6 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.LinkedNode;

View File

@ -81,12 +81,12 @@ final class DataFileAccessor {
if (location.getSize() == Location.NOT_SET) {
file.seek(location.getOffset());
location.setSize(file.readInt());
file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
location.setType(file.readByte());
} else {
file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
}
byte[] data = new byte[location.getSize() - Journal.ITEM_HEAD_FOOT_SPACE];
byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE];
file.readFully(data);
return new ByteSequence(data, 0, data.length);
@ -94,6 +94,11 @@ final class DataFileAccessor {
throw new IOException("Invalid location: " + location + ", : " + e);
}
}
public void read(long offset, byte data[]) throws IOException {
file.seek(offset);
file.readFully(data);
}
public void readLocationDetails(Location location) throws IOException {
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
@ -107,42 +112,42 @@ final class DataFileAccessor {
}
}
public boolean readLocationDetailsAndValidate(Location location) {
try {
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
if (asyncWrite != null) {
location.setSize(asyncWrite.location.getSize());
location.setType(asyncWrite.location.getType());
} else {
file.seek(location.getOffset());
location.setSize(file.readInt());
location.setType(file.readByte());
byte data[] = new byte[3];
file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
file.readFully(data);
if (data[0] != Journal.ITEM_HEAD_SOR[0]
|| data[1] != Journal.ITEM_HEAD_SOR[1]
|| data[2] != Journal.ITEM_HEAD_SOR[2]) {
return false;
}
file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
file.readFully(data);
if (data[0] != Journal.ITEM_HEAD_EOR[0]
|| data[1] != Journal.ITEM_HEAD_EOR[1]
|| data[2] != Journal.ITEM_HEAD_EOR[2]) {
return false;
}
}
} catch (IOException e) {
return false;
}
return true;
}
// public boolean readLocationDetailsAndValidate(Location location) {
// try {
// WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
// if (asyncWrite != null) {
// location.setSize(asyncWrite.location.getSize());
// location.setType(asyncWrite.location.getType());
// } else {
// file.seek(location.getOffset());
// location.setSize(file.readInt());
// location.setType(file.readByte());
//
// byte data[] = new byte[3];
// file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
// file.readFully(data);
// if (data[0] != Journal.ITEM_HEAD_SOR[0]
// || data[1] != Journal.ITEM_HEAD_SOR[1]
// || data[2] != Journal.ITEM_HEAD_SOR[2]) {
// return false;
// }
// file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
// file.readFully(data);
// if (data[0] != Journal.ITEM_HEAD_EOR[0]
// || data[1] != Journal.ITEM_HEAD_EOR[1]
// || data[2] != Journal.ITEM_HEAD_EOR[2]) {
// return false;
// }
// }
// } catch (IOException e) {
// return false;
// }
// return true;
// }
public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
int size = Math.min(data.getLength(), location.getSize());
file.write(data.getData(), data.getOffset(), size);
if (sync) {

View File

@ -30,7 +30,7 @@ import java.util.Map;
*/
public class DataFileAccessorPool {
private final Journal dataManager;
private final Journal journal;
private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
private boolean closed;
private int maxOpenReadersPerFile = 5;
@ -50,9 +50,9 @@ public class DataFileAccessorPool {
public DataFileAccessor openDataFileReader() throws IOException {
DataFileAccessor rc = null;
if (pool.isEmpty()) {
rc = new DataFileAccessor(dataManager, file);
rc = new DataFileAccessor(journal, file);
} else {
rc = (DataFileAccessor)pool.remove(pool.size() - 1);
rc = pool.remove(pool.size() - 1);
}
used = true;
openCounter++;
@ -91,12 +91,12 @@ public class DataFileAccessorPool {
}
public DataFileAccessorPool(Journal dataManager) {
this.dataManager = dataManager;
this.journal = dataManager;
}
synchronized void clearUsedMark() {
for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
Pool pool = (Pool)iter.next();
for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
Pool pool = iter.next();
pool.clearUsedMark();
}
}

View File

@ -21,6 +21,8 @@ import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayOutputStream;
@ -36,10 +38,9 @@ import org.apache.kahadb.util.LinkedNodeList;
*/
class DataFileAppender {
protected static final byte[] RESERVED_SPACE = new byte[Journal.ITEM_HEAD_RESERVED_SPACE];
protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
protected final Journal dataManager;
protected final Journal journal;
protected final Map<WriteKey, WriteCommand> inflightWrites;
protected final Object enqueueMutex = new Object() {
};
@ -84,19 +85,21 @@ class DataFileAppender {
public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
public final CountDownLatch latch = new CountDownLatch(1);
public int size;
private final int offset;
public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
this.dataFile = dataFile;
this.writes.addLast(write);
size += write.location.getSize();
this.offset = offset;
this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
append(write);
}
public boolean canAppend(DataFile dataFile, WriteCommand write) {
if (dataFile != this.dataFile) {
return false;
}
if (size + write.location.getSize() >= maxWriteBatchSize) {
public boolean canAppend(WriteCommand write) {
int newSize = size + write.location.getSize();
if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
return false;
}
return true;
@ -104,7 +107,12 @@ class DataFileAppender {
public void append(WriteCommand write) throws IOException {
this.writes.addLast(write);
size += write.location.getSize();
write.location.setDataFileId(dataFile.getDataFileId());
write.location.setOffset(offset+size);
int s = write.location.getSize();
size += s;
dataFile.incrementLength(s);
journal.addToTotalLength(s);
}
}
@ -135,8 +143,8 @@ class DataFileAppender {
* @param fileId
*/
public DataFileAppender(Journal dataManager) {
this.dataManager = dataManager;
this.inflightWrites = this.dataManager.getInflightWrites();
this.journal = dataManager;
this.inflightWrites = this.journal.getInflightWrites();
}
/**
@ -153,7 +161,7 @@ class DataFileAppender {
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
// Write the packet into our internal buffer.
int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
@ -168,12 +176,7 @@ class DataFileAppender {
// by the data manager (which is basically just appending)
synchronized (this) {
// Find the position where this item will land at.
DataFile dataFile = dataManager.allocateLocation(location);
if (!sync) {
inflightWrites.put(new WriteKey(location), write);
}
batch = enqueue(dataFile, write);
batch = enqueue(write);
}
location.setLatch(batch.latch);
if (sync) {
@ -182,6 +185,8 @@ class DataFileAppender {
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} else {
inflightWrites.put(new WriteKey(location), write);
}
return location;
@ -189,7 +194,7 @@ class DataFileAppender {
public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
// Write the packet into our internal buffer.
int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
@ -198,23 +203,15 @@ class DataFileAppender {
WriteBatch batch;
WriteCommand write = new WriteCommand(location, data, onComplete);
// Locate the datafile and enqueue into the executor in a synchronized block so
// that writes get enqueued onto the executor in the order that they were
// assigned by the data manager (which is basically just appending)
synchronized (this) {
// Find the position where this item will land at.
DataFile dataFile = dataManager.allocateLocation(location);
inflightWrites.put(new WriteKey(location), write);
batch = enqueue(dataFile, write);
batch = enqueue(write);
}
inflightWrites.put(new WriteKey(location), write);
location.setLatch(batch.latch);
return location;
}
private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
private WriteBatch enqueue(WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
WriteBatch rc = null;
if (shutdown) {
@ -237,35 +234,37 @@ class DataFileAppender {
thread.start();
}
if (nextWriteBatch == null) {
nextWriteBatch = new WriteBatch(dataFile, write);
rc = nextWriteBatch;
enqueueMutex.notify();
} else {
// Append to current batch if possible..
if (nextWriteBatch.canAppend(dataFile, write)) {
nextWriteBatch.append(write);
rc = nextWriteBatch;
} else {
// Otherwise wait for the queuedCommand to be null
try {
while (nextWriteBatch != null) {
enqueueMutex.wait();
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
// Start a new batch.
nextWriteBatch = new WriteBatch(dataFile, write);
rc = nextWriteBatch;
enqueueMutex.notify();
}
while ( true ) {
if (nextWriteBatch == null) {
DataFile file = journal.getCurrentWriteFile();
if( file.getLength() > journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
nextWriteBatch = new WriteBatch(file, file.getLength(), write);
rc = nextWriteBatch;
enqueueMutex.notify();
return rc;
} else {
// Append to current batch if possible..
if (nextWriteBatch.canAppend(write)) {
nextWriteBatch.append(write);
return nextWriteBatch;
} else {
// Otherwise wait for the writer thread to take the current batch (nextWriteBatch becomes null)
try {
while (nextWriteBatch != null) {
enqueueMutex.wait();
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
}
}
}
return rc;
}
}
@ -331,67 +330,57 @@ class DataFileAppender {
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
if( file.length() < dataManager.preferedFileLength ) {
file.setLength(dataManager.preferedFileLength);
if( file.length() < journal.preferedFileLength ) {
file.setLength(journal.preferedFileLength);
}
}
WriteCommand write = wb.writes.getHead();
// Write all the data.
// Only need to seek to first location.. all others
// are in sequence.
file.seek(write.location.getOffset());
// Write an empty batch control record.
buff.reset();
buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
buff.writeInt(0);
buff.writeLong(0);
boolean forceToDisk = false;
//
// is it just 1 big write?
ReplicationTarget replicationTarget = dataManager.getReplicationTarget();
if (wb.size == write.location.getSize() && replicationTarget==null) {
forceToDisk = write.sync | write.onComplete != null;
// Just write it directly..
file.writeInt(write.location.getSize());
file.writeByte(write.location.getType());
file.write(RESERVED_SPACE);
file.write(Journal.ITEM_HEAD_SOR);
file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
file.write(Journal.ITEM_HEAD_EOR);
} else {
// We are going to do 1 big write.
while (write != null) {
forceToDisk |= write.sync | write.onComplete != null;
buff.writeInt(write.location.getSize());
buff.writeByte(write.location.getType());
buff.write(RESERVED_SPACE);
buff.write(Journal.ITEM_HEAD_SOR);
buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
buff.write(Journal.ITEM_HEAD_EOR);
write = write.getNext();
}
// Now do the 1 big write.
ByteSequence sequence = buff.toByteSequence();
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
if( replicationTarget!=null ) {
replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
}
buff.reset();
while (write != null) {
forceToDisk |= write.sync | write.onComplete != null;
buff.writeInt(write.location.getSize());
buff.writeByte(write.location.getType());
buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
write = write.getNext();
}
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
if( journal.isChecksum() ) {
Checksum checksum = new Adler32();
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeLong(checksum.getValue());
}
// Now do the 1 big write.
file.seek(wb.offset);
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
ReplicationTarget replicationTarget = journal.getReplicationTarget();
if( replicationTarget!=null ) {
replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
}
if (forceToDisk) {
file.getFD().sync();
}
WriteCommand lastWrite = wb.writes.getTail();
dataManager.setLastAppendLocation(lastWrite.location);
journal.setLastAppendLocation(lastWrite.location);
// Now that the data is on disk, remove the writes from the in
// flight

View File

@ -19,6 +19,7 @@ package org.apache.kahadb.journal;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -31,12 +32,15 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.LinkedNodeList;
import org.apache.kahadb.util.Scheduler;
@ -47,24 +51,17 @@ import org.apache.kahadb.util.Scheduler;
*/
public class Journal {
public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
public static final int ITEM_HEAD_RESERVED_SPACE = 21;
// ITEM_HEAD_SPACE = length + type+ reserved space + SOR
public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
public static final int ITEM_FOOT_SPACE = 3; // EOR
private static final int MAX_BATCH_SIZE = 32*1024*1024;
public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
public static final byte[] ITEM_HEAD_SOR = new byte[] {
'S', 'O', 'R'
}; //
public static final byte[] ITEM_HEAD_EOR = new byte[] {
'E', 'O', 'R'
}; //
public static final byte DATA_ITEM_TYPE = 1;
public static final byte REDO_ITEM_TYPE = 2;
// RECORD_HEAD_SPACE = length + type
public static final int RECORD_HEAD_SPACE = 4 + 1;
public static final byte USER_RECORD_TYPE = 1;
public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
// The batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
public static final String DEFAULT_DIRECTORY = ".";
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
public static final String DEFAULT_FILE_PREFIX = "db-";
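
The constants above fix the on-disk layout of the batch control record: the standard record header (4-byte size plus 1-byte type), the "WRITE BATCH" magic, a 4-byte length of the batch data that follows, and an 8-byte Adler-32 checksum of that data. The sketch below shows that layout with plain java.io as an illustration only; the journal itself builds the record through DataByteArrayOutputStream in DataFileAppender.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustration of the batch control record layout implied by the constants above:
// [int record size][byte type][magic "WRITE BATCH"][int batch data length][long checksum]
public class BatchControlRecordSketch {
    static final byte[] MAGIC = "WRITE BATCH".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    static final int RECORD_HEAD_SPACE = 4 + 1;                 // length + type
    static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + MAGIC.length + 4 + 8;
    static final byte BATCH_CONTROL_RECORD_TYPE = 2;

    static byte[] encode(int batchDataLength, long checksum) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(BATCH_CONTROL_RECORD_SIZE);
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(BATCH_CONTROL_RECORD_SIZE);  // total size of this control record
        out.writeByte(BATCH_CONTROL_RECORD_TYPE); // record type
        out.write(MAGIC);                         // sanity marker checked on recovery
        out.writeInt(batchDataLength);            // length of the batch data that follows
        out.writeLong(checksum);                  // Adler-32 of that data (0 when checksums are off)
        out.flush();
        return bytes.toByteArray();               // always BATCH_CONTROL_RECORD_SIZE (28) bytes
    }
}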
@ -82,8 +79,7 @@ public class Journal {
protected String filePrefix = DEFAULT_FILE_PREFIX;
protected String fileSuffix = DEFAULT_FILE_SUFFIX;
protected boolean started;
protected boolean useNio = true;
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
@ -99,8 +95,8 @@ public class Journal {
protected final AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs;
private ReplicationTarget replicationTarget;
protected boolean checksum;
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
if (started) {
return;
@ -111,11 +107,7 @@ public class Journal {
started = true;
preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
if (useNio) {
appender = new NIODataFileAppender(this);
} else {
appender = new DataFileAppender(this);
}
appender = new DataFileAppender(this);
File[] files = directory.listFiles(new FilenameFilter() {
public boolean accept(File dir, String n) {
@ -148,26 +140,14 @@ public class Journal {
}
}
// Need to check the current Write File to see if there was a partial
// write to it.
if (!dataFiles.isEmpty()) {
// See if the lastSyncedLocation is valid..
Location l = lastAppendLocation.get();
if (l != null && l.getDataFileId() != dataFiles.getTail().getDataFileId()) {
l = null;
}
// If we know the last location that was ok.. then we can skip lots
// of checking
try {
l = recoveryCheck(dataFiles.getTail(), l);
lastAppendLocation.set(l);
} catch (IOException e) {
LOG.warn("recovery check failed", e);
}
getCurrentWriteFile();
try {
Location l = recoveryCheck(dataFiles.getTail());
lastAppendLocation.set(l);
} catch (IOException e) {
LOG.warn("recovery check failed", e);
}
cleanupTask = new Runnable() {
public void run() {
cleanup();
@ -178,44 +158,98 @@ public class Journal {
LOG.trace("Startup took: "+(end-start)+" ms");
}
protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
if (location == null) {
location = new Location();
location.setDataFileId(dataFile.getDataFileId());
location.setOffset(0);
}
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
private static byte[] bytes(String string) {
try {
return string.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
protected Location recoveryCheck(DataFile dataFile) throws IOException {
byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
Location location = new Location();
location.setDataFileId(dataFile.getDataFileId());
location.setOffset(0);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
while (reader.readLocationDetailsAndValidate(location)) {
location.setOffset(location.getOffset() + location.getSize());
while( true ) {
reader.read(location.getOffset(), controlRecord);
controlIs.restart();
// Assert that it's a batch record.
if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) {
break;
}
if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE ) {
break;
}
for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length; i++ ) {
if( controlIs.readByte() != BATCH_CONTROL_RECORD_MAGIC[i] ) {
break;
}
}
int size = controlIs.readInt();
if( size > MAX_BATCH_SIZE ) {
break;
}
if( isChecksum() ) {
long expectedChecksum = controlIs.readLong();
byte data[] = new byte[size];
reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data);
Checksum checksum = new Adler32();
checksum.update(data, 0, data.length);
if( expectedChecksum!=checksum.getValue() ) {
break;
}
}
location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
}
} finally {
} catch (IOException e) {
} finally {
accessorPool.closeDataFileAccessor(reader);
}
dataFile.setLength(location.getOffset());
return location;
}
synchronized DataFile allocateLocation(Location location) throws IOException {
if (dataFiles.isEmpty()|| ((dataFiles.getTail().getLength() + location.getSize()) > maxFileLength)) {
int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
// actually allocate the disk space
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
dataFiles.addLast(nextWriteFile);
void addToTotalLength(int size) {
totalLength.addAndGet(size);
}
synchronized DataFile getCurrentWriteFile() throws IOException {
if (dataFiles.isEmpty()) {
rotateWriteFile();
}
DataFile currentWriteFile = dataFiles.getTail();
location.setOffset(currentWriteFile.getLength());
location.setDataFileId(currentWriteFile.getDataFileId().intValue());
int size = location.getSize();
currentWriteFile.incrementLength(size);
totalLength.addAndGet(size);
return currentWriteFile;
return dataFiles.getTail();
}
synchronized DataFile rotateWriteFile() {
int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
// actually allocate the disk space
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
dataFiles.addLast(nextWriteFile);
return nextWriteFile;
}
public File getFile(int nextNum) {
String fileName = filePrefix + nextNum + fileSuffix;
File file = new File(directory, fileName);
@ -285,11 +319,7 @@ public class Journal {
// reopen open file handles...
accessorPool = new DataFileAccessorPool(this);
if (useNio) {
appender = new NIODataFileAppender(this);
} else {
appender = new DataFileAppender(this);
}
appender = new DataFileAppender(this);
return result;
}
@ -411,7 +441,7 @@ public class Journal {
if (cur.getType() == 0) {
return null;
} else if (cur.getType() > 0) {
} else if (cur.getType() == USER_RECORD_TYPE) {
// Only return user records.
return cur;
}
@ -496,10 +526,6 @@ public class Journal {
return loc;
}
public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, type, sync);
}
public void update(Location location, ByteSequence data, boolean sync) throws IOException {
DataFile dataFile = getDataFile(location);
DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
@ -538,14 +564,6 @@ public class Journal {
this.lastAppendLocation.set(lastSyncedLocation);
}
public boolean isUseNio() {
return useNio;
}
public void setUseNio(boolean useNio) {
this.useNio = useNio;
}
public File getDirectoryArchive() {
return directoryArchive;
}
@ -614,5 +632,13 @@ public class Journal {
this.fileSuffix = fileSuffix;
}
public boolean isChecksum() {
return checksum;
}
public void setChecksum(boolean checksumWrites) {
this.checksum = checksumWrites;
}
}

View File

@ -71,13 +71,6 @@ public final class Location implements Comparable<Location> {
this.size = size;
}
/**
* @return the size of the payload of the record.
*/
public int getPaylodSize() {
return size - Journal.ITEM_HEAD_FOOT_SPACE;
}
public int getOffset() {
return offset;
}

View File

@ -1,226 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.journal;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* An AsyncDataFileAppender that uses NIO ByteBuffers and file channels to copy
* data to files more efficiently.
*
* @version $Revision$
*/
class NIODataFileAppender extends DataFileAppender {
public NIODataFileAppender(Journal fileManager) {
super(fileManager);
}
/**
* The async processing loop that writes to the data files and does the
* force calls.
*
* Since the file sync() call is the slowest of all the operations, this
* algorithm tries to 'batch' or group together several file sync() requests
* into a single file sync() call. The batching is accomplished attaching
* the same CountDownLatch instance to every force request in a group.
*
*/
protected void processQueue() {
DataFile dataFile = null;
RandomAccessFile file = null;
FileChannel channel = null;
try {
ByteBuffer header = ByteBuffer.allocateDirect(Journal.ITEM_HEAD_SPACE);
ByteBuffer footer = ByteBuffer.allocateDirect(Journal.ITEM_FOOT_SPACE);
ByteBuffer buffer = ByteBuffer.allocateDirect(maxWriteBatchSize);
// Populate the static parts of the headers and footers..
header.putInt(0); // size
header.put((byte)0); // type
header.put(RESERVED_SPACE); // reserved
header.put(Journal.ITEM_HEAD_SOR);
footer.put(Journal.ITEM_HEAD_EOR);
while (true) {
Object o = null;
// Block till we get a command.
synchronized (enqueueMutex) {
while (true) {
if (nextWriteBatch != null) {
o = nextWriteBatch;
nextWriteBatch = null;
break;
}
if (shutdown) {
return;
}
enqueueMutex.wait();
}
enqueueMutex.notify();
}
WriteBatch wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
file.setLength(dataFile.getLength());
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
if( file.length() < dataManager.preferedFileLength ) {
file.setLength(dataManager.preferedFileLength);
}
channel = file.getChannel();
}
WriteCommand write = wb.writes.getHead();
// Write all the data.
// Only need to seek to first location.. all others
// are in sequence.
file.seek(write.location.getOffset());
boolean forceToDisk=false;
//
// is it just 1 big write?
if (wb.size == write.location.getSize()) {
forceToDisk = write.sync | write.onComplete!=null;
header.clear();
header.putInt(write.location.getSize());
header.put(write.location.getType());
header.clear();
transfer(header, channel);
ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
write.data.getLength());
transfer(source, channel);
footer.clear();
transfer(footer, channel);
} else {
// Combine the smaller writes into 1 big buffer
while (write != null) {
forceToDisk |= write.sync | write.onComplete!=null;
header.clear();
header.putInt(write.location.getSize());
header.put(write.location.getType());
header.clear();
copy(header, buffer);
assert !header.hasRemaining();
ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
write.data.getLength());
copy(source, buffer);
assert !source.hasRemaining();
footer.clear();
copy(footer, buffer);
assert !footer.hasRemaining();
write = write.getNext();
}
// Fully write out the buffer..
buffer.flip();
transfer(buffer, channel);
buffer.clear();
}
if( forceToDisk ) {
file.getChannel().force(false);
}
WriteCommand lastWrite = wb.writes.getTail();
dataManager.setLastAppendLocation(lastWrite.location);
// Now that the data is on disk, remove the writes from the in
// flight
// cache.
write = wb.writes.getHead();
while (write != null) {
if (!write.sync) {
inflightWrites.remove(new WriteKey(write.location));
}
if (write.onComplete != null) {
try {
write.onComplete.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
write = write.getNext();
}
// Signal any waiting threads that the write is on disk.
wb.latch.countDown();
}
} catch (IOException e) {
synchronized (enqueueMutex) {
firstAsyncException = e;
}
} catch (InterruptedException e) {
} finally {
try {
if (file != null) {
dataFile.closeRandomAccessFile(file);
}
} catch (IOException e) {
}
shutdownDone.countDown();
}
}
/**
* Copy the bytes in header to the channel.
*
* @param header - source of data
* @param channel - destination where the data will be written.
* @throws IOException
*/
private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
while (header.hasRemaining()) {
channel.write(header);
}
}
private int copy(ByteBuffer src, ByteBuffer dest) {
int rc = Math.min(dest.remaining(), src.remaining());
if (rc > 0) {
// Adjust our limit so that we don't overflow the dest buffer.
int limit = src.limit();
src.limit(src.position() + rc);
dest.put(src);
// restore the limit.
src.limit(limit);
}
return rc;
}
}
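
The processQueue() javadoc in the deleted NIODataFileAppender above describes the batching strategy that the surviving DataFileAppender keeps: every write in a group shares one CountDownLatch, the background thread performs a single force() for the whole group, and then all waiters are released together. Below is a minimal sketch of that pattern with hypothetical names, not the actual appender API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Sketch only: many producer threads join the current batch and block on its latch;
// one writer thread flushes the batch with a single sync and releases them all.
class SyncBatchSketch {
    static class Batch {
        final List<byte[]> writes = new ArrayList<byte[]>();
        final CountDownLatch latch = new CountDownLatch(1);
    }

    private Batch nextBatch;

    // Producer side: append to the current batch and wait for its single sync.
    void appendAndSync(byte[] record) throws InterruptedException {
        Batch batch;
        synchronized (this) {
            if (nextBatch == null) {
                nextBatch = new Batch();
                notify(); // wake the writer thread if it is waiting for work
            }
            nextBatch.writes.add(record);
            batch = nextBatch;
        }
        batch.latch.await(); // released after one force() covers the whole group
    }

    // Writer side: take the batch, write and sync it once, then release the waiters.
    void drainOnce() throws InterruptedException {
        Batch batch;
        synchronized (this) {
            while (nextBatch == null) {
                wait();
            }
            batch = nextBatch;
            nextBatch = null;
        }
        // ... write batch.writes to the data file and force() the channel once ...
        batch.latch.countDown(); // all writes in this group are now on disk
    }
}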

View File

@ -20,8 +20,6 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.kahadb.util.IOHelper;
/**
* Allows you to open a data file in read only mode. Useful when working with
* archived data files.

View File

@ -23,23 +23,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* An AsyncDataManager that works in read only mode against multiple data directories.
* Useful for reading back archived data files.
*/
public class ReadOnlyJournal extends Journal {
private static final Log LOG = LogFactory.getLog(ReadOnlyJournal.class);
private final ArrayList<File> dirs;
public ReadOnlyJournal(final ArrayList<File> dirs) {
this.dirs = dirs;
}
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
if (started) {
return;

View File

@ -41,7 +41,6 @@ public class JournalTest extends TestCase {
}
protected void configure(Journal dataManager) {
dataManager.setUseNio(false);
}
@Override

View File

@ -1,27 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.journal;
import org.apache.kahadb.journal.Journal;
public class NioJournalTest extends JournalTest {
@Override
protected void configure(Journal dataManager) {
dataManager.setUseNio(true);
}
}