Started on an AsyncDataFileWriter implementation so that we can get more concurrent writes and batch up

file sync() calls done.



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@475775 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-11-16 15:37:35 +00:00
parent 822df45e38
commit 4f3f52392d
7 changed files with 474 additions and 50 deletions

View File

@ -74,6 +74,7 @@ public class KahaStore implements Store{
private String mode;
private boolean initialized;
private boolean logIndexChanges=false;
private boolean useAsyncWriter=false;
private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
private FileLock lock;
private String indexType=IndexTypes.DISK_INDEX;
@ -314,6 +315,7 @@ public class KahaStore implements Store{
if(dm==null){
dm=new DataManager(directory,name);
dm.setMaxFileLength(maxDataFileLength);
dm.setUseAsyncWriter(isUseAsyncWriter());
recover(dm);
dataManagers.put(name,dm);
}
@ -523,6 +525,14 @@ public class KahaStore implements Store{
}
}
public synchronized boolean isUseAsyncWriter() {
return useAsyncWriter;
}
public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
this.useAsyncWriter = useAsyncWriter;
}
}

View File

@ -0,0 +1,325 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.data;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.DataByteArrayOutputStream;
import edu.emory.mathcs.backport.java.util.LinkedList;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
/**
* Optimized Store writer that uses an async thread do batched writes to
* the datafile.
*
* @version $Revision: 1.1.1.1 $
*/
final class AsyncDataFileWriter implements DataFileWriter {
private static final Object SHUTDOWN_COMMAND = new Object();
static class WriteCommand {
final RandomAccessFile dataFile;
final byte[] data;
final long offset;
final int size;
final CountDownLatch latch;
public WriteCommand(RandomAccessFile dataFile, byte[] data, long offset, int size, CountDownLatch latch) {
this.dataFile = dataFile;
this.data = data;
this.offset = offset;
this.size = size;
this.latch = latch;
}
}
private DataManager dataManager;
private final Object enqueueMutex = new Object();
private final LinkedList queue = new LinkedList();
private final UsageManager usage = new UsageManager();
private CountDownLatch latchAssignedToNewWrites = new CountDownLatch(1);
private boolean running;
private boolean shutdown;
private IOException firstAsyncException;
private final CountDownLatch shutdownDone = new CountDownLatch(1);
/**
* Construct a Store writer
*
* @param file
*/
AsyncDataFileWriter(DataManager fileManager){
this.dataManager=fileManager;
this.usage.setLimit(1024*1024*8); // Allow about 8 megs of concurrent data to be queued up
}
public void force(final DataFile dataFile) throws IOException {
try {
CountDownLatch latch = null;
synchronized( enqueueMutex ) {
latch = (CountDownLatch) dataFile.getWriterData();
}
if( latch==null ) {
return;
}
latch.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
/**
* @param marshaller
* @param payload
* @param type
* @return
* @throws IOException
*/
public StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
// We may need to slow down if we are pounding the async thread too
// hard..
try {
usage.waitForSpace();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
// Write the packet our internal buffer.
final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
buffer.position(DataManager.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
final int size=buffer.size();
int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
final DataItem item=new DataItem();
item.setSize(payloadSize);
usage.increaseUsage(size);
// Locate datafile and enqueue into the executor in sychronized block so that
// writes get equeued onto the executor in order that they were assigned by
// the data manager (which is basically just appending)
synchronized(enqueueMutex) {
// Find the position where this item will land at.
final DataFile dataFile=dataManager.findSpaceForData(item);
dataManager.addInterestInFile(dataFile);
dataFile.setWriterData(latchAssignedToNewWrites);
enqueue(new WriteCommand(dataFile.getRandomAccessFile(), buffer.getData(), item.getOffset(), size, latchAssignedToNewWrites));
}
return item;
}
/**
*
*/
public void updateItem(final StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
// We may need to slow down if we are pounding the async thread too
// hard..
try {
usage.waitForSpace();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
//Write the packet our internal buffer.
final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
buffer.position(DataManager.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
final int size=buffer.size();
int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
final DataFile dataFile = dataManager.getDataFile(location);
usage.increaseUsage(size);
// Equeue the write to an async thread.
synchronized(enqueueMutex) {
dataFile.setWriterData(latchAssignedToNewWrites);
enqueue(new WriteCommand(dataFile.getRandomAccessFile(), buffer.getData(), location.getOffset(), size, latchAssignedToNewWrites));
}
}
private void enqueue(Object command) throws IOException {
if( shutdown ) {
throw new IOException("Async Writter Thread Shutdown");
}
if( firstAsyncException !=null )
throw firstAsyncException;
if( !running ) {
running=true;
Thread thread = new Thread() {
public void run() {
processQueue();
}
};
thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer");
thread.start();
}
queue.addLast(command);
enqueueMutex.notify();
}
private Object dequeue() {
synchronized( enqueueMutex ) {
while( queue.isEmpty() ) {
try {
enqueueMutex.wait();
} catch (InterruptedException e) {
return SHUTDOWN_COMMAND;
}
}
return queue.removeFirst();
}
}
public void close() throws IOException {
synchronized( enqueueMutex ) {
if( shutdown == false ) {
shutdown = true;
if( running ) {
queue.add(SHUTDOWN_COMMAND);
enqueueMutex.notify();
} else {
shutdownDone.countDown();
}
}
}
try {
shutdownDone.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
boolean isShutdown() {
synchronized( enqueueMutex ) {
return shutdown;
}
}
/**
* The async processing loop that writes to the data files and
* does the force calls.
*
* Since the file sync() call is the slowest of all the operations,
* this algorithm tries to 'batch' or group together several file sync() requests
* into a single file sync() call. The batching is accomplished attaching the
* same CountDownLatch instance to every force request in a group.
*
*/
private void processQueue() {
try {
CountDownLatch currentBatchLatch=null;
RandomAccessFile currentBatchDataFile=null;
while( !isShutdown() ) {
// Block till we get a command.
Object o = dequeue();
if( o == SHUTDOWN_COMMAND ) {
if( currentBatchLatch!=null ) {
currentBatchDataFile.getFD().sync();
currentBatchLatch.countDown();
}
break;
} else if( o.getClass() == CountDownLatch.class ) {
// The CountDownLatch is used as the end of batch indicator.
// Must match..
if( o == currentBatchLatch ) {
currentBatchDataFile.getFD().sync();
currentBatchLatch.countDown();
currentBatchLatch=null;
currentBatchDataFile=null;
} else {
new IOException("Got an out of sequence end of end of batch indicator.");
}
} else if( o.getClass() == WriteCommand.class ) {
WriteCommand write = (WriteCommand) o;
if( currentBatchDataFile == null )
currentBatchDataFile = write.dataFile;
// We may need to prematurely sync if the batch
// if user is switching between data files.
if( currentBatchDataFile!=write.dataFile ) {
currentBatchDataFile.getFD().sync();
currentBatchDataFile = write.dataFile;
}
// Write to the data..
write.dataFile.seek(write.offset);
write.dataFile.write(write.data,0,write.size);
usage.decreaseUsage(write.size);
// Start of a batch..
if( currentBatchLatch == null ) {
currentBatchLatch = write.latch;
synchronized(enqueueMutex) {
// get the request threads to start using a new latch..
// write commands allready in the queue should have the
// same latch assigned.
latchAssignedToNewWrites = new CountDownLatch(1);
// enqueue an end of batch indicator..
queue.add(currentBatchLatch);
enqueueMutex.notify();
}
} else if( currentBatchLatch!=write.latch ) {
// the latch on subsequent writes should match.
new IOException("Got an out of sequence write.");
}
}
}
} catch (IOException e) {
synchronized( enqueueMutex ) {
firstAsyncException = e;
}
} finally {
shutdownDone.countDown();
}
}
}

View File

@ -27,10 +27,12 @@ import java.io.RandomAccessFile;
* @version $Revision: 1.1.1.1 $
*/
class DataFile{
private File file;
private Integer number;
private int referenceCount;
private RandomAccessFile randomAcessFile;
private Object writerData;
long length=0;
DataFile(File file,int number){
@ -70,12 +72,6 @@ class DataFile{
return file.delete();
}
synchronized void force() throws IOException{
if(randomAcessFile!=null){
randomAcessFile.getFD().sync();
}
}
synchronized void close() throws IOException{
if(randomAcessFile!=null){
randomAcessFile.close();
@ -98,4 +94,19 @@ class DataFile{
String result = file.getName() + " number = " + number + " , length = " + length + " refCount = " + referenceCount;
return result;
}
/**
* @return Opaque data that a DataFileWriter may want to associate with the DataFile.
*/
public synchronized Object getWriterData() {
return writerData;
}
/**
* @param writerData - Opaque data that a DataFileWriter may want to associate with the DataFile.
*/
public synchronized void setWriterData(Object writerData) {
this.writerData = writerData;
}
}

View File

@ -0,0 +1,28 @@
package org.apache.activemq.kaha.impl.data;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
interface DataFileWriter {
/**
* @param marshaller
* @param payload
* @param data_item2
* @return
* @throws IOException
* @throws FileNotFoundException
*/
public StoreLocation storeItem(Marshaller marshaller, Object payload,
byte type) throws IOException;
public void updateItem(StoreLocation location, Marshaller marshaller,
Object payload, byte type) throws IOException;
public void force(DataFile dataFile) throws IOException;
public void close() throws IOException;
}

View File

@ -20,7 +20,6 @@ package org.apache.activemq.kaha.impl.data;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@ -45,11 +44,13 @@ public final class DataManager{
private static final String NAME_PREFIX="data-";
private final File dir;
private final String name;
private StoreDataReader reader;
private StoreDataWriter writer;
private SyncDataFileReader reader;
private DataFileWriter writer;
private DataFile currentWriteFile;
private long maxFileLength = MAX_FILE_LENGTH;
Map fileMap=new HashMap();
private boolean useAsyncWriter=false;
public static final int ITEM_HEAD_SIZE=5; // type + length
public static final byte DATA_ITEM_TYPE=1;
@ -61,8 +62,6 @@ public final class DataManager{
public DataManager(File dir, final String name){
this.dir=dir;
this.name=name;
this.reader=new StoreDataReader(this);
this.writer=new StoreDataWriter(this);
dataFilePrefix = NAME_PREFIX+name+"-";
// build up list of current dataFiles
@ -107,34 +106,35 @@ public final class DataManager{
currentWriteFile=createAndAddDataFile(nextNum);
}
item.setOffset(currentWriteFile.getLength());
item.setFile(currentWriteFile.getNumber().intValue());
item.setFile(currentWriteFile.getNumber().intValue());
currentWriteFile.incrementLength(item.getSize()+ITEM_HEAD_SIZE);
return currentWriteFile;
}
RandomAccessFile getDataFile(StoreLocation item) throws IOException{
DataFile getDataFile(StoreLocation item) throws IOException{
Integer key=new Integer(item.getFile());
DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile!=null){
return dataFile.getRandomAccessFile();
if(dataFile==null){
log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file "+NAME_PREFIX+name+"-"+item.getFile());
}
log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file "+NAME_PREFIX+name+"-"+item.getFile());
return dataFile;
}
public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException{
return reader.readItem(marshaller,item);
return getReader().readItem(marshaller,item);
}
public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE);
}
public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
return writer.storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
public StoreLocation storeRedoItem(Object payload) throws IOException{
return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
}
public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
writer.updateItem(location,marshaller,payload,DATA_ITEM_TYPE);
public void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
getWriter().updateItem(location,marshaller,payload,DATA_ITEM_TYPE);
}
public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
@ -149,7 +149,7 @@ public final class DataManager{
while( true ) {
byte type;
try {
type = reader.readDataItemSize(item);
type = getReader().readDataItemSize(item);
} catch (IOException ignore) {
log.trace("End of data file reached at (header was invalid): "+item);
return;
@ -180,9 +180,10 @@ public final class DataManager{
}
public synchronized void close() throws IOException{
getWriter().close();
for(Iterator i=fileMap.values().iterator();i.hasNext();){
DataFile dataFile=(DataFile) i.next();
dataFile.force();
getWriter().force(dataFile);
dataFile.close();
}
fileMap.clear();
@ -191,7 +192,7 @@ public final class DataManager{
public synchronized void force() throws IOException{
for(Iterator i=fileMap.values().iterator();i.hasNext();){
DataFile dataFile=(DataFile) i.next();
dataFile.force();
getWriter().force(dataFile);
}
}
@ -218,7 +219,7 @@ public final class DataManager{
}
}
void addInterestInFile(DataFile dataFile){
synchronized void addInterestInFile(DataFile dataFile){
if(dataFile!=null){
dataFile.increment();
}
@ -287,4 +288,42 @@ public final class DataManager{
public String toString(){
return "DataManager:("+NAME_PREFIX+name+")";
}
public synchronized SyncDataFileReader getReader() {
if( reader == null ) {
reader = createReader();
}
return reader;
}
protected SyncDataFileReader createReader() {
return new SyncDataFileReader(this);
}
public synchronized void setReader(SyncDataFileReader reader) {
this.reader = reader;
}
public synchronized DataFileWriter getWriter() {
if( writer==null ) {
writer = createWriter();
}
return writer;
}
private DataFileWriter createWriter() {
if( useAsyncWriter ) {
return new AsyncDataFileWriter(this);
} else {
return new SyncDataFileWriter(this);
}
}
public synchronized void setWriter(DataFileWriter writer) {
this.writer = writer;
}
public synchronized boolean isUseAsyncWriter() {
return useAsyncWriter;
}
public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
this.useAsyncWriter = useAsyncWriter;
}
}

View File

@ -27,7 +27,7 @@ import org.apache.activemq.util.DataByteArrayInputStream;
*
* @version $Revision: 1.1.1.1 $
*/
final class StoreDataReader{
final class SyncDataFileReader{
private DataManager dataManager;
private DataByteArrayInputStream dataIn;
@ -37,7 +37,7 @@ final class StoreDataReader{
*
* @param file
*/
StoreDataReader(DataManager fileManager){
SyncDataFileReader(DataManager fileManager){
this.dataManager=fileManager;
this.dataIn=new DataByteArrayInputStream();
}
@ -53,7 +53,7 @@ final class StoreDataReader{
*/
protected byte readDataItemSize(DataItem item) throws IOException {
RandomAccessFile file = dataManager.getDataFile(item);
RandomAccessFile file = dataManager.getDataFile(item).getRandomAccessFile();
file.seek(item.getOffset()); // jump to the size field
byte rc = file.readByte();
item.setSize(file.readInt());
@ -61,7 +61,7 @@ final class StoreDataReader{
}
protected Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
RandomAccessFile file=dataManager.getDataFile(item);
RandomAccessFile file=dataManager.getDataFile(item).getRandomAccessFile();
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
// allocating byte[] arrays on every readItem.

View File

@ -17,7 +17,6 @@
*/
package org.apache.activemq.kaha.impl.data;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -25,11 +24,12 @@ import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.util.DataByteArrayOutputStream;
/**
* Optimized Store writer
* Optimized Store writer. Synchronously marshalls and writes to the data file. Simple but
* may introduce a bit of contention when put under load.
*
* @version $Revision: 1.1.1.1 $
*/
final class StoreDataWriter{
final class SyncDataFileWriter implements DataFileWriter{
private DataByteArrayOutputStream buffer;
private DataManager dataManager;
@ -40,20 +40,15 @@ final class StoreDataWriter{
*
* @param file
*/
StoreDataWriter(DataManager fileManager){
SyncDataFileWriter(DataManager fileManager){
this.dataManager=fileManager;
this.buffer=new DataByteArrayOutputStream();
}
/**
* @param marshaller
* @param payload
* @param data_item2
* @return
* @throws IOException
* @throws FileNotFoundException
*/
StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
*/
public synchronized StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
// Write the packet our internal buffer.
buffer.reset();
@ -73,13 +68,16 @@ final class StoreDataWriter{
// Now splat the buffer to the file.
dataFile.getRandomAccessFile().seek(item.getOffset());
dataFile.getRandomAccessFile().write(buffer.getData(),0,size);
dataFile.incrementLength(size);
dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
dataManager.addInterestInFile(dataFile);
return item;
}
void updateItem(StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation, org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
*/
public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
//Write the packet our internal buffer.
buffer.reset();
buffer.position(DataManager.ITEM_HEAD_SIZE);
@ -89,8 +87,21 @@ final class StoreDataWriter{
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
RandomAccessFile dataFile = dataManager.getDataFile(location);
dataFile.seek(location.getOffset());
dataFile.write(buffer.getData(),0,size);
DataFile dataFile = dataManager.getDataFile(location);
RandomAccessFile file = dataFile.getRandomAccessFile();
file.seek(location.getOffset());
file.write(buffer.getData(),0,size);
dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
}
public synchronized void force(DataFile dataFile) throws IOException {
// If our dirty marker was set.. then we need to sync
if( dataFile.getWriterData()!=null ) {
dataFile.getRandomAccessFile().getFD().sync();
dataFile.setWriterData(null);
}
}
public void close() throws IOException {
}
}