git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1241221 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-02-06 22:24:58 +00:00
parent a1d5ff0316
commit cdba931deb
12 changed files with 275 additions and 274 deletions

View File

@ -55,7 +55,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected BrokerService brokerService;
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
public static final File DEFAULT_DIRECTORY = new File("KahaDB");
protected static final Buffer UNMATCHED;
static {

View File

@ -25,7 +25,7 @@ import java.util.Map;
/**
* Used to pool DataFileAccessors.
*
*
* @author chirino
*/
public class DataFileAccessorPool {
@ -95,8 +95,7 @@ public class DataFileAccessorPool {
}
synchronized void clearUsedMark() {
for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
Pool pool = iter.next();
for (Pool pool : pools.values()) {
pool.clearUsedMark();
}
}
@ -153,8 +152,7 @@ public class DataFileAccessorPool {
return;
}
closed = true;
for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
Pool pool = iter.next();
for (Pool pool : pools.values()) {
pool.dispose();
}
pools.clear();

View File

@ -28,20 +28,21 @@ import java.util.zip.Checksum;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LinkedNodeList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An optimized writer to do batch appends to a data file. This object is thread
* safe and gains throughput as you increase the number of concurrent writes it
* does.
*
*
*/
class DataFileAppender implements FileAppender {
private static final Logger logger = LoggerFactory.getLogger(DataFileAppender.class);
protected final Journal journal;
protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
protected final Object enqueueMutex = new Object() {
};
protected final Object enqueueMutex = new Object();
protected WriteBatch nextWriteBatch;
protected boolean shutdown;
@ -90,7 +91,7 @@ class DataFileAppender implements FileAppender {
public WriteBatch(DataFile dataFile,int offset) {
this.dataFile = dataFile;
this.offset = offset;
this.offset = offset;
this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
@ -103,7 +104,7 @@ class DataFileAppender implements FileAppender {
public boolean canAppend(Journal.WriteCommand write) {
int newSize = size + write.location.getSize();
if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
return false;
}
return true;
@ -114,7 +115,7 @@ class DataFileAppender implements FileAppender {
write.location.setDataFileId(dataFile.getDataFileId());
write.location.setOffset(offset+size);
int s = write.location.getSize();
size += s;
size += s;
dataFile.incrementLength(s);
journal.addToTotalLength(s);
}
@ -131,7 +132,7 @@ class DataFileAppender implements FileAppender {
}
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
@ -149,11 +150,11 @@ class DataFileAppender implements FileAppender {
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
IOException exception = batch.exception.get();
IOException exception = batch.exception.get();
if (exception != null) {
throw exception;
throw exception;
}
}
}
return location;
}
@ -169,7 +170,7 @@ class DataFileAppender implements FileAppender {
Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
WriteBatch batch = enqueue(write);
location.setLatch(batch.latch);
return location;
}
@ -179,7 +180,7 @@ class DataFileAppender implements FileAppender {
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
if (!running) {
running = true;
thread = new Thread() {
@ -193,44 +194,45 @@ class DataFileAppender implements FileAppender {
thread.start();
firstAsyncException = null;
}
if (firstAsyncException != null) {
throw firstAsyncException;
}
while ( true ) {
if (nextWriteBatch == null) {
DataFile file = journal.getCurrentWriteFile();
if( file.getLength() > journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
if (nextWriteBatch == null) {
DataFile file = journal.getCurrentWriteFile();
if( file.getLength() > journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
nextWriteBatch = newWriteBatch(write, file);
enqueueMutex.notifyAll();
break;
} else {
// Append to current batch if possible..
if (nextWriteBatch.canAppend(write)) {
nextWriteBatch.append(write);
break;
} else {
// Otherwise wait for the queuedCommand to be null
try {
while (nextWriteBatch != null) {
final long start = System.currentTimeMillis();
enqueueMutex.wait();
if (maxStat > 0) {
System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
}
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
}
}
nextWriteBatch = newWriteBatch(write, file);
enqueueMutex.notifyAll();
break;
} else {
// Append to current batch if possible..
if (nextWriteBatch.canAppend(write)) {
nextWriteBatch.append(write);
break;
} else {
// Otherwise wait for the queuedCommand to be null
try {
while (nextWriteBatch != null) {
final long start = System.currentTimeMillis();
enqueueMutex.wait();
if (maxStat > 0) {
logger.info("Watiting for write to finish with full batch... millis: " +
(System.currentTimeMillis() - start));
}
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
}
}
}
if (!write.sync) {
inflightWrites.put(new Journal.WriteKey(write.location), write);
@ -282,13 +284,11 @@ class DataFileAppender implements FileAppender {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
while (true) {
Object o = null;
// Block till we get a command.
synchronized (enqueueMutex) {
while (true) {
if (nextWriteBatch != null) {
o = nextWriteBatch;
wb = nextWriteBatch;
nextWriteBatch = null;
break;
}
@ -300,7 +300,6 @@ class DataFileAppender implements FileAppender {
enqueueMutex.notifyAll();
}
wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
file.setLength(dataFile.getLength());
@ -333,15 +332,15 @@ class DataFileAppender implements FileAppender {
}
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
if( journal.isChecksum() ) {
Checksum checksum = new Adler32();
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeLong(checksum.getValue());
Checksum checksum = new Adler32();
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeLong(checksum.getValue());
}
// Now do the 1 big write.
@ -354,16 +353,16 @@ class DataFileAppender implements FileAppender {
for (;statIdx > 0;) {
all+= stats[--statIdx];
}
System.err.println("Ave writeSize: " + all/maxStat);
logger.info("Ave writeSize: " + all/maxStat);
}
}
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
ReplicationTarget replicationTarget = journal.getReplicationTarget();
if( replicationTarget!=null ) {
replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
}
if (forceToDisk) {
file.getFD().sync();
}
@ -411,7 +410,7 @@ class DataFileAppender implements FileAppender {
try {
write.onComplete.run();
} catch (Throwable e) {
e.printStackTrace();
logger.info("Add exception was raised while executing the run command for onComplete", e);
}
}
write = write.getNext();

View File

@ -48,8 +48,8 @@ import org.apache.kahadb.util.Sequence;
/**
* Manages DataFiles
*
*
*
*
*/
public class Journal {
public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
@ -57,12 +57,12 @@ public class Journal {
private static final int MAX_BATCH_SIZE = 32*1024*1024;
// ITEM_HEAD_SPACE = length + type+ reserved space + SOR
// ITEM_HEAD_SPACE = length + type+ reserved space + SOR
public static final int RECORD_HEAD_SPACE = 4 + 1;
public static final byte USER_RECORD_TYPE = 1;
public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
// Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
// Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
@ -77,7 +77,7 @@ public class Journal {
sequence.compact();
return sequence.getData();
} catch (IOException e) {
throw new RuntimeException("Could not create batch control record header.");
throw new RuntimeException("Could not create batch control record header.", e);
}
}
@ -89,7 +89,7 @@ public class Journal {
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
public static final int PREFERED_DIFF = 1024 * 512;
public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
@ -99,11 +99,11 @@ public class Journal {
protected String filePrefix = DEFAULT_FILE_PREFIX;
protected String fileSuffix = DEFAULT_FILE_SUFFIX;
protected boolean started;
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
protected FileAppender appender;
protected DataFileAccessorPool accessorPool;
@ -115,18 +115,17 @@ public class Journal {
protected Runnable cleanupTask;
protected AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs;
private ReplicationTarget replicationTarget;
private ReplicationTarget replicationTarget;
protected boolean checksum;
protected boolean checkForCorruptionOnStartup;
protected boolean enableAsyncDiskSync = true;
private Timer timer;
public synchronized void start() throws IOException {
if (started) {
return;
}
long start = System.currentTimeMillis();
accessorPool = new DataFileAccessorPool(this);
started = true;
@ -141,9 +140,8 @@ public class Journal {
});
if (files != null) {
for (int i = 0; i < files.length; i++) {
for (File file : files) {
try {
File file = files[i];
String n = file.getName();
String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
int num = Integer.parseInt(numStr);
@ -174,7 +172,7 @@ public class Journal {
}
}
getCurrentWriteFile();
getCurrentWriteFile();
if( lastAppendLocation.get()==null ) {
DataFile df = dataFiles.getTail();
@ -194,19 +192,19 @@ public class Journal {
}
private static byte[] bytes(String string) {
try {
return string.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
try {
return string.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
protected Location recoveryCheck(DataFile dataFile) throws IOException {
protected Location recoveryCheck(DataFile dataFile) throws IOException {
Location location = new Location();
location.setDataFileId(dataFile.getDataFileId());
location.setOffset(0);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
while( true ) {
int size = checkBatchRecord(reader, location.getOffset());
@ -227,9 +225,9 @@ public class Journal {
}
}
}
} catch (IOException e) {
} finally {
} finally {
accessorPool.closeDataFileAccessor(reader);
}
@ -315,14 +313,14 @@ public class Journal {
}
void addToTotalLength(int size) {
totalLength.addAndGet(size);
}
void addToTotalLength(int size) {
totalLength.addAndGet(size);
}
public long length() {
return totalLength.get();
}
synchronized DataFile getCurrentWriteFile() throws IOException {
if (dataFiles.isEmpty()) {
rotateWriteFile();
@ -331,21 +329,21 @@ public class Journal {
}
synchronized DataFile rotateWriteFile() {
int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
// actually allocate the disk space
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
dataFiles.addLast(nextWriteFile);
return nextWriteFile;
}
int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
// actually allocate the disk space
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
dataFiles.addLast(nextWriteFile);
return nextWriteFile;
}
public File getFile(int nextNum) {
String fileName = filePrefix + nextNum + fileSuffix;
File file = new File(directory, fileName);
return file;
}
public File getFile(int nextNum) {
String fileName = filePrefix + nextNum + fileSuffix;
File file = new File(directory, fileName);
return file;
}
synchronized DataFile getDataFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
@ -419,12 +417,12 @@ public class Journal {
public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
for (Integer key : files) {
// Can't remove the data file (or subsequent files) that is currently being written to.
if( key >= lastAppendLocation.get().getDataFileId() ) {
continue;
}
if( key >= lastAppendLocation.get().getDataFileId() ) {
continue;
}
DataFile dataFile = fileMap.get(key);
if( dataFile!=null ) {
forceRemoveDataFile(dataFile);
forceRemoveDataFile(dataFile);
}
}
}
@ -440,9 +438,9 @@ public class Journal {
LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
} else {
if ( dataFile.delete() ) {
LOG.debug("Discarded data file " + dataFile);
LOG.debug("Discarded data file " + dataFile);
} else {
LOG.warn("Failed to discard data file " + dataFile.getFile());
LOG.warn("Failed to discard data file " + dataFile.getFile());
}
}
}
@ -466,14 +464,14 @@ public class Journal {
return directory.toString();
}
public synchronized void appendedExternally(Location loc, int length) throws IOException {
DataFile dataFile = null;
if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
// It's an update to the current log file..
dataFile = dataFiles.getTail();
dataFile.incrementLength(length);
} else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
// It's an update to the next log file.
public synchronized void appendedExternally(Location loc, int length) throws IOException {
DataFile dataFile = null;
if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
// It's an update to the current log file..
dataFile = dataFiles.getTail();
dataFile.incrementLength(length);
} else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
// It's an update to the next log file.
int nextNum = loc.getDataFileId();
File file = getFile(nextNum);
dataFile = new DataFile(file, nextNum, preferedFileLength);
@ -481,10 +479,10 @@ public class Journal {
fileMap.put(dataFile.getDataFileId(), dataFile);
fileByFileMap.put(file, dataFile);
dataFiles.addLast(dataFile);
} else {
throw new IOException("Invalid external append.");
}
}
} else {
throw new IOException("Invalid external append.");
}
}
public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
@ -494,7 +492,7 @@ public class Journal {
if (location == null) {
DataFile head = dataFiles.getHead();
if( head == null ) {
return null;
return null;
}
cur = new Location();
cur.setDataFileId(head.getDataFileId());
@ -528,7 +526,7 @@ public class Journal {
// Load in location size and type.
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
reader.readLocationDetails(cur);
reader.readLocationDetails(cur);
} finally {
accessorPool.closeDataFileAccessor(reader);
}
@ -682,7 +680,7 @@ public class Journal {
/**
* Get a set of files - only valid after start()
*
*
* @return files currently being used
*/
public Set<File> getFiles() {
@ -692,7 +690,7 @@ public class Journal {
public synchronized Map<Integer, DataFile> getFileMap() {
return new TreeMap<Integer, DataFile>(fileMap);
}
public long getDiskSize() {
long tailLength=0;
synchronized( this ) {
@ -700,9 +698,9 @@ public class Journal {
tailLength = dataFiles.getTail().getLength();
}
}
long rc = totalLength.get();
// The last file is actually at a minimum preferedFileLength big.
if( tailLength < preferedFileLength ) {
rc -= tailLength;
@ -711,12 +709,12 @@ public class Journal {
return rc;
}
public void setReplicationTarget(ReplicationTarget replicationTarget) {
this.replicationTarget = replicationTarget;
}
public ReplicationTarget getReplicationTarget() {
return replicationTarget;
}
public void setReplicationTarget(ReplicationTarget replicationTarget) {
this.replicationTarget = replicationTarget;
}
public ReplicationTarget getReplicationTarget() {
return replicationTarget;
}
public String getFileSuffix() {
return fileSuffix;
@ -726,13 +724,13 @@ public class Journal {
this.fileSuffix = fileSuffix;
}
public boolean isChecksum() {
return checksum;
}
public boolean isChecksum() {
return checksum;
}
public void setChecksum(boolean checksumWrites) {
this.checksum = checksumWrites;
}
public void setChecksum(boolean checksumWrites) {
this.checksum = checksumWrites;
}
public boolean isCheckForCorruptionOnStartup() {
return checkForCorruptionOnStartup;
@ -745,7 +743,7 @@ public class Journal {
public void setWriteBatchSize(int writeBatchSize) {
this.writeBatchSize = writeBatchSize;
}
public int getWriteBatchSize() {
return writeBatchSize;
}

View File

@ -19,19 +19,9 @@ package org.apache.kahadb.page;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.Marshaller;
/**
* A Page within a file.
*
*
*/
public class Page<T> {
@ -48,7 +38,7 @@ public class Page<T> {
long txId;
// A field reserved to hold checksums.. Not in use (yet)
int checksum;
// Points to the next page in the chunk stream
long next;
T data;
@ -60,18 +50,17 @@ public class Page<T> {
this.pageId=pageId;
}
public void copy(Page<T> other) {
public Page<T> copy(Page<T> other) {
this.pageId = other.pageId;
this.txId = other.txId;
this.type = other.type;
this.next = other.next;
this.data = other.data;
return this;
}
Page<T> copy() {
Page<T> rc = new Page<T>();
rc.copy(this);
return rc;
return new Page<T>().copy(this);
}
void makeFree(long txId) {
@ -80,13 +69,13 @@ public class Page<T> {
this.data = null;
this.next = 0;
}
public void makePagePart(long next, long txId) {
this.type = Page.PAGE_PART_TYPE;
this.next = next;
this.txId = txId;
}
public void makePageEnd(long size, long txId) {
this.type = Page.PAGE_END_TYPE;
this.next = size;
@ -142,6 +131,4 @@ public class Page<T> {
public void setChecksum(int checksum) {
this.checksum = checksum;
}
}

View File

@ -60,10 +60,10 @@ public class PageFile {
private static final String FREE_FILE_SUFFIX = ".free";
// 4k Default page size.
public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "" + 1024 * 4));
public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", "" + 1000));
public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", "" + 100));
;
public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);;
private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;
@ -87,14 +87,14 @@ public class PageFile {
// The minimum number of space allocated to the recovery file in number of pages.
private int recoveryFileMinPageCount = 1000;
// The max size that we let the recovery file grow to.. ma exceed the max, but the file will get resize
// The max size that we let the recovery file grow to.. ma exceed the max, but the file will get resize
// to this max size as soon as possible.
private int recoveryFileMaxPageCount = 10000;
// The number of pages in the current recovery buffer
private int recoveryPageCount;
private AtomicBoolean loaded = new AtomicBoolean();
// The number of pages we are aiming to write every time we
// The number of pages we are aiming to write every time we
// write to disk.
int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
@ -113,7 +113,7 @@ public class PageFile {
// Will writes be done in an async thread?
private boolean enabledWriteThread = false;
// These are used if enableAsyncWrites==true
// These are used if enableAsyncWrites==true
private AtomicBoolean stopWriter = new AtomicBoolean();
private Thread writerThread;
private CountDownLatch checkpointLatch;
@ -127,7 +127,7 @@ public class PageFile {
private AtomicLong nextTxid = new AtomicLong();
// Persistent settings stored in the page file.
// Persistent settings stored in the page file.
private MetaData metaData;
private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
@ -198,13 +198,11 @@ public class PageFile {
void begin() {
if (currentLocation != -1) {
diskBoundLocation = currentLocation;
currentLocation = -1;
current = null;
} else {
diskBound = current;
current = null;
currentLocation = -1;
}
current = null;
currentLocation = -1;
}
/**
@ -219,7 +217,6 @@ public class PageFile {
boolean isDone() {
return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
}
}
/**
@ -336,10 +333,8 @@ public class PageFile {
* @throws IOException
*/
private void delete(File file) throws IOException {
if (file.exists()) {
if (!file.delete()) {
throw new IOException("Could not delete: " + file.getPath());
}
if (file.exists() && !file.delete()) {
throw new IOException("Could not delete: " + file.getPath());
}
}
@ -407,13 +402,12 @@ public class PageFile {
// Scan all to find the free pages.
freeList = new SequenceSet();
for (Iterator i = tx().iterator(true); i.hasNext(); ) {
Page page = (Page) i.next();
for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
Page page = i.next();
if (page.getType() == Page.PAGE_FREE_TYPE) {
freeList.add(page.getPageId());
}
}
}
metaData.setCleanShutdown(false);
@ -427,7 +421,7 @@ public class PageFile {
startWriter();
} else {
throw new IllegalStateException("Cannot load the page file when it is allready loaded.");
throw new IllegalStateException("Cannot load the page file when it is already loaded.");
}
}
@ -516,7 +510,9 @@ public class PageFile {
try {
checkpointLatch.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
InterruptedIOException ioe = new InterruptedIOException();
ioe.initCause(e);
throw ioe;
}
}
@ -597,7 +593,7 @@ public class PageFile {
ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
p.store(os, "");
if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
throw new IOException("Configuation is to larger than: " + PAGE_FILE_HEADER_SIZE / 2);
throw new IOException("Configuation is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
}
// Fill the rest with space...
byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
@ -632,7 +628,7 @@ public class PageFile {
}
///////////////////////////////////////////////////////////////////
// Property Accessors
// Property Accessors
///////////////////////////////////////////////////////////////////
/**
@ -773,7 +769,6 @@ public class PageFile {
}
public void setWriteBatchSize(int writeBatchSize) {
assertNotLoaded();
this.writeBatchSize = writeBatchSize;
}
@ -833,9 +828,14 @@ public class PageFile {
Page<T> first = null;
int c = count;
while (c > 0) {
Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
page.makeFree(getNextWriteTransactionId());
// Perform the id's only once....
long pageId = nextFreePageId.getAndAdd(count);
long writeTxnId = nextTxid.getAndAdd(count);
while (c-- > 0) {
Page<T> page = new Page<T>(pageId++);
page.makeFree(writeTxnId++);
if (first == null) {
first = page;
@ -847,7 +847,6 @@ public class PageFile {
write(page, out.getData());
// LOG.debug("allocate writing: "+page.getPageId());
c--;
}
return first;
@ -985,9 +984,6 @@ public class PageFile {
// Internal Double write implementation follows...
///////////////////////////////////////////////////////////////////
/**
*
*/
private void pollWrites() {
try {
while (!stopWriter.get()) {
@ -1007,7 +1003,7 @@ public class PageFile {
writeBatch();
}
} catch (Throwable e) {
e.printStackTrace();
LOG.info("An exception was raised while performing poll writes", e);
} finally {
releaseCheckpointWaiter();
}
@ -1165,7 +1161,7 @@ public class PageFile {
batch.put(offset, data);
}
} catch (Exception e) {
// If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it.
// If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it.
// as the pages should still be consistent.
LOG.debug("Redo buffer was not fully intact: ", e);
return nextTxId;

View File

@ -39,6 +39,8 @@ public class Transaction implements Iterable<Page> {
* and it's data is larger than what would fit into a single page.
*/
public class PageOverflowIOException extends IOException {
private static final long serialVersionUID = 1L;
public PageOverflowIOException(String message) {
super(message);
}
@ -49,6 +51,8 @@ public class Transaction implements Iterable<Page> {
* with an invalid page id.
*/
public class InvalidPageIOException extends IOException {
private static final long serialVersionUID = 1L;
private final long page;
public InvalidPageIOException(String message, long page) {
@ -92,7 +96,7 @@ public class Transaction implements Iterable<Page> {
// List of pages freed in this transaction
private final SequenceSet freeList = new SequenceSet();
private long maxTransactionSize = Long.parseLong(System.getProperty("maxKahaDBTxSize", "" + 10485760));
private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
private long size = 0;
@ -178,12 +182,14 @@ public class Transaction implements Iterable<Page> {
public <T> void free(Page<T> page, int count) throws IOException {
pageFile.assertLoaded();
long offsetPage = page.getPageId();
for (int i = 0; i < count; i++) {
while (count-- > 0) {
if (page == null) {
page = load(offsetPage + i, null);
page = load(offsetPage, null);
}
free(page);
page = null;
// Increment the offsetPage value since using it depends on the current count.
offsetPage++;
}
}
@ -318,7 +324,6 @@ public class Transaction implements Iterable<Page> {
}
@SuppressWarnings("unchecked")
@Override
public void close() throws IOException {
super.close();
@ -551,7 +556,6 @@ public class Transaction implements Iterable<Page> {
* @throws IllegalStateException
* if the PageFile is not loaded
*/
@SuppressWarnings("unchecked")
public Iterator<Page> iterator() {
return (Iterator<Page>)iterator(false);
}
@ -569,6 +573,7 @@ public class Transaction implements Iterable<Page> {
pageFile.assertLoaded();
return new Iterator<Page>() {
long nextId;
Page nextPage;
Page lastPage;
@ -699,7 +704,6 @@ public class Transaction implements Iterable<Page> {
/**
* Queues up a page write that should get done when commit() gets called.
*/
@SuppressWarnings("unchecked")
private void write(final Page page, byte[] data) throws IOException {
Long key = page.getPageId();
@ -707,7 +711,7 @@ public class Transaction implements Iterable<Page> {
size = writes.size() * pageFile.getPageSize();
PageWrite write;
if (size > maxTransactionSize) {
if (tmpFile == null) {
tmpFile = new RandomAccessFile(getTempFile(), "rw");
@ -796,5 +800,4 @@ public class Transaction implements Iterable<Page> {
}
}
}
}

View File

@ -18,19 +18,18 @@ package org.apache.kahadb.util;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Stack;
/**
*
*/
public final class IOHelper {
protected static final int MAX_DIR_NAME_LENGTH;
protected static final int MAX_FILE_NAME_LENGTH;
private static final int DEFAULT_BUFFER_SIZE = 4096;
private IOHelper() {
}
@ -65,18 +64,18 @@ public final class IOHelper {
public static String toFileSystemDirectorySafeName(String name) {
return toFileSystemSafeName(name, true, MAX_DIR_NAME_LENGTH);
}
public static String toFileSystemSafeName(String name) {
return toFileSystemSafeName(name, false, MAX_FILE_NAME_LENGTH);
}
/**
* Converts any string into a string that is safe to use as a file name.
* The result will only include ascii characters and numbers, and the "-","_", and "." characters.
*
* @param name
* @param dirSeparators
* @param maxFileLength
* @param dirSeparators
* @param maxFileLength
* @return
*/
public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
@ -104,8 +103,45 @@ public final class IOHelper {
}
return result;
}
public static boolean deleteFile(File fileToDelete) {
public static boolean delete(File top) {
boolean result = true;
Stack<File> files = new Stack<File>();
// Add file to the stack to be processed...
files.push(top);
// Process all files until none remain...
while (!files.isEmpty()) {
File file = files.pop();
if (file.isDirectory()) {
File list[] = file.listFiles();
if (list == null || list.length == 0) {
// The current directory contains no entries...
// delete directory and continue...
result &= file.delete();
} else {
// Add back the directory since it is not empty....
// and when we process it again it will be empty and can be
// deleted safely...
files.push(file);
for (File dirFile : list) {
if (dirFile.isDirectory()) {
// Place the directory on the stack...
files.push(dirFile);
} else {
// This is a simple file, delete it...
result &= dirFile.delete();
}
}
}
} else {
// This is a simple file, delete it...
result &= file.delete();
}
}
return result;
}
private static boolean deleteFile(File fileToDelete) {
if (fileToDelete == null || !fileToDelete.exists()) {
return true;
}
@ -113,8 +149,8 @@ public final class IOHelper {
result &= fileToDelete.delete();
return result;
}
public static boolean deleteChildren(File parent) {
private static boolean deleteChildren(File parent) {
if (parent == null || !parent.exists()) {
return false;
}
@ -138,23 +174,22 @@ public final class IOHelper {
}
}
}
return result;
}
public static void moveFile(File src, File targetDirectory) throws IOException {
if (!src.renameTo(new File(targetDirectory, src.getName()))) {
throw new IOException("Failed to move " + src + " to " + targetDirectory);
}
}
public static void copyFile(File src, File dest) throws IOException {
FileInputStream fileSrc = new FileInputStream(src);
FileOutputStream fileDest = new FileOutputStream(dest);
copyInputStream(fileSrc, fileDest);
}
public static void copyInputStream(InputStream in, OutputStream out) throws IOException {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int len = in.read(buffer);
@ -165,19 +200,18 @@ public final class IOHelper {
in.close();
out.close();
}
static {
MAX_DIR_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumDirNameLength","200")).intValue();
MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","64")).intValue();
MAX_DIR_NAME_LENGTH = Integer.getInteger("MaximumDirNameLength",200);
MAX_FILE_NAME_LENGTH = Integer.getInteger("MaximumFileNameLength",64);
}
public static void mkdirs(File dir) throws IOException {
if (dir.exists()) {
if (!dir.isDirectory()) {
throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
}
} else {
if (!dir.mkdirs()) {
throw new IOException("Failed to create directory '" + dir+"'");

View File

@ -25,19 +25,19 @@ import java.util.Date;
/**
* Used to lock a File.
*
*
* @author chirino
*/
public class LockFile {
private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
private static final boolean DISABLE_FILE_LOCK = Boolean.getBoolean("java.nio.channels.FileLock.broken");
final private File file;
private FileLock lock;
private RandomAccessFile readFile;
private int lockCounter;
private final boolean deleteOnUnlock;
public LockFile(File file, boolean deleteOnUnlock) {
this.file = file;
this.deleteOnUnlock = deleteOnUnlock;
@ -54,7 +54,7 @@ public class LockFile {
if( lockCounter>0 ) {
return;
}
IOHelper.mkdirs(file.getParentFile());
if (System.getProperty(getVmLockKey()) != null) {
throw new IOException("File '" + file + "' could not be locked as lock is already held for this jvm.");
@ -80,7 +80,7 @@ public class LockFile {
}
throw new IOException("File '" + file + "' could not be locked.");
}
}
}
@ -90,12 +90,12 @@ public class LockFile {
if (DISABLE_FILE_LOCK) {
return;
}
lockCounter--;
if( lockCounter!=0 ) {
return;
}
// release the lock..
if (lock != null) {
try {
@ -106,7 +106,7 @@ public class LockFile {
lock = null;
}
closeReadFile();
if( deleteOnUnlock ) {
file.delete();
}
@ -125,7 +125,7 @@ public class LockFile {
}
readFile = null;
}
}
}

View File

@ -49,16 +49,15 @@ public abstract class IndexBenchmark extends TestCase {
public void setUp() throws Exception {
ROOT_DIR = new File(IOHelper.getDefaultDataDirectory());
IOHelper.mkdirs(ROOT_DIR);
IOHelper.deleteChildren(ROOT_DIR);
IOHelper.delete(ROOT_DIR);
pf = new PageFile(ROOT_DIR, getClass().getName());
pf.load();
}
protected void tearDown() throws Exception {
Transaction tx = pf.tx();
for (Index i : indexes.values()) {
for (Index<?, ?> i : indexes.values()) {
try {
i.unload(tx);
} catch (Throwable ignore) {
@ -99,7 +98,7 @@ public abstract class IndexBenchmark extends TestCase {
try {
Transaction tx = pf.tx();
Index<String,Long> index = openIndex(name);
long counter = 0;
while (!shutdown.get()) {
@ -109,7 +108,7 @@ public abstract class IndexBenchmark extends TestCase {
index.put(tx, key, c);
tx.commit();
Thread.yield(); // This avoids consumer starvation..
onProduced(counter++);
}
@ -121,7 +120,7 @@ public abstract class IndexBenchmark extends TestCase {
public void onProduced(long counter) {
}
}
protected String key(long c) {
return "a-long-message-id-like-key-" + c;
}
@ -150,7 +149,7 @@ public abstract class IndexBenchmark extends TestCase {
while (!shutdown.get()) {
long c = counter;
String key = key(c);
Long record = index.get(tx, key);
if (record != null) {
if( index.remove(tx, key) == null ) {

View File

@ -43,9 +43,7 @@ public abstract class IndexTestSupport extends TestCase {
protected void setUp() throws Exception {
super.setUp();
directory = new File(IOHelper.getDefaultDataDirectory());
IOHelper.mkdirs(directory);
IOHelper.deleteChildren(directory);
IOHelper.delete(directory);
}
protected void tearDown() throws Exception {
@ -54,7 +52,7 @@ public abstract class IndexTestSupport extends TestCase {
pf.delete();
}
}
protected void createPageFileAndIndex(int pageSize) throws Exception {
pf = new PageFile(directory, getClass().getName());
pf.setPageSize(pageSize);

View File

@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.IOHelper;
public class JournalTest extends TestCase {
protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
Journal dataManager;
File dir;
@Override
public void setUp() throws Exception {
dir = new File("target/tests/DataFileAppenderTest");
@ -39,28 +40,16 @@ public class JournalTest extends TestCase {
configure(dataManager);
dataManager.start();
}
protected void configure(Journal dataManager) {
}
@Override
public void tearDown() throws Exception {
dataManager.close();
deleteFilesInDirectory(dir);
dir.delete();
IOHelper.delete(dir);
}
private void deleteFilesInDirectory(File directory) {
File[] files = directory.listFiles();
for (int i=0; i<files.length; i++) {
File f = files[i];
if (f.isDirectory()) {
deleteFilesInDirectory(f);
}
f.delete();
}
}
public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception {
final int iterations = 10;
final CountDownLatch latch = new CountDownLatch(iterations);
@ -68,7 +57,7 @@ public class JournalTest extends TestCase {
for (int i=0; i < iterations; i++) {
dataManager.write(data, new Runnable() {
public void run() {
latch.countDown();
latch.countDown();
}
});
}
@ -84,7 +73,7 @@ public class JournalTest extends TestCase {
for (int i=0; i<iterations; i++) {
dataManager.write(data, new Runnable() {
public void run() {
latch.countDown();
latch.countDown();
}
});
}
@ -92,7 +81,7 @@ public class JournalTest extends TestCase {
assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
assertEquals("none written", 0, latch.getCount());
}
public void testBatchWriteCompleteAfterClose() throws Exception {
ByteSequence data = new ByteSequence("DATA".getBytes());
final int iterations = 10;
@ -102,27 +91,27 @@ public class JournalTest extends TestCase {
dataManager.close();
assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty());
}
public void testBatchWriteToMaxMessageSize() throws Exception {
final int iterations = 4;
final CountDownLatch latch = new CountDownLatch(iterations);
Runnable done = new Runnable() {
public void run() {
latch.countDown();
latch.countDown();
}
};
int messageSize = DEFAULT_MAX_BATCH_SIZE / iterations;
byte[] message = new byte[messageSize];
ByteSequence data = new ByteSequence(message);
for (int i=0; i< iterations; i++) {
dataManager.write(data, done);
}
// write may take some time
assertTrue("all callbacks complete", latch.await(10, TimeUnit.SECONDS));
}
public void testNoBatchWriteWithSync() throws Exception {
ByteSequence data = new ByteSequence("DATA".getBytes());
final int iterations = 10;