diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayInputStream.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayInputStream.java
index 88113ee24d..3c67f08a6c 100755
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayInputStream.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayInputStream.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
+import org.apache.activemq.util.ByteSequence;
/**
* Optimized ByteArrayInputStream that can be used more than once
*
@@ -29,15 +30,27 @@ import java.io.UTFDataFormatException;
public final class StoreByteArrayInputStream extends InputStream implements DataInput{
private byte[] buf;
private int pos;
+ private int offset;
/**
- * Creates a WireByteArrayInputStream
.
+ * Creates a StoreByteArrayInputStream
.
*
* @param buf the input buffer.
*/
public StoreByteArrayInputStream(byte buf[]){
this.buf=buf;
this.pos=0;
+ this.offset = 0;
+ }
+
+ /**
+ * Creates a StoreByteArrayInputStream
.
+ *
+ * @param sequence the input buffer.
+ */
+ public StoreByteArrayInputStream(ByteSequence sequence){
+ this.buf=sequence.getData();
+ this.offset=this.pos=sequence.getOffset();
}
/**
@@ -47,8 +60,12 @@ public final class StoreByteArrayInputStream extends InputStream implements Data
this(new byte[0]);
}
+ /**
+ *
+ * @return the size
+ */
public int size(){
- return pos;
+ return pos-offset;
}
/**
@@ -59,7 +76,7 @@ public final class StoreByteArrayInputStream extends InputStream implements Data
}
/**
- * reset the WireByteArrayInputStream
to use an new byte array
+ * reset the StoreByteArrayInputStream
to use an new byte array
*
* @param newBuff
*/
@@ -67,6 +84,16 @@ public final class StoreByteArrayInputStream extends InputStream implements Data
buf=newBuff;
pos=0;
}
+
+ /**
+ * reset the StoreByteArrayInputStream
to use an new ByteSequence
+ * @param sequence
+ *
+ */
+ public void restart(ByteSequence sequence){
+ this.buf=sequence.getData();
+ this.pos=sequence.getOffset();
+ }
/**
* re-start the input stream - reusing the current buffer
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayOutputStream.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayOutputStream.java
index 60594734e2..dfc1f7d3f8 100755
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayOutputStream.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreByteArrayOutputStream.java
@@ -21,6 +21,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
+import org.apache.activemq.util.ByteSequence;
/**
* Optimized ByteArrayOutputStream
*
@@ -59,6 +60,14 @@ public final class StoreByteArrayOutputStream extends OutputStream implements Da
buf=new byte[size];
pos=0;
}
+
+ /**
+ * Get a ByteSequence from the stream
+ * @return the byte sequence
+ */
+ public ByteSequence toByteSequence() {
+ return new ByteSequence(buf, 0, pos);
+ }
/**
* Writes the specified byte to this byte array output stream.
@@ -117,6 +126,8 @@ public final class StoreByteArrayOutputStream extends OutputStream implements Da
public int size(){
return pos;
}
+
+
public void writeBoolean(boolean v){
ensureEnoughBuffer(1);