The async writer is now working and enabled by default.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@475943 13f79535-47bb-0310-9956-ffa450edef68
Author: Hiram R. Chirino
Date: 2006-11-16 22:06:22 +00:00
Parent: 2df306cc7a
Commit: 849d2f330e
9 changed files with 253 additions and 60 deletions

KahaStore.java

@@ -74,7 +74,7 @@ public class KahaStore implements Store{
private String mode;
private boolean initialized;
private boolean logIndexChanges=false;
private boolean useAsyncWriter=false;
private boolean useAsyncWriter=true;
private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
private FileLock lock;
private String indexType=IndexTypes.DISK_INDEX;
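The KahaStore hunk above is the behaviour flip named in the commit message: useAsyncWriter now defaults to true, so the asynchronous data file writer (and the matching AsyncDataFileReader added below) is used unless it is explicitly switched off. A minimal sketch of turning it back off from application code; the package, the two-argument directory/mode constructor, and the setUseAsyncWriter accessor are assumptions, none of them shown in this diff:

import org.apache.activemq.kaha.impl.KahaStore;

public class DisableAsyncWriterExample {
    public static void main(String[] args) throws Exception {
        // Assumed constructor: store directory plus access mode, suggested by the
        // "mode" field visible in the hunk above.
        KahaStore store = new KahaStore("target/kaha-data", "rw");

        // Assumed setter for the useAsyncWriter field; false restores the old
        // synchronous SyncDataFileWriter/SyncDataFileReader pair.
        store.setUseAsyncWriter(false);

        // ... use the store ...
        store.close();
    }
}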

AsyncDataFileReader.java (new file)

@@ -0,0 +1,95 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.data;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.AsyncDataFileWriter.WriteCommand;
import org.apache.activemq.kaha.impl.data.AsyncDataFileWriter.WriteKey;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayInputStream;
/**
* Optimized Store reader
*
* @version $Revision: 1.1.1.1 $
*/
final class AsyncDataFileReader implements DataFileReader {
// static final Log log = LogFactory.getLog(AsyncDataFileReader.class);
private DataManager dataManager;
private DataByteArrayInputStream dataIn;
private final Map inflightWrites;
/**
* Construct a Store reader
*
* @param fileManager
* @param writer
*/
AsyncDataFileReader(DataManager fileManager, AsyncDataFileWriter writer){
this.dataManager=fileManager;
this.inflightWrites = writer.getInflightWrites();
this.dataIn=new DataByteArrayInputStream();
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
*/
public byte readDataItemSize(DataItem item) throws IOException {
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
if( asyncWrite!= null ) {
item.setSize(asyncWrite.location.getSize());
return asyncWrite.data[0];
}
RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
byte rc;
synchronized(file) {
file.seek(item.getOffset()); // jump to the size field
rc = file.readByte();
item.setSize(file.readInt());
}
return rc;
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
*/
public Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
WriteCommand asyncWrite = (WriteCommand) inflightWrites.get(new WriteKey(item));
if( asyncWrite!= null ) {
ByteArrayInputStream stream = new ByteArrayInputStream(asyncWrite.data, DataManager.ITEM_HEAD_SIZE, item.getSize());
return marshaller.readPayload(new DataInputStream(stream));
}
RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
// allocating byte[] arrays on every readItem.
byte[] data=new byte[item.getSize()];
synchronized(file) {
file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE);
file.readFully(data);
}
dataIn.restart(data);
return marshaller.readPayload(dataIn);
}
}
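The reader above never blocks on the writer: if the requested location is still sitting in the writer's in-flight map, it serves the bytes straight out of the queued WriteCommand; otherwise it seeks in the RandomAccessFile. The toy sketch below isolates that read-through pattern with plain stand-in types (a string key instead of WriteKey, a byte[] instead of WriteCommand); it is illustrative only, not the real API:

import java.util.concurrent.ConcurrentHashMap;

// Toy model of "check the in-flight writes before touching disk".
final class InflightReadThroughSketch {
    // Stand-in for AsyncDataFileWriter.getInflightWrites(): keyed by data file + offset.
    private final ConcurrentHashMap<String, byte[]> inflight = new ConcurrentHashMap<String, byte[]>();

    private static String key(int file, long offset) {
        return file + ":" + offset;
    }

    // Writer side: register the bytes before they are queued for the async thread.
    void beginWrite(int file, long offset, byte[] data) {
        inflight.put(key(file, offset), data);
    }

    // Async thread: once the bytes have hit the RandomAccessFile, drop the entry.
    void writeLanded(int file, long offset) {
        inflight.remove(key(file, offset));
    }

    // Reader side: pending writes are served from memory, everything else from disk.
    byte[] read(int file, long offset) {
        byte[] pending = inflight.get(key(file, offset));
        if (pending != null) {
            return pending;
        }
        return readFromDisk(file, offset); // the seek()/readFully() path shown above
    }

    private byte[] readFromDisk(int file, long offset) {
        return new byte[0]; // placeholder for the RandomAccessFile read
    }
}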

AsyncDataFileWriter.java

@@ -20,6 +20,7 @@ package org.apache.activemq.kaha.impl.data;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
@@ -36,30 +37,60 @@ import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
* @version $Revision: 1.1.1.1 $
*/
final class AsyncDataFileWriter implements DataFileWriter {
// static final Log log = LogFactory.getLog(AsyncDataFileWriter.class);
private static final Object SHUTDOWN_COMMAND = new Object();
private static final String SHUTDOWN_COMMAND = "SHUTDOWN";
static class WriteCommand {
final RandomAccessFile dataFile;
final byte[] data;
final long offset;
final int size;
final CountDownLatch latch;
static public class WriteKey {
private final int file;
private final long offset;
private final int hash;
public WriteCommand(RandomAccessFile dataFile, byte[] data, long offset, int size, CountDownLatch latch) {
public WriteKey(StoreLocation item){
file = item.getFile();
offset = item.getOffset();
// TODO: see if we can build a better hash
hash = (int) (file ^ offset);
}
public int hashCode() {
return hash;
}
public boolean equals(Object obj) {
WriteKey di = (WriteKey)obj;
return di.file == file && di.offset == offset;
}
}
public static class WriteCommand {
public final StoreLocation location;
public final RandomAccessFile dataFile;
public final byte[] data;
public final CountDownLatch latch;
public WriteCommand(StoreLocation location, RandomAccessFile dataFile, byte[] data, CountDownLatch latch) {
this.location = location;
this.dataFile = dataFile;
this.data = data;
this.offset = offset;
this.size = size;
this.latch = latch;
}
public String toString() {
return "write: "+location+", latch = "+System.identityHashCode(latch);
}
}
private DataManager dataManager;
private final Object enqueueMutex = new Object();
private final LinkedList queue = new LinkedList();
// Maps WriteKey -> WriteCommand for all the writes that still have not landed on
// disk.
private final ConcurrentHashMap inflightWrites = new ConcurrentHashMap();
private final UsageManager usage = new UsageManager();
private CountDownLatch latchAssignedToNewWrites = new CountDownLatch(1);
@@ -91,6 +122,7 @@ final class AsyncDataFileWriter implements DataFileWriter {
return;
}
latch.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
@@ -103,7 +135,7 @@ final class AsyncDataFileWriter implements DataFileWriter {
* @return
* @throws IOException
*/
public StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
public DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
// We may need to slow down if we are pounding the async thread too
// hard..
try {
@@ -121,30 +153,31 @@ final class AsyncDataFileWriter implements DataFileWriter {
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
final DataItem item=new DataItem();
item.setSize(payloadSize);
usage.increaseUsage(size);
// Locate the data file and enqueue into the executor in a synchronized block so that
// writes get enqueued onto the executor in the order that they were assigned by
// the data manager (which is basically just appending)
WriteCommand write;
synchronized(enqueueMutex) {
// Find the position where this item will land at.
final DataFile dataFile=dataManager.findSpaceForData(item);
dataManager.addInterestInFile(dataFile);
dataFile.setWriterData(latchAssignedToNewWrites);
enqueue(new WriteCommand(dataFile.getRandomAccessFile(), buffer.getData(), item.getOffset(), size, latchAssignedToNewWrites));
write = new WriteCommand(item, dataFile.getRandomAccessFile(), buffer.getData(), latchAssignedToNewWrites);
enqueue(write);
}
inflightWrites.put(new WriteKey(item), write);
return item;
}
/**
*
*/
public void updateItem(final StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
public void updateItem(final DataItem item, Marshaller marshaller, Object payload, byte type) throws IOException {
// We may need to slow down if we are pounding the async thread too
// hard..
try {
@@ -162,18 +195,23 @@ final class AsyncDataFileWriter implements DataFileWriter {
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
final DataFile dataFile = dataManager.getDataFile(location);
item.setSize(payloadSize);
final DataFile dataFile = dataManager.getDataFile(item);
usage.increaseUsage(size);
WriteCommand write = new WriteCommand(item, dataFile.getRandomAccessFile(), buffer.getData(), latchAssignedToNewWrites);
// Enqueue the write to the async thread.
synchronized(enqueueMutex) {
dataFile.setWriterData(latchAssignedToNewWrites);
enqueue(new WriteCommand(dataFile.getRandomAccessFile(), buffer.getData(), location.getOffset(), size, latchAssignedToNewWrites));
enqueue(write);
}
inflightWrites.put(new WriteKey(item), write);
}
private void enqueue(Object command) throws IOException {
if( shutdown ) {
throw new IOException("Async Writer Thread Shutdown");
}
@@ -199,6 +237,7 @@ final class AsyncDataFileWriter implements DataFileWriter {
private Object dequeue() {
synchronized( enqueueMutex ) {
while( queue.isEmpty() ) {
inflightWrites.clear();
try {
enqueueMutex.wait();
} catch (InterruptedException e) {
@@ -247,13 +286,16 @@ final class AsyncDataFileWriter implements DataFileWriter {
*
*/
private void processQueue() {
// log.debug("Async thread startup");
try {
CountDownLatch currentBatchLatch=null;
RandomAccessFile currentBatchDataFile=null;
while( !isShutdown() ) {
while( true ) {
// Block till we get a command.
Object o = dequeue();
// log.debug("Processing: "+o);
if( o == SHUTDOWN_COMMAND ) {
if( currentBatchLatch!=null ) {
@@ -288,9 +330,13 @@ final class AsyncDataFileWriter implements DataFileWriter {
}
// Write to the data..
write.dataFile.seek(write.offset);
write.dataFile.write(write.data,0,write.size);
usage.decreaseUsage(write.size);
int size = write.location.getSize()+DataManager.ITEM_HEAD_SIZE;
synchronized(write.dataFile) {
write.dataFile.seek(write.location.getOffset());
write.dataFile.write(write.data,0,size);
}
inflightWrites.remove(new WriteKey(write.location));
usage.decreaseUsage(size);
// Start of a batch..
if( currentBatchLatch == null ) {
@@ -301,14 +347,14 @@ final class AsyncDataFileWriter implements DataFileWriter {
// write commands already in the queue should have the
// same latch assigned.
latchAssignedToNewWrites = new CountDownLatch(1);
// enqueue an end of batch indicator..
queue.add(currentBatchLatch);
enqueueMutex.notify();
if( !shutdown ) {
enqueue(currentBatchLatch);
}
}
} else if( currentBatchLatch!=write.latch ) {
// the latch on subsequent writes should match.
new IOException("Got an out of sequence write.");
new IOException("Got an out of sequence write");
}
}
}
@@ -317,9 +363,15 @@ final class AsyncDataFileWriter implements DataFileWriter {
synchronized( enqueueMutex ) {
firstAsyncException = e;
}
// log.debug("Aync thread shutdown due to error: "+e,e);
} finally {
// log.debug("Aync thread shutdown");
shutdownDone.countDown();
}
}
public synchronized ConcurrentHashMap getInflightWrites() {
return inflightWrites;
}
}
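Most of the diff above is the batching protocol: every storeItem/updateItem picks up the CountDownLatch that is current at enqueue time, the async thread re-enqueues that latch as an end-of-batch marker once it has started writing the batch, and force() simply awaits the latch, which is counted down only after the marker is dequeued and the file is synced. The sketch below is a stripped-down, illustrative model of that handshake, assuming a single data file; the real class also throttles producers through a UsageManager and tracks per-file dirty state, both omitted here:

import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;

// Minimal, illustrative model of the async writer's batch/latch handshake.
final class BatchLatchSketch {

    private static final class WriteJob {
        final byte[] data;
        final CountDownLatch latch;
        WriteJob(byte[] data, CountDownLatch latch) {
            this.data = data;
            this.latch = latch;
        }
    }

    private final LinkedList<Object> queue = new LinkedList<Object>();
    private final Object mutex = new Object();
    private CountDownLatch latchForNewWrites = new CountDownLatch(1);

    // Producer side: each write joins whichever batch latch is current; a later
    // force() call would await the returned latch.
    CountDownLatch enqueueWrite(byte[] data) {
        synchronized (mutex) {
            WriteJob job = new WriteJob(data, latchForNewWrites);
            queue.addLast(job);
            mutex.notify();
            return job.latch;
        }
    }

    // Writer-thread side: one iteration of the processing loop.
    void processOne() throws InterruptedException {
        Object o;
        synchronized (mutex) {
            while (queue.isEmpty()) {
                mutex.wait();
            }
            o = queue.removeFirst();
        }
        if (o instanceof CountDownLatch) {
            // End-of-batch marker: the real writer syncs the RandomAccessFile here,
            // then releases every force() blocked on this latch.
            ((CountDownLatch) o).countDown();
            return;
        }
        WriteJob job = (WriteJob) o;
        // ... seek to the record offset and write job.data here ...
        synchronized (mutex) {
            if (job.latch == latchForNewWrites) {
                // First write of a new batch: roll the latch so later writes start the
                // next batch, and queue the old latch behind the writes that share it.
                latchForNewWrites = new CountDownLatch(1);
                queue.addLast(job.latch);
                mutex.notify();
            }
        }
    }
}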

DataFileReader.java (new file)

@@ -0,0 +1,24 @@
package org.apache.activemq.kaha.impl.data;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
interface DataFileReader {
/**
* Sets the size property on a DataItem and returns the type of item that this was
* created as.
*
* @param item
* @return
* @throws IOException
*/
byte readDataItemSize(DataItem item) throws IOException;
Object readItem(Marshaller marshaller, StoreLocation item)
throws IOException;
}
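The interface splits a read into two steps: readDataItemSize positions on the record header, fills in the DataItem's size, and returns the type byte, after which readItem deserializes the payload through the Marshaller. A hedged sketch of how a caller in the same package might drive it; the helper class itself and the use of DataManager.DATA_ITEM_TYPE as the expected type (including its visibility) are assumptions for illustration, not part of this commit:

package org.apache.activemq.kaha.impl.data;

import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;

// Hypothetical helper showing the two-step read protocol.
final class DataFileReaderUsageSketch {
    static Object readDataItem(DataFileReader reader, Marshaller marshaller, DataItem item)
            throws IOException {
        byte type = reader.readDataItemSize(item);  // fills item's size, returns the type byte
        if (type != DataManager.DATA_ITEM_TYPE) {   // assumed constant from DataManager
            throw new IOException("Unexpected item type: " + type);
        }
        return reader.readItem(marshaller, item);   // deserialize the payload at that location
    }
}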

DataFileWriter.java

@@ -4,7 +4,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
interface DataFileWriter {
@@ -16,10 +15,10 @@ interface DataFileWriter {
* @throws IOException
* @throws FileNotFoundException
*/
public StoreLocation storeItem(Marshaller marshaller, Object payload,
public DataItem storeItem(Marshaller marshaller, Object payload,
byte type) throws IOException;
public void updateItem(StoreLocation location, Marshaller marshaller,
public void updateItem(DataItem item, Marshaller marshaller,
Object payload, byte type) throws IOException;
public void force(DataFile dataFile) throws IOException;

DataManager.java

@@ -44,7 +44,7 @@ public final class DataManager{
private static final String NAME_PREFIX="data-";
private final File dir;
private final String name;
private SyncDataFileReader reader;
private DataFileReader reader;
private DataFileWriter writer;
private DataFile currentWriteFile;
private long maxFileLength = MAX_FILE_LENGTH;
@@ -134,7 +134,7 @@ public final class DataManager{
}
public void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
getWriter().updateItem(location,marshaller,payload,DATA_ITEM_TYPE);
getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE);
}
public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
@@ -289,16 +289,20 @@ public final class DataManager{
return "DataManager:("+NAME_PREFIX+name+")";
}
public synchronized SyncDataFileReader getReader() {
public synchronized DataFileReader getReader() {
if( reader == null ) {
reader = createReader();
}
return reader;
}
protected SyncDataFileReader createReader() {
protected DataFileReader createReader() {
if( useAsyncWriter ) {
return new AsyncDataFileReader(this, (AsyncDataFileWriter) getWriter());
} else {
return new SyncDataFileReader(this);
}
public synchronized void setReader(SyncDataFileReader reader) {
}
public synchronized void setReader(DataFileReader reader) {
this.reader = reader;
}

SyncDataFileReader.java

@@ -27,7 +27,7 @@ import org.apache.activemq.util.DataByteArrayInputStream;
*
* @version $Revision: 1.1.1.1 $
*/
final class SyncDataFileReader{
final class SyncDataFileReader implements DataFileReader {
private DataManager dataManager;
private DataByteArrayInputStream dataIn;
@@ -42,17 +42,10 @@ final class SyncDataFileReader{
this.dataIn=new DataByteArrayInputStream();
}
/**
* Sets the size property on a DataItem and returns the type of item that this was
* created as.
*
* @param marshaller
* @param item
* @return
* @throws IOException
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readDataItemSize(org.apache.activemq.kaha.impl.data.DataItem)
*/
protected byte readDataItemSize(DataItem item) throws IOException {
public byte readDataItemSize(DataItem item) throws IOException {
RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
file.seek(item.getOffset()); // jump to the size field
byte rc = file.readByte();
@@ -60,7 +53,10 @@ final class SyncDataFileReader{
return rc;
}
protected Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileReader#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
*/
public Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid

SyncDataFileWriter.java

@@ -48,7 +48,7 @@ final class SyncDataFileWriter implements DataFileWriter{
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
*/
public synchronized StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
// Write the packet to our internal buffer.
buffer.reset();
@@ -77,7 +77,7 @@ final class SyncDataFileWriter implements DataFileWriter{
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation, org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
*/
public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
public synchronized void updateItem(DataItem item,Marshaller marshaller, Object payload, byte type) throws IOException {
// Write the packet to our internal buffer.
buffer.reset();
buffer.position(DataManager.ITEM_HEAD_SIZE);
@@ -87,9 +87,10 @@ final class SyncDataFileWriter implements DataFileWriter{
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
DataFile dataFile = dataManager.getDataFile(location);
item.setSize(payloadSize);
DataFile dataFile = dataManager.getDataFile(item);
RandomAccessFile file = dataFile.getRandomAccessFile();
file.seek(location.getOffset());
file.seek(item.getOffset());
file.write(buffer.getData(),0,size);
dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
}


@@ -0,0 +1,22 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.data;
import org.apache.activemq.kaha.StoreLocation;