- Added support for large string in ActiveMQStreamMessage.
- Change the marshalling method of ActiveMQStreamMessage to also use the MarshallingSupport utility. This allows a central utility to handle object marshalling. Unmarshalling was still left to ActiveMQStreamMessage as it performs type conversions.
- NOTE: The constants use to determine the data type was also change to refer to the constants in MarshallingSupport. This makes this version's wireformat incompatible with previous versions.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@452793 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Adrian T. Co 2006-10-04 08:29:38 +00:00
parent 00282727b8
commit 09ebb720d3
3 changed files with 162 additions and 117 deletions

View File

@ -40,6 +40,7 @@ import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
/**
* A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
@ -109,21 +110,6 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
/**
* message property types
*/
private final static byte BYTES = 3;
private final static byte STRING = 4;
private final static byte BOOLEAN = 5;
private final static byte CHAR = 6;
private final static byte BYTE = 7;
private final static byte SHORT = 8;
private final static byte INT = 9;
private final static byte LONG = 10;
private final static byte FLOAT = 11;
private final static byte DOUBLE = 12;
private final static byte NULL = 13;
transient protected DataOutputStream dataOut;
transient protected ByteArrayOutputStream bytesOut;
transient protected DataInputStream dataIn;
@ -142,7 +128,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
copy.bytesOut = null;
copy.dataIn = null;
}
public void onSend() {
super.onSend();
storeContent();
@ -210,13 +196,13 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == BOOLEAN) {
if (type == MarshallingSupport.BOOLEAN_TYPE) {
return this.dataIn.readBoolean();
}
if (type == STRING) {
if (type == MarshallingSupport.STRING_TYPE) {
return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
this.dataIn.reset();
throw new NullPointerException("Cannot convert NULL value to boolean.");
} else {
@ -255,13 +241,13 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == BYTE) {
if (type == MarshallingSupport.BYTE_TYPE) {
return this.dataIn.readByte();
}
if (type == STRING) {
if (type == MarshallingSupport.STRING_TYPE) {
return Byte.valueOf(this.dataIn.readUTF()).byteValue();
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
this.dataIn.reset();
throw new NullPointerException("Cannot convert NULL value to byte.");
} else {
@ -307,16 +293,16 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == SHORT) {
if (type == MarshallingSupport.SHORT_TYPE) {
return this.dataIn.readShort();
}
if (type == BYTE) {
if (type == MarshallingSupport.BYTE_TYPE) {
return this.dataIn.readByte();
}
if (type == STRING) {
if (type == MarshallingSupport.STRING_TYPE) {
return Short.valueOf(this.dataIn.readUTF()).shortValue();
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
this.dataIn.reset();
throw new NullPointerException("Cannot convert NULL value to short.");
} else {
@ -363,10 +349,10 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == CHAR) {
if (type == MarshallingSupport.CHAR_TYPE) {
return this.dataIn.readChar();
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
this.dataIn.reset();
throw new NullPointerException("Cannot convert NULL value to char.");
} else {
@ -413,19 +399,19 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == INT) {
if (type == MarshallingSupport.INTEGER_TYPE) {
return this.dataIn.readInt();
}
if (type == SHORT) {
if (type == MarshallingSupport.SHORT_TYPE) {
return this.dataIn.readShort();
}
if (type == BYTE) {
if (type == MarshallingSupport.BYTE_TYPE) {
return this.dataIn.readByte();
}
if (type == STRING) {
if (type == MarshallingSupport.STRING_TYPE) {
return Integer.valueOf(this.dataIn.readUTF()).intValue();
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
this.dataIn.reset();
throw new NullPointerException("Cannot convert NULL value to int.");
} else {
@ -472,22 +458,22 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == LONG) {
if (type == MarshallingSupport.LONG_TYPE) {
return this.dataIn.readLong();
}
if (type == INT) {
if (type == MarshallingSupport.INTEGER_TYPE) {
return this.dataIn.readInt();
}
if (type == SHORT) {
if (type == MarshallingSupport.SHORT_TYPE) {
return this.dataIn.readShort();
}
if (type == BYTE) {
if (type == MarshallingSupport.BYTE_TYPE) {
return this.dataIn.readByte();
}
if (type == STRING) {
if (type == MarshallingSupport.STRING_TYPE) {
return Long.valueOf(this.dataIn.readUTF()).longValue();
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
this.dataIn.reset();
throw new NullPointerException("Cannot convert NULL value to long.");
} else {
@ -532,13 +518,13 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == FLOAT) {
if (type == MarshallingSupport.FLOAT_TYPE) {
return this.dataIn.readFloat();
}
if (type == STRING) {
if (type == MarshallingSupport.STRING_TYPE) {
return Float.valueOf(this.dataIn.readUTF()).floatValue();
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
this.dataIn.reset();
throw new NullPointerException("Cannot convert NULL value to float.");
} else {
@ -584,16 +570,16 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == DOUBLE) {
if (type == MarshallingSupport.DOUBLE_TYPE) {
return this.dataIn.readDouble();
}
if (type == FLOAT) {
if (type == MarshallingSupport.FLOAT_TYPE) {
return this.dataIn.readFloat();
}
if (type == STRING) {
if (type == MarshallingSupport.STRING_TYPE) {
return Double.valueOf(this.dataIn.readUTF()).doubleValue();
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
this.dataIn.reset();
throw new NullPointerException("Cannot convert NULL value to double.");
} else {
@ -639,34 +625,37 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
return null;
}
if (type == STRING) {
if (type == MarshallingSupport.BIG_STRING_TYPE) {
return MarshallingSupport.readUTF8(dataIn);
}
if (type == MarshallingSupport.STRING_TYPE) {
return this.dataIn.readUTF();
}
if (type == LONG) {
if (type == MarshallingSupport.LONG_TYPE) {
return new Long(this.dataIn.readLong()).toString();
}
if (type == INT) {
if (type == MarshallingSupport.INTEGER_TYPE) {
return new Integer(this.dataIn.readInt()).toString();
}
if (type == SHORT) {
if (type == MarshallingSupport.SHORT_TYPE) {
return new Short(this.dataIn.readShort()).toString();
}
if (type == BYTE) {
if (type == MarshallingSupport.BYTE_TYPE) {
return new Byte(this.dataIn.readByte()).toString();
}
if (type == FLOAT) {
if (type == MarshallingSupport.FLOAT_TYPE) {
return new Float(this.dataIn.readFloat()).toString();
}
if (type == DOUBLE) {
if (type == MarshallingSupport.DOUBLE_TYPE) {
return new Double(this.dataIn.readDouble()).toString();
}
if (type == BOOLEAN) {
if (type == MarshallingSupport.BOOLEAN_TYPE) {
return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
}
if (type == CHAR) {
if (type == MarshallingSupport.CHAR_TYPE) {
return new Character(this.dataIn.readChar()).toString();
} else {
this.dataIn.reset();
@ -732,20 +721,20 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
*/
public int readBytes(byte[] value) throws JMSException {
initializeReading();
try {
if (value == null) {
throw new NullPointerException();
}
if( remainingBytes == -1 ) {
this.dataIn.mark(value.length + 1);
int type = this.dataIn.read();
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type != BYTES) {
if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
throw new MessageFormatException("Not a byte array");
}
remainingBytes = this.dataIn.readInt();
@ -753,7 +742,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
remainingBytes = -1;
return -1;
}
if (value.length <= remainingBytes) {
// small buffer
remainingBytes -= value.length;
@ -764,8 +753,8 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
int rc = this.dataIn.read(value, 0, remainingBytes);
remainingBytes=0;
return rc;
}
}
} catch (EOFException e) {
JMSException jmsEx = new MessageEOFException(e.getMessage());
jmsEx.setLinkedException(e);
@ -815,37 +804,40 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
if (type == -1) {
throw new MessageEOFException("reached end of data");
}
if (type == NULL) {
if (type == MarshallingSupport.NULL) {
return null;
}
if (type == STRING) {
if (type == MarshallingSupport.BIG_STRING_TYPE) {
return MarshallingSupport.readUTF8(dataIn);
}
if (type == MarshallingSupport.STRING_TYPE) {
return this.dataIn.readUTF();
}
if (type == LONG) {
if (type == MarshallingSupport.LONG_TYPE) {
return new Long(this.dataIn.readLong());
}
if (type == INT) {
if (type == MarshallingSupport.INTEGER_TYPE) {
return new Integer(this.dataIn.readInt());
}
if (type == SHORT) {
if (type == MarshallingSupport.SHORT_TYPE) {
return new Short(this.dataIn.readShort());
}
if (type == BYTE) {
if (type == MarshallingSupport.BYTE_TYPE) {
return new Byte(this.dataIn.readByte());
}
if (type == FLOAT) {
if (type == MarshallingSupport.FLOAT_TYPE) {
return new Float(this.dataIn.readFloat());
}
if (type == DOUBLE) {
if (type == MarshallingSupport.DOUBLE_TYPE) {
return new Double(this.dataIn.readDouble());
}
if (type == BOOLEAN) {
if (type == MarshallingSupport.BOOLEAN_TYPE) {
return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
}
if (type == CHAR) {
if (type == MarshallingSupport.CHAR_TYPE) {
return new Character(this.dataIn.readChar());
}
if (type == BYTES) {
if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
int len = this.dataIn.readInt();
byte[] value = new byte[len];
this.dataIn.readFully(value);
@ -890,8 +882,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeBoolean(boolean value) throws JMSException {
initializeWriting();
try {
this.dataOut.write(BOOLEAN);
this.dataOut.writeBoolean(value);
MarshallingSupport.marshalBoolean(dataOut, value);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -912,8 +903,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeByte(byte value) throws JMSException {
initializeWriting();
try {
this.dataOut.write(BYTE);
this.dataOut.writeByte(value);
MarshallingSupport.marshalByte(dataOut, value);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -934,8 +924,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeShort(short value) throws JMSException {
initializeWriting();
try {
this.dataOut.write(SHORT);
this.dataOut.writeShort(value);
MarshallingSupport.marshalShort(dataOut, value);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -956,8 +945,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeChar(char value) throws JMSException {
initializeWriting();
try {
this.dataOut.write(CHAR);
this.dataOut.writeChar(value);
MarshallingSupport.marshalChar(dataOut, value);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -978,8 +966,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeInt(int value) throws JMSException {
initializeWriting();
try {
this.dataOut.write(INT);
this.dataOut.writeInt(value);
MarshallingSupport.marshalInt(dataOut, value);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -1000,8 +987,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeLong(long value) throws JMSException {
initializeWriting();
try {
this.dataOut.write(LONG);
this.dataOut.writeLong(value);
MarshallingSupport.marshalLong(dataOut, value);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -1022,8 +1008,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeFloat(float value) throws JMSException {
initializeWriting();
try {
this.dataOut.write(FLOAT);
this.dataOut.writeFloat(value);
MarshallingSupport.marshalFloat(dataOut, value);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -1044,8 +1029,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeDouble(double value) throws JMSException {
initializeWriting();
try {
this.dataOut.write(DOUBLE);
this.dataOut.writeDouble(value);
MarshallingSupport.marshalDouble(dataOut, value);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -1067,10 +1051,9 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
initializeWriting();
try {
if (value == null) {
this.dataOut.write(NULL);
MarshallingSupport.marshalNull(dataOut);
} else {
this.dataOut.write(STRING);
this.dataOut.writeUTF(value);
MarshallingSupport.marshalString(dataOut, value);
}
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
@ -1121,9 +1104,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
public void writeBytes(byte[] value, int offset, int length) throws JMSException {
initializeWriting();
try {
this.dataOut.write(BYTES);
this.dataOut.writeInt(length);
this.dataOut.write(value, offset, length);
MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
@ -1151,7 +1132,7 @@ public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMess
initializeWriting();
if (value == null) {
try {
this.dataOut.write(NULL);
MarshallingSupport.marshalNull(dataOut);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}

View File

@ -115,35 +115,25 @@ public class MarshallingSupport {
static public void marshalPrimitive(DataOutputStream out, Object value) throws IOException {
if( value == null ) {
out.writeByte(NULL);
marshalNull(out);
} else if( value.getClass() == Boolean.class ) {
out.writeByte(BOOLEAN_TYPE);
out.writeBoolean(((Boolean)value).booleanValue());
marshalBoolean(out, ((Boolean)value).booleanValue());
} else if( value.getClass() == Byte.class ) {
out.writeByte(BYTE_TYPE);
out.writeByte(((Byte)value).byteValue());
marshalByte(out, ((Byte)value).byteValue());
} else if( value.getClass() == Character.class ) {
out.writeByte(CHAR_TYPE);
out.writeChar(((Character)value).charValue());
marshalChar(out, ((Character)value).charValue());
} else if( value.getClass() == Short.class ) {
out.writeByte(SHORT_TYPE);
out.writeShort(((Short)value).shortValue());
marshalShort(out, ((Short)value).shortValue());
} else if( value.getClass() == Integer.class ) {
out.writeByte(INTEGER_TYPE);
out.writeInt(((Integer)value).intValue());
marshalInt(out, ((Integer)value).intValue());
} else if( value.getClass() == Long.class ) {
out.writeByte(LONG_TYPE);
out.writeLong(((Long)value).longValue());
marshalLong(out, ((Long)value).longValue());
} else if( value.getClass() == Float.class ) {
out.writeByte(FLOAT_TYPE);
out.writeFloat(((Float)value).floatValue());
marshalFloat(out, ((Float)value).floatValue());
} else if( value.getClass() == Double.class ) {
out.writeByte(DOUBLE_TYPE);
out.writeDouble(((Double)value).doubleValue());
marshalDouble(out, ((Double)value).doubleValue());
} else if( value.getClass() == byte[].class ) {
out.writeByte(BYTE_ARRAY_TYPE);
out.writeInt(((byte[])value).length);
out.write(((byte[])value));
marshalByteArray(out, ((byte[])value));
} else if( value.getClass() == String.class ) {
marshalString(out, (String)value);
} else if( value instanceof Map) {
@ -205,6 +195,61 @@ public class MarshallingSupport {
return value;
}
public static void marshalNull(DataOutputStream out) throws IOException {
out.writeByte(NULL);
}
public static void marshalBoolean(DataOutputStream out, boolean value) throws IOException {
out.writeByte(BOOLEAN_TYPE);
out.writeBoolean(value);
}
public static void marshalByte(DataOutputStream out, byte value) throws IOException {
out.writeByte(BYTE_TYPE);
out.writeByte(value);
}
public static void marshalChar(DataOutputStream out, char value) throws IOException {
out.writeByte(CHAR_TYPE);
out.writeChar(value);
}
public static void marshalShort(DataOutputStream out, short value) throws IOException {
out.writeByte(SHORT_TYPE);
out.writeShort(value);
}
public static void marshalInt(DataOutputStream out, int value) throws IOException {
out.writeByte(INTEGER_TYPE);
out.writeInt(value);
}
public static void marshalLong(DataOutputStream out, long value) throws IOException {
out.writeByte(LONG_TYPE);
out.writeLong(value);
}
public static void marshalFloat(DataOutputStream out, float value) throws IOException {
out.writeByte(FLOAT_TYPE);
out.writeFloat(value);
}
public static void marshalDouble(DataOutputStream out, double value) throws IOException {
out.writeByte(DOUBLE_TYPE);
out.writeDouble(value);
}
public static void marshalByteArray(DataOutputStream out, byte[] value) throws IOException {
marshalByteArray(out, value, 0, value.length);
}
public static void marshalByteArray(DataOutputStream out, byte[] value, int offset, int length) throws IOException {
out.writeByte(BYTE_ARRAY_TYPE);
out.writeInt(length);
out.write(value, offset, length);
}
public static void marshalString(DataOutputStream out, String s) throws IOException {
// If it's too big, out.writeUTF may not able able to write it out.
if( s.length() < Short.MAX_VALUE/4 ) {
@ -216,7 +261,6 @@ public class MarshallingSupport {
}
}
static public void writeUTF8(DataOutput dataOut, String text) throws IOException {
if (text != null) {
int strlen = text.length();

View File

@ -601,6 +601,26 @@ public class ActiveMQStreamMessageTest extends TestCase {
}
}
public void testReadBigString() {
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
try {
// Test with a 1Meg String
StringBuffer bigSB = new StringBuffer(1024*1024);
for( int i=0; i < 1024*1024; i++ ) {
bigSB.append((char)'a'+i%26);
}
String bigString = bigSB.toString();
msg.writeString(bigString);
msg.reset();
assertEquals(bigString, msg.readString());
} catch (JMSException jmsEx) {
jmsEx.printStackTrace();
assertTrue(false);
}
}
public void testReadBytes() {
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
try {