diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java b/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java index 0bdf83710e..de488e457b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java @@ -13,13 +13,13 @@ */ package org.apache.activemq.kaha; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.OutputStream; /** * Implementation of a Marshaller for Objects * @@ -34,13 +34,20 @@ public class ObjectMarshaller implements Marshaller{ * @throws IOException */ public void writePayload(Object object,DataOutput dataOut) throws IOException{ - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut); + +// I failed to see why we just did not just used the provided stream directly +// ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); +// ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut); +// objectOut.writeObject(object); +// objectOut.close(); +// byte[] data = bytesOut.toByteArray(); +// dataOut.writeInt(data.length); +// dataOut.write(data); + + ObjectOutputStream objectOut=new ObjectOutputStream((OutputStream) dataOut); objectOut.writeObject(object); - objectOut.close(); - byte[] data = bytesOut.toByteArray(); - dataOut.writeInt(data.length); - dataOut.write(data); + objectOut.reset(); + objectOut.flush(); } /** @@ -51,15 +58,25 @@ public class ObjectMarshaller implements Marshaller{ * @throws IOException */ public Object readPayload(DataInput dataIn) throws IOException{ - int size = dataIn.readInt(); - byte[] data = new byte[size]; - dataIn.readFully(data); - ByteArrayInputStream bytesIn = new ByteArrayInputStream(data); - ObjectInputStream objectIn=new ObjectInputStream(bytesIn); + +// I failed to see why we just did not just used the provided stream directly +// int size = dataIn.readInt(); +// byte[] data = new byte[size]; +// dataIn.readFully(data); +// ByteArrayInputStream bytesIn = new ByteArrayInputStream(data); +// ObjectInputStream objectIn=new ObjectInputStream(bytesIn); +// try{ +// return objectIn.readObject(); +// }catch(ClassNotFoundException e){ +// throw new IOException(e.getMessage()); +// } + + ObjectInputStream objectIn=new ObjectInputStream((InputStream) dataIn); try{ return objectIn.readObject(); - }catch(ClassNotFoundException e){ + } catch(ClassNotFoundException e) { throw new IOException(e.getMessage()); } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java index 09953e26ed..cdd3a28b43 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java @@ -13,51 +13,29 @@ */ package org.apache.activemq.kaha.impl; -import org.apache.activemq.kaha.Marshaller; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; /** * A a wrapper for a data in the store * * @version $Revision: 1.2 $ */ final class DataItem implements Item{ - static final int HEAD_SIZE=6; // magic + len - private int size; - private long offset=POSITION_NOT_SET; + private int file=(int) POSITION_NOT_SET; + private long offset=POSITION_NOT_SET; + private int size; DataItem(){} + DataItem(DataItem item) { + this.file = item.file; + this.offset = item.offset; + this.size = item.size; + } + boolean isValid(){ return file != POSITION_NOT_SET; } - void writeHeader(DataOutput dataOut) throws IOException{ - dataOut.writeShort(MAGIC); - dataOut.writeInt(size); - } - - void readHeader(DataInput dataIn) throws IOException{ - int magic=dataIn.readShort(); - if(magic==MAGIC){ - size=dataIn.readInt(); - }else{ - throw new BadMagicException("Unexpected Magic value: "+magic); - } - } - - void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{ - marshaller.writePayload(object,dataOut); - } - - Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{ - return marshaller.readPayload(dataIn); - } - /** * @return Returns the size. */ @@ -107,4 +85,8 @@ final class DataItem implements Item{ String result="offset = "+offset+", file = " + file + ", size = "+size; return result; } + + public DataItem copy() { + return new DataItem(this); + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java index 74b8a22d68..85e5c28a6e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java @@ -22,7 +22,9 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; + import org.apache.activemq.kaha.Marshaller; +import org.apache.activemq.util.IOExceptionSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** @@ -40,6 +42,10 @@ final class DataManager{ private DataFile currentWriteFile; Map fileMap=new HashMap(); + public static final int ITEM_HEAD_SIZE=5; // type + length + public static final byte DATA_ITEM_TYPE=1; + public static final byte REDO_ITEM_TYPE=2; + DataManager(File dir,String pf){ this.dir=dir; this.prefix=pf; @@ -91,15 +97,56 @@ final class DataManager{ } throw new IOException("Could not locate data file "+prefix+item.getFile()); } - - synchronized Object readItem(Marshaller marshaller,DataItem item) throws IOException{ + + synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{ return reader.readItem(marshaller,item); } - synchronized DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{ - return writer.storeItem(marshaller,payload); + synchronized DataItem storeDataItem(Marshaller marshaller, Object payload) throws IOException{ + return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE); + } + + synchronized DataItem storeRedoItem(Marshaller marshaller, Object payload) throws IOException{ + return writer.storeItem(marshaller,payload, REDO_ITEM_TYPE); } + synchronized void recoverRedoItems(Marshaller marshaller, RedoListener listener) throws IOException{ + DataItem item = new DataItem(); + item.setFile(currentWriteFile.getNumber().intValue()); + item.setOffset(0); + while( true ) { + byte type; + try { + type = reader.readDataItemSize(item); + } catch (IOException ignore) { + log.trace("End of data file reached at (header was invalid): "+item); + return; + } + if( type == REDO_ITEM_TYPE ) { + // Un-marshal the redo item + Object object; + try { + object = readItem(marshaller, item); + } catch (IOException e1) { + log.trace("End of data file reached at (payload was invalid): "+item); + return; + } + try { + + listener.onRedoItem(item, object); + // in case the listener is holding on to item references, copy it + // so we don't change it behind the listener's back. + item = item.copy(); + + } catch (Exception e) { + throw IOExceptionSupport.create("Recovery handler failed: "+e,e); + } + } + // Move to the next item. + item.setOffset(item.getOffset()+ITEM_HEAD_SIZE+item.getSize()); + } + } + synchronized void close() throws IOException{ for(Iterator i=fileMap.values().iterator();i.hasNext();){ DataFile dataFile=(DataFile) i.next(); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java index 5922a448aa..0d1e953c60 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java @@ -17,13 +17,13 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** - * A an Item with a relative postion and location to other Items in the Store + * A an Item with a relative position and location to other Items in the Store * * @version $Revision: 1.2 $ */ final class IndexItem implements Item{ - static final int INDEX_SIZE=43; + static final int INDEX_SIZE=51; //used by linked list IndexItem next; IndexItem prev; @@ -31,13 +31,17 @@ final class IndexItem implements Item{ private long offset=POSITION_NOT_SET; private long previousItem=POSITION_NOT_SET; private long nextItem=POSITION_NOT_SET; - private long keyOffset=POSITION_NOT_SET; - private int keyFile=(int) POSITION_NOT_SET; - private long valueOffset=POSITION_NOT_SET; - private int valueFile=(int) POSITION_NOT_SET; private boolean active=true; - + // TODO: consider just using a DataItem for the following fields. + private long keyOffset=POSITION_NOT_SET; + private int keyFile=(int) POSITION_NOT_SET; + private int keySize=0; + + private long valueOffset=POSITION_NOT_SET; + private int valueFile=(int) POSITION_NOT_SET; + private int valueSize=0; + /** * Default Constructor */ @@ -48,8 +52,10 @@ final class IndexItem implements Item{ nextItem=POSITION_NOT_SET; keyOffset=POSITION_NOT_SET; keyFile=(int) POSITION_NOT_SET; + keySize=0; valueOffset=POSITION_NOT_SET; valueFile=(int) POSITION_NOT_SET; + valueSize=0; active=true; } @@ -57,6 +63,7 @@ final class IndexItem implements Item{ DataItem result=new DataItem(); result.setOffset(keyOffset); result.setFile(keyFile); + result.setSize(keySize); return result; } @@ -64,17 +71,20 @@ final class IndexItem implements Item{ DataItem result=new DataItem(); result.setOffset(valueOffset); result.setFile(valueFile); + result.setSize(valueSize); return result; } void setValueData(DataItem item){ valueOffset=item.getOffset(); valueFile=item.getFile(); + valueSize=item.getSize(); } void setKeyData(DataItem item){ keyOffset=item.getOffset(); keyFile=item.getFile(); + keySize=item.getSize(); } /** @@ -88,8 +98,10 @@ final class IndexItem implements Item{ dataOut.writeLong(nextItem); dataOut.writeInt(keyFile); dataOut.writeLong(keyOffset); + dataOut.writeInt(keySize); dataOut.writeInt(valueFile); dataOut.writeLong(valueOffset); + dataOut.writeInt(valueSize); } /** @@ -105,8 +117,10 @@ final class IndexItem implements Item{ nextItem=dataIn.readLong(); keyFile=dataIn.readInt(); keyOffset=dataIn.readLong(); + keySize=dataIn.readInt(); valueFile=dataIn.readInt(); valueOffset=dataIn.readLong(); + valueSize=dataIn.readInt(); } /** @@ -221,12 +235,31 @@ final class IndexItem implements Item{ this.offset=offset; } + public int getKeySize() { + return keySize; + } + + public void setKeySize(int keySize) { + this.keySize = keySize; + } + + public int getValueSize() { + return valueSize; + } + + public void setValueSize(int valueSize) { + this.valueSize = valueSize; + } + /** - * @return eprtty print of 'this' + * @return print of 'this' */ public String toString(){ - String result="offset="+offset+" , keyFile = "+keyFile+" , keyOffset = "+keyOffset+", valueOffset = " - +valueOffset+" , previousItem = "+previousItem+" , nextItem = "+nextItem; + String result="offset="+offset+ + ", key=("+keyFile+", "+keyOffset+", "+keySize+")"+ + ", value=("+valueFile+", "+valueOffset+", "+valueSize+")"+ + ", previousItem="+previousItem+", nextItem="+nextItem + ; return result; } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java index 64355f5e7e..ca87420cda 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java @@ -66,7 +66,7 @@ class IndexRootContainer { removeRoot(key); } - DataItem data = dataManager.storeItem(rootMarshaller, key); + DataItem data = dataManager.storeDataItem(rootMarshaller, key); IndexItem index = indexManager.createNewIndex(); index.setKeyData(data); IndexItem newRoot = indexManager.createNewIndex(); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java index 3f5e6e1fdf..f244267df0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java @@ -641,7 +641,7 @@ final class ListContainerImpl extends BaseContainerImpl implements ListContainer IndexItem index=null; try{ if(value!=null){ - DataItem data=dataManager.storeItem(marshaller,value); + DataItem data=dataManager.storeDataItem(marshaller,value); index=indexManager.createNewIndex(); index.setValueData(data); IndexItem prev=list.getLast(); @@ -668,7 +668,7 @@ final class ListContainerImpl extends BaseContainerImpl implements ListContainer IndexItem index=null; try{ if(value!=null){ - DataItem data=dataManager.storeItem(marshaller,value); + DataItem data=dataManager.storeDataItem(marshaller,value); index=indexManager.createNewIndex(); index.setValueData(data); IndexItem prev=root; @@ -695,7 +695,7 @@ final class ListContainerImpl extends BaseContainerImpl implements ListContainer IndexItem index=null; try{ if(value!=null){ - DataItem data=dataManager.storeItem(marshaller,value); + DataItem data=dataManager.storeDataItem(marshaller,value); index=indexManager.createNewIndex(); index.setValueData(data); IndexItem prev=null; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java index 455728dd6c..19e6dae558 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java @@ -342,11 +342,11 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{ try{ if(key!=null){ index=indexManager.createNewIndex(); - DataItem data=dataManager.storeItem(keyMarshaller,key); + DataItem data=dataManager.storeDataItem(keyMarshaller,key); index.setKeyData(data); } if(value!=null){ - DataItem data=dataManager.storeItem(valueMarshaller,value); + DataItem data=dataManager.storeDataItem(valueMarshaller,value); index.setValueData(data); } IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast(); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoListener.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoListener.java new file mode 100644 index 0000000000..645d4f22d7 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoListener.java @@ -0,0 +1,7 @@ +package org.apache.activemq.kaha.impl; + +public interface RedoListener { + + void onRedoItem(DataItem item, Object object) throws Exception; + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java index 6cf3d3bf82..a80d6438f5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java @@ -22,9 +22,9 @@ import org.apache.activemq.kaha.Marshaller; * @version $Revision: 1.1.1.1 $ */ final class StoreDataReader{ + private DataManager dataManager; private StoreByteArrayInputStream dataIn; - private byte[] header=new byte[DataItem.HEAD_SIZE]; /** * Construct a Store reader @@ -36,13 +36,31 @@ final class StoreDataReader{ this.dataIn=new StoreByteArrayInputStream(); } + /** + * Sets the size property on a DataItem and returns the type of item that this was + * created as. + * + * @param marshaller + * @param item + * @return + * @throws IOException + */ + protected byte readDataItemSize(DataItem item) throws IOException { + + RandomAccessFile file = dataManager.getDataFile(item); + file.seek(item.getOffset()); // jump to the size field + byte rc = file.readByte(); + item.setSize(file.readInt()); + return rc; + } + protected Object readItem(Marshaller marshaller,DataItem item) throws IOException{ RandomAccessFile file=dataManager.getDataFile(item); - file.seek(item.getOffset()); - file.readFully(header); - dataIn.restart(header); - item.readHeader(dataIn); + + // TODO: we could reuse the buffer in dataIn if it's big enough to avoid + // allocating byte[] arrays on every readItem. byte[] data=new byte[item.getSize()]; + file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE); file.readFully(data); dataIn.restart(data); return marshaller.readPayload(dataIn); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java index b04f2416b9..3599c707b4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java @@ -18,7 +18,9 @@ */ package org.apache.activemq.kaha.impl; +import java.io.FileNotFoundException; import java.io.IOException; + import org.apache.activemq.kaha.Marshaller; /** * Optimized Store writer @@ -26,9 +28,11 @@ import org.apache.activemq.kaha.Marshaller; * @version $Revision: 1.1.1.1 $ */ final class StoreDataWriter{ - private StoreByteArrayOutputStream dataOut; + + private StoreByteArrayOutputStream buffer; private DataManager dataManager; + /** * Construct a Store writer * @@ -36,23 +40,39 @@ final class StoreDataWriter{ */ StoreDataWriter(DataManager fileManager){ this.dataManager=fileManager; - this.dataOut=new StoreByteArrayOutputStream(); + this.buffer=new StoreByteArrayOutputStream(); } - DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{ - dataOut.reset(); - dataOut.position(DataItem.HEAD_SIZE); - marshaller.writePayload(payload,dataOut); - int size=dataOut.size(); - int payloadSize=size-DataItem.HEAD_SIZE; + /** + * @param marshaller + * @param payload + * @param data_item2 + * @return + * @throws IOException + * @throws FileNotFoundException + */ + DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException { + + // Write the packet our internal buffer. + buffer.reset(); + buffer.position(DataManager.ITEM_HEAD_SIZE); + marshaller.writePayload(payload,buffer); + int size=buffer.size(); + int payloadSize=size-DataManager.ITEM_HEAD_SIZE; + buffer.reset(); + buffer.writeByte(type); + buffer.writeInt(payloadSize); + + // Find the position where this item will land at. DataItem item=new DataItem(); item.setSize(payloadSize); DataFile dataFile=dataManager.findSpaceForData(item); - dataOut.reset(); - item.writeHeader(dataOut); + + // Now splat the buffer to the file. dataFile.getRandomAccessFile().seek(item.getOffset()); - dataFile.getRandomAccessFile().write(dataOut.getData(),0,size); + dataFile.getRandomAccessFile().write(buffer.getData(),0,size); dataFile.incrementLength(size); + dataManager.addInterestInFile(dataFile); return item; }