This commit is contained in:
gtully 2015-02-11 13:21:47 +00:00
parent 8cf98a070f
commit 95f7262cb1
9 changed files with 39 additions and 124 deletions

View File

@ -111,14 +111,10 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
file.setLength(dataFile.getLength());
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
if( file.length() < journal.preferedFileLength ) {
file.setLength(journal.preferedFileLength);
}
}
final DataByteArrayOutputStream buff = wb.buff;

View File

@ -18,7 +18,6 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
@ -37,7 +36,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
protected volatile int length;
protected final SequenceSet corruptedBlocks = new SequenceSet();
DataFile(File file, int number, int preferedSize) {
DataFile(File file, int number) {
this.file = file;
this.dataFileId = Integer.valueOf(number);
length = (int)(file.exists() ? file.length() : 0);

View File

@ -38,7 +38,6 @@ final class DataFileAccessor {
/**
* Construct a Store reader
*
* @param fileId
* @throws IOException
*/
public DataFileAccessor(Journal dataManager, DataFile dataFile) throws IOException {

View File

@ -182,7 +182,7 @@ class DataFileAppender implements FileAppender {
private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
throw new IOException("Async Writer Thread Shutdown");
}
if (!running) {
@ -207,7 +207,7 @@ class DataFileAppender implements FileAppender {
while ( true ) {
if (nextWriteBatch == null) {
DataFile file = journal.getCurrentWriteFile();
if( file.getLength() > journal.getMaxFileLength() ) {
if( file.getLength() + write.location.getSize() >= journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
@ -226,7 +226,7 @@ class DataFileAppender implements FileAppender {
final long start = System.currentTimeMillis();
enqueueMutex.wait();
if (maxStat > 0) {
logger.info("Watiting for write to finish with full batch... millis: " +
logger.info("Waiting for write to finish with full batch... millis: " +
(System.currentTimeMillis() - start));
}
}
@ -234,7 +234,7 @@ class DataFileAppender implements FileAppender {
throw new InterruptedIOException();
}
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
throw new IOException("Async Writer Thread Shutdown");
}
}
}
@ -273,6 +273,7 @@ class DataFileAppender implements FileAppender {
int statIdx = 0;
int[] stats = new int[maxStat];
final byte[] end = new byte[]{0};
/**
* The async processing loop that writes to the data files and does the
* force calls. Since the file sync() call is the slowest of all the
@ -308,14 +309,13 @@ class DataFileAppender implements FileAppender {
if (dataFile != wb.dataFile) {
if (file != null) {
file.setLength(dataFile.getLength());
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
if( file.length() < journal.preferedFileLength ) {
file.setLength(journal.preferedFileLength);
}
// pre allocate on first open
file.seek(journal.maxFileLength-1);
file.write(end);
}
Journal.WriteCommand write = wb.writes.getHead();
@ -337,15 +337,19 @@ class DataFileAppender implements FileAppender {
write = write.getNext();
}
// append 'unset' next batch (5 bytes) so read can always find eof
buff.writeInt(0);
buff.writeByte(0);
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
if( journal.isChecksum() ) {
Checksum checksum = new Adler32();
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
buff.writeLong(checksum.getValue());
}

View File

@ -94,7 +94,6 @@ public class Journal {
protected boolean started;
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
protected FileAppender appender;
@ -128,7 +127,6 @@ public class Journal {
long start = System.currentTimeMillis();
accessorPool = new DataFileAccessorPool(this);
started = true;
preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
@ -144,7 +142,7 @@ public class Journal {
String n = file.getName();
String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
int num = Integer.parseInt(numStr);
DataFile dataFile = new DataFile(file, num, preferedFileLength);
DataFile dataFile = new DataFile(file, num);
fileMap.put(dataFile.getDataFileId(), dataFile);
totalLength.addAndGet(dataFile.getLength());
} catch (NumberFormatException e) {
@ -178,6 +176,11 @@ public class Journal {
lastAppendLocation.set(recoveryCheck(df));
}
// ensure we don't report unused space of last journal file in size metric
if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
}
cleanupTask = new Runnable() {
public void run() {
cleanup();
@ -330,8 +333,7 @@ public class Journal {
synchronized DataFile rotateWriteFile() {
int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
// actually allocate the disk space
DataFile nextWriteFile = new DataFile(file, nextNum);
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
dataFiles.addLast(nextWriteFile);
@ -399,9 +401,9 @@ public class Journal {
boolean result = true;
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
DataFile dataFile = i.next();
totalLength.addAndGet(-dataFile.getLength());
result &= dataFile.delete();
}
totalLength.set(0);
fileMap.clear();
fileByFileMap.clear();
lastAppendLocation.set(null);
@ -479,26 +481,6 @@ public class Journal {
return directory.toString();
}
/**
 * Account for data appended to the journal by an external writer
 * (e.g. a replication target) rather than through this appender.
 *
 * @param loc    location of the externally written record
 * @param length number of bytes appended
 * @throws IOException if the location is not in the current or next data file
 */
public synchronized void appendedExternally(Location loc, int length) throws IOException {
    DataFile tail = dataFiles.getTail();
    int tailId = tail.getDataFileId();
    if (tailId == loc.getDataFileId()) {
        // Append landed in the current log file: just grow its recorded length.
        tail.incrementLength(length);
    } else if (tailId + 1 == loc.getDataFileId()) {
        // Append rolled over into the next log file: register a fresh DataFile for it.
        int nextNum = loc.getDataFileId();
        File file = getFile(nextNum);
        DataFile next = new DataFile(file, nextNum, preferedFileLength);
        fileMap.put(next.getDataFileId(), next);
        fileByFileMap.put(file, next);
        dataFiles.addLast(next);
    } else {
        // Anything other than current-or-next file is an invalid external append.
        throw new IOException("Invalid external append.");
    }
}
public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
Location cur = null;
@ -547,7 +529,8 @@ public class Journal {
}
if (cur.getType() == 0) {
return null;
// invalid offset - jump to next datafile
cur.setOffset(maxFileLength);
} else if (cur.getType() == USER_RECORD_TYPE) {
// Only return user records.
return cur;
@ -555,62 +538,6 @@ public class Journal {
}
}
/**
 * Convenience overload: resolve the {@link File} to its tracked DataFile
 * and delegate to the DataFile-based traversal.
 */
public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
    return getNextLocation(fileByFileMap.get(file), lastLocation, thisFileOnly);
}
/**
 * Walk forward from {@code lastLocation} (or from the head file when null)
 * and return the next user record, advancing across data files as needed.
 *
 * @param dataFile     file to start scanning in
 * @param lastLocation last known location, or null to start at the head node
 * @param thisFileOnly when true, stop (return null) instead of crossing into the next file
 * @return the next user-record Location, or null at end of journal / on an unset (type 0) record
 * @throws IOException if reading location details from disk fails
 */
public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
Location cur = null;
while (true) {
if (cur == null) {
if (lastLocation == null) {
// No starting point: begin at offset 0 of the head data file.
DataFile head = dataFile.getHeadNode();
cur = new Location();
cur.setDataFileId(head.getDataFileId());
cur.setOffset(0);
} else {
// Set to the next offset..
cur = new Location(lastLocation);
cur.setOffset(cur.getOffset() + cur.getSize());
}
} else {
// Subsequent iterations: step past the record we just inspected.
cur.setOffset(cur.getOffset() + cur.getSize());
}
// Did it go into the next file??
if (dataFile.getLength() <= cur.getOffset()) {
if (thisFileOnly) {
return null;
} else {
dataFile = getNextDataFile(dataFile);
if (dataFile == null) {
// Ran off the end of the journal.
return null;
} else {
cur.setDataFileId(dataFile.getDataFileId().intValue());
cur.setOffset(0);
}
}
}
// Load in location size and type.
// Accessor is pooled; must be returned even if the read throws.
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
reader.readLocationDetails(cur);
} finally {
accessorPool.closeDataFileAccessor(reader);
}
if (cur.getType() == 0) {
// Type 0 marks unwritten space: treat as end of data.
return null;
} else if (cur.getType() > 0) {
// Only return user records.
return cur;
}
// Negative types are internal records: keep scanning.
}
}
public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
DataFile dataFile = getDataFile(location);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
@ -713,21 +640,7 @@ public class Journal {
}
/**
 * Total bytes of journal data on disk.
 * <p>
 * Data files are pre-allocated to full size when first opened (see the
 * appender's "pre allocate on first open" step), so the running
 * {@code totalLength} counter reflects actual disk usage directly and
 * the old tail-file / preferedFileLength adjustment is no longer needed.
 *
 * @return accumulated length of all journal data files
 */
public long getDiskSize() {
    // NOTE: the previous overlay of old+new bodies left an unreachable
    // second return after "return rc;", which does not compile. The
    // post-commit implementation is this single read.
    return totalLength.get();
}
public void setReplicationTarget(ReplicationTarget replicationTarget) {

View File

@ -26,8 +26,8 @@ import java.io.RandomAccessFile;
*/
public class ReadOnlyDataFile extends DataFile {
/**
 * Read-only data file; pre-allocation sizing was removed, so only the
 * two-argument form matching {@code DataFile(File, int)} remains.
 *
 * @param file   the journal data file on disk
 * @param number the data file id
 */
ReadOnlyDataFile(File file, int number) {
    super(file, number);
}
public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {

View File

@ -60,7 +60,7 @@ public class ReadOnlyJournal extends Journal {
String n = file.getName();
String numStr = n.substring(filePrefix.length(), n.length());
int num = Integer.parseInt(numStr);
DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
DataFile dataFile = new ReadOnlyDataFile(file, num);
fileMap.put(dataFile.getDataFileId(), dataFile);
totalLength.addAndGet(dataFile.getLength());
} catch (NumberFormatException e) {

View File

@ -371,12 +371,7 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
}
/**
 * Unsupported: journal files are pre-allocated to their full size, so the
 * length of the underlying RandomAccessFile must never be changed through
 * this wrapper.
 *
 * @param length ignored
 * @throws IllegalStateException always
 */
public void setLength(long length) throws IOException {
    // The garbled overlay of the old try/catch body plus this throw would
    // have resized the file and then thrown anyway; the post-commit
    // behavior is to reject the call outright.
    throw new IllegalStateException("File size is pre allocated");
}
public void seek(long pos) throws IOException {

View File

@ -58,6 +58,15 @@ public class FilePendingMessageCursorTestSupport {
public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
createBrokerWithTempStoreLimit();
SystemUsage usage = brokerService.getSystemUsage();
PList dud = brokerService.getTempDataStore().getPList("dud");
// fill the temp store
int id=0;
ByteSequence payload = new ByteSequence(new byte[1024]);
while (!usage.getTempUsage().isFull()) {
dud.addFirst("A-" + (++id), payload);
}
assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);