The AsyncDataManager now supports doing a callback notification for when a write

has been secured to disk.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@633603 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-03-04 19:36:58 +00:00
parent ab839ffc06
commit a9b841af61
5 changed files with 113 additions and 7 deletions

View File

@ -408,6 +408,23 @@ public class AsyncDataManager {
}
}
public void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
unUsed.removeAll(inUse);
List<DataFile> purgeList = new ArrayList<DataFile>();
for (Integer key : unUsed) {
// Only add files less than the lastFile..
if( key.intValue() < lastFile.intValue() ) {
DataFile dataFile = (DataFile)fileMap.get(key);
purgeList.add(dataFile);
}
}
for (DataFile dataFile : purgeList) {
forceRemoveDataFile(dataFile);
}
}
public synchronized void consolidateDataFiles() throws IOException {
List<DataFile> purgeList = new ArrayList<DataFile>();
for (DataFile dataFile : fileMap.values()) {
@ -482,8 +499,12 @@ public class AsyncDataManager {
cur.setOffset(0);
} else {
// Set to the next offset..
cur = new Location(location);
cur.setOffset(cur.getOffset() + cur.getSize());
if( location.getSize() == -1 ) {
cur = new Location(location);
} else {
cur = new Location(location);
cur.setOffset(location.getOffset()+location.getSize());
}
}
} else {
cur.setOffset(cur.getOffset() + cur.getSize());
@ -606,6 +627,11 @@ public class AsyncDataManager {
Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
return loc;
}
public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
return loc;
}
public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, type, sync);
@ -686,4 +712,28 @@ public class AsyncDataManager {
public Set<File> getFiles(){
return fileByFileMap.keySet();
}
synchronized public long getDiskSize() {
long rc=0;
DataFile cur = (DataFile)currentWriteFile.getHeadNode();
while( cur !=null ) {
rc += cur.getLength();
cur = (DataFile) cur.getNext();
}
return rc;
}
synchronized public long getDiskSizeUntil(Location startPosition) {
long rc=0;
DataFile cur = (DataFile)currentWriteFile.getHeadNode();
while( cur !=null ) {
if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
return rc + startPosition.getOffset();
}
rc += cur.getLength();
cur = (DataFile) cur.getNext();
}
return rc;
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.kaha.impl.async;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;

View File

@ -20,7 +20,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.util.ByteSequence;
@ -41,7 +40,7 @@ class DataFileAppender {
protected final AsyncDataManager dataManager;
protected final Map<WriteKey, WriteCommand> inflightWrites;
protected final Object enqueueMutex = new Object();
protected final Object enqueueMutex = new Object(){};
protected WriteBatch nextWriteBatch;
protected boolean shutdown;
@ -110,12 +109,21 @@ class DataFileAppender {
public final Location location;
public final ByteSequence data;
final boolean sync;
public final Runnable onComplete;
public WriteCommand(Location location, ByteSequence data, boolean sync) {
this.location = location;
this.data = data;
this.sync = sync;
this.onComplete=null;
}
public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
this.location = location;
this.data = data;
this.onComplete = onComplete;
this.sync = false;
}
}
@ -176,6 +184,34 @@ class DataFileAppender {
return location;
}
public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
// Write the packet our internal buffer.
int size = data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
final Location location = new Location();
location.setSize(size);
location.setType(type);
WriteBatch batch;
WriteCommand write = new WriteCommand(location, data, onComplete);
// Locate datafile and enqueue into the executor in sychronized block so
// that
// writes get equeued onto the executor in order that they were assigned
// by
// the data manager (which is basically just appending)
synchronized (this) {
// Find the position where this item will land at.
DataFile dataFile = dataManager.allocateLocation(location);
batch = enqueue(dataFile, write);
}
location.setLatch(batch.latch);
inflightWrites.put(new WriteKey(location), write);
return location;
}
private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
@ -342,8 +378,8 @@ class DataFileAppender {
buff.reset();
}
file.getFD().sync();
file.getFD().sync();
WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
dataManager.setLastAppendLocation(lastWrite.location);
@ -358,6 +394,13 @@ class DataFileAppender {
if (!write.sync) {
inflightWrites.remove(new WriteKey(write.location));
}
if( write.onComplete !=null ) {
try {
write.onComplete.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
write = (WriteCommand)write.getNext();
}
}
@ -378,4 +421,5 @@ class DataFileAppender {
}
}
}

View File

@ -105,5 +105,11 @@ public final class JournalFacade implements Journal {
ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
return convertToRecordLocation(dataManager.write(sequence, sync));
}
public RecordLocation write(Packet packet, Runnable onComplete) throws IOException, IllegalStateException {
org.apache.activeio.packet.ByteSequence data = packet.asByteSequence();
ByteSequence sequence = new ByteSequence(data.getData(), data.getOffset(), data.getLength());
return convertToRecordLocation(dataManager.write(sequence, onComplete));
}
}

View File

@ -166,6 +166,13 @@ class NIODataFileAppender extends DataFileAppender {
if (!write.sync) {
inflightWrites.remove(new WriteKey(write.location));
}
if (write.onComplete != null) {
try {
write.onComplete.run();
} catch (Throwable e) {
e.printStackTrace();
}
}
write = (WriteCommand)write.getNext();
}
}