mirror of https://github.com/apache/activemq.git
- Added some of the intial infrastucture needed to be able to use that DataManager to do index file recovery.
- ObjectMarshaller now avoids doing byte[] allocations. - We now store the data item's size in the index file so that when we load data from the data file using a DataItem object, the data payload can be read in 1 io operation, previously an extra io was need to find out the size of the payload. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@409297 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
06b94f1220
commit
aa026f1f6a
|
@ -13,13 +13,13 @@
|
|||
*/
|
||||
package org.apache.activemq.kaha;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.OutputStream;
|
||||
/**
|
||||
* Implementation of a Marshaller for Objects
|
||||
*
|
||||
|
@ -34,13 +34,20 @@ public class ObjectMarshaller implements Marshaller{
|
|||
* @throws IOException
|
||||
*/
|
||||
public void writePayload(Object object,DataOutput dataOut) throws IOException{
|
||||
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
|
||||
|
||||
// I failed to see why we just did not just used the provided stream directly
|
||||
// ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||
// ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
|
||||
// objectOut.writeObject(object);
|
||||
// objectOut.close();
|
||||
// byte[] data = bytesOut.toByteArray();
|
||||
// dataOut.writeInt(data.length);
|
||||
// dataOut.write(data);
|
||||
|
||||
ObjectOutputStream objectOut=new ObjectOutputStream((OutputStream) dataOut);
|
||||
objectOut.writeObject(object);
|
||||
objectOut.close();
|
||||
byte[] data = bytesOut.toByteArray();
|
||||
dataOut.writeInt(data.length);
|
||||
dataOut.write(data);
|
||||
objectOut.reset();
|
||||
objectOut.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -51,15 +58,25 @@ public class ObjectMarshaller implements Marshaller{
|
|||
* @throws IOException
|
||||
*/
|
||||
public Object readPayload(DataInput dataIn) throws IOException{
|
||||
int size = dataIn.readInt();
|
||||
byte[] data = new byte[size];
|
||||
dataIn.readFully(data);
|
||||
ByteArrayInputStream bytesIn = new ByteArrayInputStream(data);
|
||||
ObjectInputStream objectIn=new ObjectInputStream(bytesIn);
|
||||
|
||||
// I failed to see why we just did not just used the provided stream directly
|
||||
// int size = dataIn.readInt();
|
||||
// byte[] data = new byte[size];
|
||||
// dataIn.readFully(data);
|
||||
// ByteArrayInputStream bytesIn = new ByteArrayInputStream(data);
|
||||
// ObjectInputStream objectIn=new ObjectInputStream(bytesIn);
|
||||
// try{
|
||||
// return objectIn.readObject();
|
||||
// }catch(ClassNotFoundException e){
|
||||
// throw new IOException(e.getMessage());
|
||||
// }
|
||||
|
||||
ObjectInputStream objectIn=new ObjectInputStream((InputStream) dataIn);
|
||||
try{
|
||||
return objectIn.readObject();
|
||||
}catch(ClassNotFoundException e){
|
||||
} catch(ClassNotFoundException e) {
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,51 +13,29 @@
|
|||
*/
|
||||
package org.apache.activemq.kaha.impl;
|
||||
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutput;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
/**
|
||||
* A a wrapper for a data in the store
|
||||
*
|
||||
* @version $Revision: 1.2 $
|
||||
*/
|
||||
final class DataItem implements Item{
|
||||
static final int HEAD_SIZE=6; // magic + len
|
||||
private int size;
|
||||
private long offset=POSITION_NOT_SET;
|
||||
|
||||
private int file=(int) POSITION_NOT_SET;
|
||||
private long offset=POSITION_NOT_SET;
|
||||
private int size;
|
||||
|
||||
DataItem(){}
|
||||
|
||||
DataItem(DataItem item) {
|
||||
this.file = item.file;
|
||||
this.offset = item.offset;
|
||||
this.size = item.size;
|
||||
}
|
||||
|
||||
boolean isValid(){
|
||||
return file != POSITION_NOT_SET;
|
||||
}
|
||||
|
||||
void writeHeader(DataOutput dataOut) throws IOException{
|
||||
dataOut.writeShort(MAGIC);
|
||||
dataOut.writeInt(size);
|
||||
}
|
||||
|
||||
void readHeader(DataInput dataIn) throws IOException{
|
||||
int magic=dataIn.readShort();
|
||||
if(magic==MAGIC){
|
||||
size=dataIn.readInt();
|
||||
}else{
|
||||
throw new BadMagicException("Unexpected Magic value: "+magic);
|
||||
}
|
||||
}
|
||||
|
||||
void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
|
||||
marshaller.writePayload(object,dataOut);
|
||||
}
|
||||
|
||||
Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
|
||||
return marshaller.readPayload(dataIn);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the size.
|
||||
*/
|
||||
|
@ -107,4 +85,8 @@ final class DataItem implements Item{
|
|||
String result="offset = "+offset+", file = " + file + ", size = "+size;
|
||||
return result;
|
||||
}
|
||||
|
||||
public DataItem copy() {
|
||||
return new DataItem(this);
|
||||
}
|
||||
}
|
|
@ -22,7 +22,9 @@ import java.util.HashMap;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
/**
|
||||
|
@ -40,6 +42,10 @@ final class DataManager{
|
|||
private DataFile currentWriteFile;
|
||||
Map fileMap=new HashMap();
|
||||
|
||||
public static final int ITEM_HEAD_SIZE=5; // type + length
|
||||
public static final byte DATA_ITEM_TYPE=1;
|
||||
public static final byte REDO_ITEM_TYPE=2;
|
||||
|
||||
DataManager(File dir,String pf){
|
||||
this.dir=dir;
|
||||
this.prefix=pf;
|
||||
|
@ -91,15 +97,56 @@ final class DataManager{
|
|||
}
|
||||
throw new IOException("Could not locate data file "+prefix+item.getFile());
|
||||
}
|
||||
|
||||
synchronized Object readItem(Marshaller marshaller,DataItem item) throws IOException{
|
||||
|
||||
synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{
|
||||
return reader.readItem(marshaller,item);
|
||||
}
|
||||
|
||||
synchronized DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{
|
||||
return writer.storeItem(marshaller,payload);
|
||||
synchronized DataItem storeDataItem(Marshaller marshaller, Object payload) throws IOException{
|
||||
return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
|
||||
}
|
||||
|
||||
synchronized DataItem storeRedoItem(Marshaller marshaller, Object payload) throws IOException{
|
||||
return writer.storeItem(marshaller,payload, REDO_ITEM_TYPE);
|
||||
}
|
||||
|
||||
synchronized void recoverRedoItems(Marshaller marshaller, RedoListener listener) throws IOException{
|
||||
DataItem item = new DataItem();
|
||||
item.setFile(currentWriteFile.getNumber().intValue());
|
||||
item.setOffset(0);
|
||||
while( true ) {
|
||||
byte type;
|
||||
try {
|
||||
type = reader.readDataItemSize(item);
|
||||
} catch (IOException ignore) {
|
||||
log.trace("End of data file reached at (header was invalid): "+item);
|
||||
return;
|
||||
}
|
||||
if( type == REDO_ITEM_TYPE ) {
|
||||
// Un-marshal the redo item
|
||||
Object object;
|
||||
try {
|
||||
object = readItem(marshaller, item);
|
||||
} catch (IOException e1) {
|
||||
log.trace("End of data file reached at (payload was invalid): "+item);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
||||
listener.onRedoItem(item, object);
|
||||
// in case the listener is holding on to item references, copy it
|
||||
// so we don't change it behind the listener's back.
|
||||
item = item.copy();
|
||||
|
||||
} catch (Exception e) {
|
||||
throw IOExceptionSupport.create("Recovery handler failed: "+e,e);
|
||||
}
|
||||
}
|
||||
// Move to the next item.
|
||||
item.setOffset(item.getOffset()+ITEM_HEAD_SIZE+item.getSize());
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void close() throws IOException{
|
||||
for(Iterator i=fileMap.values().iterator();i.hasNext();){
|
||||
DataFile dataFile=(DataFile) i.next();
|
||||
|
|
|
@ -17,13 +17,13 @@ import java.io.DataInput;
|
|||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
/**
|
||||
* A an Item with a relative postion and location to other Items in the Store
|
||||
* A an Item with a relative position and location to other Items in the Store
|
||||
*
|
||||
* @version $Revision: 1.2 $
|
||||
*/
|
||||
final class IndexItem implements Item{
|
||||
|
||||
static final int INDEX_SIZE=43;
|
||||
static final int INDEX_SIZE=51;
|
||||
//used by linked list
|
||||
IndexItem next;
|
||||
IndexItem prev;
|
||||
|
@ -31,13 +31,17 @@ final class IndexItem implements Item{
|
|||
private long offset=POSITION_NOT_SET;
|
||||
private long previousItem=POSITION_NOT_SET;
|
||||
private long nextItem=POSITION_NOT_SET;
|
||||
private long keyOffset=POSITION_NOT_SET;
|
||||
private int keyFile=(int) POSITION_NOT_SET;
|
||||
private long valueOffset=POSITION_NOT_SET;
|
||||
private int valueFile=(int) POSITION_NOT_SET;
|
||||
private boolean active=true;
|
||||
|
||||
|
||||
// TODO: consider just using a DataItem for the following fields.
|
||||
private long keyOffset=POSITION_NOT_SET;
|
||||
private int keyFile=(int) POSITION_NOT_SET;
|
||||
private int keySize=0;
|
||||
|
||||
private long valueOffset=POSITION_NOT_SET;
|
||||
private int valueFile=(int) POSITION_NOT_SET;
|
||||
private int valueSize=0;
|
||||
|
||||
/**
|
||||
* Default Constructor
|
||||
*/
|
||||
|
@ -48,8 +52,10 @@ final class IndexItem implements Item{
|
|||
nextItem=POSITION_NOT_SET;
|
||||
keyOffset=POSITION_NOT_SET;
|
||||
keyFile=(int) POSITION_NOT_SET;
|
||||
keySize=0;
|
||||
valueOffset=POSITION_NOT_SET;
|
||||
valueFile=(int) POSITION_NOT_SET;
|
||||
valueSize=0;
|
||||
active=true;
|
||||
}
|
||||
|
||||
|
@ -57,6 +63,7 @@ final class IndexItem implements Item{
|
|||
DataItem result=new DataItem();
|
||||
result.setOffset(keyOffset);
|
||||
result.setFile(keyFile);
|
||||
result.setSize(keySize);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -64,17 +71,20 @@ final class IndexItem implements Item{
|
|||
DataItem result=new DataItem();
|
||||
result.setOffset(valueOffset);
|
||||
result.setFile(valueFile);
|
||||
result.setSize(valueSize);
|
||||
return result;
|
||||
}
|
||||
|
||||
void setValueData(DataItem item){
|
||||
valueOffset=item.getOffset();
|
||||
valueFile=item.getFile();
|
||||
valueSize=item.getSize();
|
||||
}
|
||||
|
||||
void setKeyData(DataItem item){
|
||||
keyOffset=item.getOffset();
|
||||
keyFile=item.getFile();
|
||||
keySize=item.getSize();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -88,8 +98,10 @@ final class IndexItem implements Item{
|
|||
dataOut.writeLong(nextItem);
|
||||
dataOut.writeInt(keyFile);
|
||||
dataOut.writeLong(keyOffset);
|
||||
dataOut.writeInt(keySize);
|
||||
dataOut.writeInt(valueFile);
|
||||
dataOut.writeLong(valueOffset);
|
||||
dataOut.writeInt(valueSize);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -105,8 +117,10 @@ final class IndexItem implements Item{
|
|||
nextItem=dataIn.readLong();
|
||||
keyFile=dataIn.readInt();
|
||||
keyOffset=dataIn.readLong();
|
||||
keySize=dataIn.readInt();
|
||||
valueFile=dataIn.readInt();
|
||||
valueOffset=dataIn.readLong();
|
||||
valueSize=dataIn.readInt();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -221,12 +235,31 @@ final class IndexItem implements Item{
|
|||
this.offset=offset;
|
||||
}
|
||||
|
||||
public int getKeySize() {
|
||||
return keySize;
|
||||
}
|
||||
|
||||
public void setKeySize(int keySize) {
|
||||
this.keySize = keySize;
|
||||
}
|
||||
|
||||
public int getValueSize() {
|
||||
return valueSize;
|
||||
}
|
||||
|
||||
public void setValueSize(int valueSize) {
|
||||
this.valueSize = valueSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return eprtty print of 'this'
|
||||
* @return print of 'this'
|
||||
*/
|
||||
public String toString(){
|
||||
String result="offset="+offset+" , keyFile = "+keyFile+" , keyOffset = "+keyOffset+", valueOffset = "
|
||||
+valueOffset+" , previousItem = "+previousItem+" , nextItem = "+nextItem;
|
||||
String result="offset="+offset+
|
||||
", key=("+keyFile+", "+keyOffset+", "+keySize+")"+
|
||||
", value=("+valueFile+", "+valueOffset+", "+valueSize+")"+
|
||||
", previousItem="+previousItem+", nextItem="+nextItem
|
||||
;
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -66,7 +66,7 @@ class IndexRootContainer {
|
|||
removeRoot(key);
|
||||
}
|
||||
|
||||
DataItem data = dataManager.storeItem(rootMarshaller, key);
|
||||
DataItem data = dataManager.storeDataItem(rootMarshaller, key);
|
||||
IndexItem index = indexManager.createNewIndex();
|
||||
index.setKeyData(data);
|
||||
IndexItem newRoot = indexManager.createNewIndex();
|
||||
|
|
|
@ -641,7 +641,7 @@ final class ListContainerImpl extends BaseContainerImpl implements ListContainer
|
|||
IndexItem index=null;
|
||||
try{
|
||||
if(value!=null){
|
||||
DataItem data=dataManager.storeItem(marshaller,value);
|
||||
DataItem data=dataManager.storeDataItem(marshaller,value);
|
||||
index=indexManager.createNewIndex();
|
||||
index.setValueData(data);
|
||||
IndexItem prev=list.getLast();
|
||||
|
@ -668,7 +668,7 @@ final class ListContainerImpl extends BaseContainerImpl implements ListContainer
|
|||
IndexItem index=null;
|
||||
try{
|
||||
if(value!=null){
|
||||
DataItem data=dataManager.storeItem(marshaller,value);
|
||||
DataItem data=dataManager.storeDataItem(marshaller,value);
|
||||
index=indexManager.createNewIndex();
|
||||
index.setValueData(data);
|
||||
IndexItem prev=root;
|
||||
|
@ -695,7 +695,7 @@ final class ListContainerImpl extends BaseContainerImpl implements ListContainer
|
|||
IndexItem index=null;
|
||||
try{
|
||||
if(value!=null){
|
||||
DataItem data=dataManager.storeItem(marshaller,value);
|
||||
DataItem data=dataManager.storeDataItem(marshaller,value);
|
||||
index=indexManager.createNewIndex();
|
||||
index.setValueData(data);
|
||||
IndexItem prev=null;
|
||||
|
|
|
@ -342,11 +342,11 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
|
|||
try{
|
||||
if(key!=null){
|
||||
index=indexManager.createNewIndex();
|
||||
DataItem data=dataManager.storeItem(keyMarshaller,key);
|
||||
DataItem data=dataManager.storeDataItem(keyMarshaller,key);
|
||||
index.setKeyData(data);
|
||||
}
|
||||
if(value!=null){
|
||||
DataItem data=dataManager.storeItem(valueMarshaller,value);
|
||||
DataItem data=dataManager.storeDataItem(valueMarshaller,value);
|
||||
index.setValueData(data);
|
||||
}
|
||||
IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast();
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
package org.apache.activemq.kaha.impl;
|
||||
|
||||
public interface RedoListener {
|
||||
|
||||
void onRedoItem(DataItem item, Object object) throws Exception;
|
||||
|
||||
}
|
|
@ -22,9 +22,9 @@ import org.apache.activemq.kaha.Marshaller;
|
|||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
final class StoreDataReader{
|
||||
|
||||
private DataManager dataManager;
|
||||
private StoreByteArrayInputStream dataIn;
|
||||
private byte[] header=new byte[DataItem.HEAD_SIZE];
|
||||
|
||||
/**
|
||||
* Construct a Store reader
|
||||
|
@ -36,13 +36,31 @@ final class StoreDataReader{
|
|||
this.dataIn=new StoreByteArrayInputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the size property on a DataItem and returns the type of item that this was
|
||||
* created as.
|
||||
*
|
||||
* @param marshaller
|
||||
* @param item
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
protected byte readDataItemSize(DataItem item) throws IOException {
|
||||
|
||||
RandomAccessFile file = dataManager.getDataFile(item);
|
||||
file.seek(item.getOffset()); // jump to the size field
|
||||
byte rc = file.readByte();
|
||||
item.setSize(file.readInt());
|
||||
return rc;
|
||||
}
|
||||
|
||||
protected Object readItem(Marshaller marshaller,DataItem item) throws IOException{
|
||||
RandomAccessFile file=dataManager.getDataFile(item);
|
||||
file.seek(item.getOffset());
|
||||
file.readFully(header);
|
||||
dataIn.restart(header);
|
||||
item.readHeader(dataIn);
|
||||
|
||||
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
|
||||
// allocating byte[] arrays on every readItem.
|
||||
byte[] data=new byte[item.getSize()];
|
||||
file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE);
|
||||
file.readFully(data);
|
||||
dataIn.restart(data);
|
||||
return marshaller.readPayload(dataIn);
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
*/
|
||||
package org.apache.activemq.kaha.impl;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
/**
|
||||
* Optimized Store writer
|
||||
|
@ -26,9 +28,11 @@ import org.apache.activemq.kaha.Marshaller;
|
|||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
final class StoreDataWriter{
|
||||
private StoreByteArrayOutputStream dataOut;
|
||||
|
||||
private StoreByteArrayOutputStream buffer;
|
||||
private DataManager dataManager;
|
||||
|
||||
|
||||
/**
|
||||
* Construct a Store writer
|
||||
*
|
||||
|
@ -36,23 +40,39 @@ final class StoreDataWriter{
|
|||
*/
|
||||
StoreDataWriter(DataManager fileManager){
|
||||
this.dataManager=fileManager;
|
||||
this.dataOut=new StoreByteArrayOutputStream();
|
||||
this.buffer=new StoreByteArrayOutputStream();
|
||||
}
|
||||
|
||||
DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{
|
||||
dataOut.reset();
|
||||
dataOut.position(DataItem.HEAD_SIZE);
|
||||
marshaller.writePayload(payload,dataOut);
|
||||
int size=dataOut.size();
|
||||
int payloadSize=size-DataItem.HEAD_SIZE;
|
||||
/**
|
||||
* @param marshaller
|
||||
* @param payload
|
||||
* @param data_item2
|
||||
* @return
|
||||
* @throws IOException
|
||||
* @throws FileNotFoundException
|
||||
*/
|
||||
DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
|
||||
|
||||
// Write the packet our internal buffer.
|
||||
buffer.reset();
|
||||
buffer.position(DataManager.ITEM_HEAD_SIZE);
|
||||
marshaller.writePayload(payload,buffer);
|
||||
int size=buffer.size();
|
||||
int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
|
||||
buffer.reset();
|
||||
buffer.writeByte(type);
|
||||
buffer.writeInt(payloadSize);
|
||||
|
||||
// Find the position where this item will land at.
|
||||
DataItem item=new DataItem();
|
||||
item.setSize(payloadSize);
|
||||
DataFile dataFile=dataManager.findSpaceForData(item);
|
||||
dataOut.reset();
|
||||
item.writeHeader(dataOut);
|
||||
|
||||
// Now splat the buffer to the file.
|
||||
dataFile.getRandomAccessFile().seek(item.getOffset());
|
||||
dataFile.getRandomAccessFile().write(dataOut.getData(),0,size);
|
||||
dataFile.getRandomAccessFile().write(buffer.getData(),0,size);
|
||||
dataFile.incrementLength(size);
|
||||
|
||||
dataManager.addInterestInFile(dataFile);
|
||||
return item;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue