- Fixed endian convertion issues

- Fixed UTF8 encoding issues (size prefix needed to be a short instead of a int)
- Simplified usage of the Binary Reader/Writers by extending the class.
- Fixed Primitive Map encoding issues (edianness was not right)
- Enabled a durable sub test that works now due to message properties being correctly marshalled.
- Added a MessageTest 



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383459 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-06 03:39:51 +00:00
parent e6ad61952e
commit ecf56d1a74
31 changed files with 898 additions and 450 deletions

View File

@ -117,12 +117,12 @@ namespace ActiveMQ.OpenWire
/*
if (wireFormat.isCacheEnabled()) {
if (bs.ReadBoolean()) {
short index = dataInReadShort(dataIn)Int16();
short index = dataIndataIn.ReadInt16()Int16();
DataStructure value = wireFormat.UnmarshalNestedObject(dataIn, bs);
wireFormat.setInUnmarshallCache(index, value);
return value;
} else {
short index = ReadShort(dataIn);
short index = dataIn.ReadInt16();
return wireFormat.getFromUnmarshallCache(index);
}
} else {
@ -165,10 +165,10 @@ namespace ActiveMQ.OpenWire
if (wireFormat.isCacheEnabled()) {
Short index = wireFormat.getMarshallCacheIndex(o);
if (bs.ReadBoolean()) {
WriteShort(index.shortValue(), dataOut);
dataOut.Write(index.shortValue(), dataOut);
wireFormat.Marshal2NestedObject(o, dataOut, bs);
} else {
WriteShort(index.shortValue(), dataOut);
dataOut.Write(index.shortValue(), dataOut);
}
} else {
wireFormat.Marshal2NestedObject(o, dataOut, bs);
@ -189,7 +189,7 @@ namespace ActiveMQ.OpenWire
}
else
{
return ReadUTF8(dataIn);
return dataIn.ReadString();
}
}
else
@ -200,7 +200,7 @@ namespace ActiveMQ.OpenWire
protected virtual String ReadAsciiString(BinaryReader dataIn)
{
int size = ReadShort(dataIn);
int size = dataIn.ReadInt16();
byte[] data = new byte[size];
dataIn.Read(data, 0, size);
char[] text = new char[size];
@ -218,41 +218,34 @@ namespace ActiveMQ.OpenWire
{
int strlen = value.Length;
// TODO until we get UTF8 working, lets just force ASCII
bs.WriteBoolean(true);
return strlen + 2;
/*
int utflen = 0;
int c = 0;
bool isOnlyAscii = true;
char[] charr = value.ToCharArray();
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
utflen++;
}
else if (c > 0x07FF)
{
utflen += 3;
isOnlyAscii = false;
}
else
{
isOnlyAscii = false;
utflen += 2;
}
}
if (utflen >= Int16.MaxValue)
throw new IOException("Encountered a String value that is too long to encode.");
bs.WriteBoolean(isOnlyAscii);
return utflen + 2;
*/
int utflen = 0;
int c = 0;
bool isOnlyAscii = true;
char[] charr = value.ToCharArray();
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
utflen++;
}
else if (c > 0x07FF)
{
utflen += 3;
isOnlyAscii = false;
}
else
{
isOnlyAscii = false;
utflen += 2;
}
}
if (utflen >= Int16.MaxValue)
throw new IOException("Encountered a String value that is too long to encode.");
bs.WriteBoolean(isOnlyAscii);
return utflen + 2;
}
else
{
@ -267,101 +260,21 @@ namespace ActiveMQ.OpenWire
// If we verified it only holds ascii values
if (bs.ReadBoolean())
{
WriteShort((short) value.Length, dataOut);
dataOut.Write((short) value.Length);
// now lets write the bytes
char[] chars = value.ToCharArray();
for (int i = 0; i < chars.Length; i++)
{
WriteByte((byte) chars[i], dataOut);
dataOut.Write((byte)(chars[i]&0xFF00>>8));
}
}
else
{
// TODO how should we properly write a String so that Java will grok it???
dataOut.Write(value);
}
}
}
public static byte ReadByte(BinaryReader dataIn)
{
return dataIn.ReadByte();
}
public static char ReadChar(BinaryReader dataIn)
{
return (char) ReadShort(dataIn);
}
public static short ReadShort(BinaryReader dataIn)
{
return SwitchEndian(dataIn.ReadInt16());
}
public static int ReadInt(BinaryReader dataIn)
{
return SwitchEndian(dataIn.ReadInt32());
}
public static long ReadLong(BinaryReader dataIn)
{
return SwitchEndian(dataIn.ReadInt64());
}
public static void WriteByte(byte value, BinaryWriter dataOut)
{
dataOut.Write(value);
}
public static void WriteChar(char value, BinaryWriter dataOut)
{
dataOut.Write(SwitchEndian((short) value));
}
public static void WriteShort(short value, BinaryWriter dataOut)
{
dataOut.Write(SwitchEndian(value));
}
public static void WriteInt(int value, BinaryWriter dataOut)
{
dataOut.Write(SwitchEndian(value));
}
public static void WriteLong(long value, BinaryWriter dataOut)
{
dataOut.Write(SwitchEndian(value));
}
/// <summary>
/// Switches from one endian to the other
/// </summary>
public static int SwitchEndian(int x)
{
return ((x << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000) >> 8) | (x >> 24));
}
public static short SwitchEndian(short x)
{
int low = x & 0xff;
int high = x & 0xff00;
return(short)(high >> 8 | low << 8);
}
public static long SwitchEndian(long x)
{
long answer = 0;
for (int i = 0; i < 8; i++)
{
long lowest = x & 0xff;
x >>= 8;
answer <<= 8;
answer += lowest;
}
return answer;
}
public virtual int TightMarshalLong1(OpenWireFormat wireFormat, long o, BooleanStream bs)
{
if (o == 0L)
@ -404,18 +317,18 @@ namespace ActiveMQ.OpenWire
{
if (bs.ReadBoolean())
{
WriteLong(o, dataOut);
dataOut.Write(o);
}
else
{
WriteInt((int) o, dataOut);
dataOut.Write((int)o);
}
}
else
{
if (bs.ReadBoolean())
{
WriteShort((short) o, dataOut);
dataOut.Write((short)o);
}
}
}
@ -425,18 +338,18 @@ namespace ActiveMQ.OpenWire
{
if (bs.ReadBoolean())
{
return ReadLong(dataIn);
return dataIn.ReadInt64(); // dataIn.ReadInt64();
}
else
{
return ReadInt(dataIn);
return dataIn.ReadInt32();
}
}
else
{
if (bs.ReadBoolean())
{
return ReadShort(dataIn);
return dataIn.ReadInt16();
}
else
{
@ -475,7 +388,7 @@ namespace ActiveMQ.OpenWire
{
if (bs.ReadBoolean())
{
WriteShort((short) objects.Length, dataOut);
dataOut.Write((short) objects.Length);
for (int i = 0; i < objects.Length; i++)
{
TightMarshalNestedObject2(wireFormat, objects[i], dataOut, bs);
@ -487,7 +400,7 @@ namespace ActiveMQ.OpenWire
{
if (flag)
{
int size = ReadInt(dataIn);
int size = dataIn.ReadInt32();
return dataIn.ReadBytes(size);
}
else
@ -498,7 +411,7 @@ namespace ActiveMQ.OpenWire
protected virtual byte[] ReadBytes(BinaryReader dataIn)
{
int size = ReadInt(dataIn);
int size = dataIn.ReadInt32();
return dataIn.ReadBytes(size);
}
@ -509,7 +422,7 @@ namespace ActiveMQ.OpenWire
protected virtual void WriteBytes(byte[] command, BinaryWriter dataOut)
{
WriteInt(command.Length, dataOut);
dataOut.Write(command.Length);
dataOut.Write(command);
}
@ -526,7 +439,7 @@ namespace ActiveMQ.OpenWire
answer.Message = TightUnmarshalString(dataIn, bs);
if (wireFormat.StackTraceEnabled)
{
short length = ReadShort(dataIn);
short length = dataIn.ReadInt16();
StackTraceElement[] stackTrace = new StackTraceElement[length];
for (int i = 0; i < stackTrace.Length; i++)
{
@ -534,7 +447,7 @@ namespace ActiveMQ.OpenWire
element.ClassName = TightUnmarshalString(dataIn, bs);
element.MethodName = TightUnmarshalString(dataIn, bs);
element.FileName = TightUnmarshalString(dataIn, bs);
element.LineNumber = ReadInt(dataIn);
element.LineNumber = dataIn.ReadInt32();
stackTrace[i] = element;
}
answer.StackTraceElements = stackTrace;
@ -593,7 +506,7 @@ namespace ActiveMQ.OpenWire
if (wireFormat.StackTraceEnabled)
{
StackTraceElement[] stackTrace = o.StackTraceElements;
WriteShort((short) stackTrace.Length, dataOut);
dataOut.Write((short) stackTrace.Length);
for (int i = 0; i < stackTrace.Length; i++)
{
@ -601,7 +514,7 @@ namespace ActiveMQ.OpenWire
TightMarshalString2(element.ClassName, dataOut, bs);
TightMarshalString2(element.MethodName, dataOut, bs);
TightMarshalString2(element.FileName, dataOut, bs);
WriteInt(element.LineNumber, dataOut);
dataOut.Write(element.LineNumber);
}
TightMarshalBrokerError2(wireFormat, o.Cause, dataOut, bs);
}
@ -620,7 +533,7 @@ namespace ActiveMQ.OpenWire
else
{
MemoryStream memoryStream = new MemoryStream();
MarshalPrimitiveMap(map, new BinaryWriter(memoryStream));
MarshalPrimitiveMap(map, new OpenWireBinaryWriter(memoryStream));
return memoryStream.GetBuffer();
}
}
@ -628,15 +541,15 @@ namespace ActiveMQ.OpenWire
{
if (map == null)
{
WriteInt(-1, dataOut);
dataOut.Write((int)-1);
}
else
{
WriteInt(map.Count, dataOut);
dataOut.Write(map.Count);
foreach (DictionaryEntry entry in map)
{
String name = (String) entry.Key;
WriteUTF8(name, dataOut);
dataOut.Write(name);
Object value = entry.Value;
MarshalPrimitive(dataOut, value);
}
@ -655,13 +568,13 @@ namespace ActiveMQ.OpenWire
}
else
{
return UnmarshalPrimitiveMap(new BinaryReader(new MemoryStream(data)));
return UnmarshalPrimitiveMap(new OpenWireBinaryReader(new MemoryStream(data)));
}
}
public static IDictionary UnmarshalPrimitiveMap(BinaryReader dataIn)
{
int size = ReadInt(dataIn);
int size = dataIn.ReadInt32();
if (size < 0)
{
return null;
@ -671,7 +584,7 @@ namespace ActiveMQ.OpenWire
IDictionary answer = new Hashtable(size);
for (int i=0; i < size; i++)
{
String name = ReadUTF8(dataIn);
String name = dataIn.ReadString();
answer[name] = UnmarshalPrimitive(dataIn);
}
return answer;
@ -683,59 +596,59 @@ namespace ActiveMQ.OpenWire
{
if (value == null)
{
WriteByte(NULL, dataOut);
dataOut.Write(NULL);
}
else if (value is bool)
{
WriteByte(BOOLEAN_TYPE, dataOut);
WriteBoolean((bool) value, dataOut);
dataOut.Write(BOOLEAN_TYPE);
dataOut.Write((bool) value);
}
else if (value is byte)
{
WriteByte(BYTE_TYPE, dataOut);
WriteByte(((Byte)value), dataOut);
dataOut.Write(BYTE_TYPE);
dataOut.Write(((Byte)value));
}
else if (value is char)
{
WriteByte(CHAR_TYPE, dataOut);
WriteChar((char) value, dataOut);
dataOut.Write(CHAR_TYPE);
dataOut.Write((char) value);
}
else if (value is short)
{
WriteByte(SHORT_TYPE, dataOut);
WriteShort((short) value, dataOut);
dataOut.Write(SHORT_TYPE);
dataOut.Write((short) value);
}
else if (value is int)
{
WriteByte(INTEGER_TYPE, dataOut);
WriteInt((int) value, dataOut);
dataOut.Write(INTEGER_TYPE);
dataOut.Write((int) value);
}
else if (value is long)
{
WriteByte(LONG_TYPE, dataOut);
WriteLong((long) value, dataOut);
dataOut.Write(LONG_TYPE);
dataOut.Write((long) value);
}
else if (value is float)
{
WriteByte(FLOAT_TYPE, dataOut);
WriteFloat((float) value, dataOut);
dataOut.Write(FLOAT_TYPE);
dataOut.Write((float) value);
}
else if (value is double)
{
WriteByte(DOUBLE_TYPE, dataOut);
WriteDouble((double) value, dataOut);
dataOut.Write(DOUBLE_TYPE);
dataOut.Write((double) value);
}
else if (value is byte[])
{
byte[] data = (byte[]) value;
WriteByte(BYTE_ARRAY_TYPE, dataOut);
WriteInt(data.Length, dataOut);
dataOut.Write(BYTE_ARRAY_TYPE);
dataOut.Write(data.Length);
dataOut.Write(data);
}
else if (value is string)
{
WriteByte(STRING_TYPE, dataOut);
WriteUTF8((string) value, dataOut);
dataOut.Write(STRING_TYPE);
dataOut.Write((string) value);
}
else
{
@ -746,25 +659,25 @@ namespace ActiveMQ.OpenWire
public static Object UnmarshalPrimitive(BinaryReader dataIn)
{
Object value=null;
switch (ReadByte(dataIn))
switch (dataIn.ReadByte())
{
case BYTE_TYPE:
value = ReadByte(dataIn);
value = dataIn.ReadByte();
break;
case BOOLEAN_TYPE:
value = ReadBoolean(dataIn);
value = dataIn.ReadBoolean();
break;
case CHAR_TYPE:
value = ReadChar(dataIn);
value = dataIn.ReadChar();
break;
case SHORT_TYPE:
value = ReadShort(dataIn);
value = dataIn.ReadInt16();
break;
case INTEGER_TYPE:
value = ReadInt(dataIn);
value = dataIn.ReadInt32();
break;
case LONG_TYPE:
value = ReadLong(dataIn);
value = dataIn.ReadInt64();
break;
case FLOAT_TYPE:
value = ReadFloat(dataIn);
@ -773,13 +686,13 @@ namespace ActiveMQ.OpenWire
value = ReadDouble(dataIn);
break;
case BYTE_ARRAY_TYPE:
int size = ReadInt(dataIn);
int size = dataIn.ReadInt32();
byte[] data = new byte[size];
dataIn.Read(data, 0, size);
value = data;
break;
case STRING_TYPE:
value = ReadUTF8(dataIn);
value = dataIn.ReadString();
break;
}
return value;
@ -824,152 +737,6 @@ namespace ActiveMQ.OpenWire
dataOut.Write(value);
}
public static void WriteUTF8(String text, BinaryWriter dataOut)
{
if (text != null)
{
int strlen = text.Length;
int utflen = 0;
int c, count = 0;
char[] charr = text.ToCharArray();
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
utflen++;
}
else if (c > 0x07FF)
{
utflen += 3;
}
else
{
utflen += 2;
}
}
WriteInt(utflen, dataOut);
byte[] bytearr = new byte[utflen];
/*
byte[] bytearr = new byte[utflen + 4];
bytearr[count++] = (byte) ((utflen >>> 24) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 16) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
*/
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
bytearr[count++] = (byte) c;
}
else if (c > 0x07FF)
{
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
else
{
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
}
dataOut.Write(bytearr);
}
else
{
WriteInt(-1, dataOut);
}
}
public static String ReadUTF8(BinaryReader dataIn)
{
int utflen = ReadInt(dataIn);
if (utflen > -1)
{
StringBuilder str = new StringBuilder(utflen);
byte[] bytearr = new byte[utflen];
int c, char2, char3;
int count = 0;
dataIn.Read(bytearr, 0, utflen);
while (count < utflen)
{
c = bytearr[count] & 0xff;
switch (c >> 4)
{
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
/* 0xxxxxxx */
count++;
str.Append((char) c);
break;
case 12:
case 13:
/* 110x xxxx 10xx xxxx */
count += 2;
if (count > utflen)
{
throw CreateDataFormatException();
}
char2 = bytearr[count - 1];
if ((char2 & 0xC0) != 0x80)
{
throw CreateDataFormatException();
}
str.Append((char) (((c & 0x1F) << 6) | (char2 & 0x3F)));
break;
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
count += 3;
if (count > utflen)
{
throw CreateDataFormatException();
}
char2 = bytearr[count - 2];
char3 = bytearr[count - 1];
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
{
throw CreateDataFormatException();
}
str.Append((char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
break;
default :
/* 10xx xxxx, 1111 xxxx */
throw CreateDataFormatException();
}
}
// The number of chars produced may be less than utflen
return str.ToString();
}
else
{
return null;
}
}
private static Exception CreateDataFormatException()
{
// TODO: implement a better exception
return new Exception("Data format error!");
}
/// <summary>
/// Converts the object to a String
/// </summary>

View File

@ -79,7 +79,7 @@ namespace ActiveMQ.OpenWire
dataOut.Write((byte)arrayLimit);
} else {
dataOut.Write((byte)0x80);
BaseDataStreamMarshaller.WriteShort(arrayLimit, dataOut);
dataOut.Write(arrayLimit);
}
dataOut.Write(data, 0, arrayLimit);
Clear();
@ -87,11 +87,11 @@ namespace ActiveMQ.OpenWire
public void Unmarshal(BinaryReader dataIn)
{
arrayLimit = (short)(BaseDataStreamMarshaller.ReadByte(dataIn) & 0xFF);
arrayLimit = (short)(dataIn.ReadByte() & 0xFF);
if ( arrayLimit == 0xC0 ) {
arrayLimit = (short)(BaseDataStreamMarshaller.ReadByte(dataIn) & 0xFF);
arrayLimit = (short)(dataIn.ReadByte() & 0xFF);
} else if( arrayLimit == 0x80 ) {
arrayLimit = BaseDataStreamMarshaller.ReadShort(dataIn);
arrayLimit = dataIn.ReadInt16();
}
if( data.Length < arrayLimit ) {
data = new byte[arrayLimit];

View File

@ -0,0 +1,95 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.IO;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// Support class that switches from one endian to the other.
/// </summary>
public class EndianSupport
{
public static char SwitchEndian(char x)
{
return (char) (
(((char)( (byte)(x) )) << 8 ) |
(((char)( (byte)(x >> 8) )) )
);
}
public static short SwitchEndian(short x)
{
return (short) (
(((short)( (byte)(x) )) << 8 ) |
(((short)( (byte)(x >> 8) )) )
);
}
public static ushort SwitchEndian(ushort x)
{
return (ushort) (
(((ushort)( (byte)(x) )) << 8 ) |
(((ushort)( (byte)(x >> 8) )) )
);
}
public static int SwitchEndian(int x)
{
return
(((int)( (byte)(x) )) << 24 ) |
(((int)( (byte)(x >> 8) )) << 16 ) |
(((int)( (byte)(x >> 16) )) << 8 ) |
(((int)( (byte)(x >> 24) )) );
}
public static uint SwitchEndian(uint x)
{
return
(((uint)( (byte)(x ) )) << 24 ) |
(((uint)( (byte)(x >> 8) )) << 16 ) |
(((uint)( (byte)(x >> 16) )) << 8 ) |
(((uint)( (byte)(x >> 24) )) );
}
public static long SwitchEndian(long x)
{
return
(((long)( (byte)(x ) )) << 56 ) |
(((long)( (byte)(x >> 8) )) << 48 ) |
(((long)( (byte)(x >> 16) )) << 40 ) |
(((long)( (byte)(x >> 24) )) << 32 ) |
(((long)( (byte)(x >> 32) )) << 24 ) |
(((long)( (byte)(x >> 40) )) << 16 ) |
(((long)( (byte)(x >> 48) )) << 8 ) |
(((long)( (byte)(x >> 56) )) );
}
public static ulong SwitchEndian(ulong x)
{
return
(((ulong)( (byte)(x ) )) << 56 ) |
(((ulong)( (byte)(x >> 8) )) << 48 ) |
(((ulong)( (byte)(x >> 16) )) << 40 ) |
(((ulong)( (byte)(x >> 24) )) << 32 ) |
(((ulong)( (byte)(x >> 32) )) << 24 ) |
(((ulong)( (byte)(x >> 40) )) << 16 ) |
(((ulong)( (byte)(x >> 48) )) << 8 ) |
(((ulong)( (byte)(x >> 56) )) );
}
}
}

View File

@ -0,0 +1,218 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using System.Text;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// A BinaryWriter that switches the endian orientation of the read opperations so that they
/// are compatible with marshalling used by OpenWire.
/// </summary>
public class OpenWireBinaryReader : BinaryReader
{
public OpenWireBinaryReader(Stream input) : base(input)
{
}
/// <summary>
/// Method Read
/// </summary>
/// <returns>An int</returns>
/// <param name="buffer">A char[]</param>
/// <param name="index">An int</param>
/// <param name="count">An int</param>
public override int Read(char[] buffer, int index, int count)
{
int size = base.Read(buffer, index, count);
for( int i=0; i < size; i++ ) {
buffer[index+i] = EndianSupport.SwitchEndian(buffer[index+i]);
}
return size;
}
/// <summary>
/// Method ReadChars
/// </summary>
/// <returns>A char[]</returns>
/// <param name="count">An int</param>
public override char[] ReadChars(int count)
{
char[] rc = base.ReadChars(count);
if( rc!=null ) {
for( int i=0; i < rc.Length; i++ ) {
rc[i] = EndianSupport.SwitchEndian(rc[i]);
}
}
return rc;
}
/// <summary>
/// Method ReadInt16
/// </summary>
/// <returns>A short</returns>
public override short ReadInt16()
{
return EndianSupport.SwitchEndian(base.ReadInt16());
}
/// <summary>
/// Method ReadChar
/// </summary>
/// <returns>A char</returns>
public override char ReadChar()
{
return EndianSupport.SwitchEndian(base.ReadChar());
}
/// <summary>
/// Method ReadInt64
/// </summary>
/// <returns>A long</returns>
public override long ReadInt64()
{
return EndianSupport.SwitchEndian(base.ReadInt64());
}
/// <summary>
/// Method ReadUInt64
/// </summary>
/// <returns>An ulong</returns>
public override ulong ReadUInt64()
{
return EndianSupport.SwitchEndian(base.ReadUInt64());
}
/// <summary>
/// Method ReadUInt32
/// </summary>
/// <returns>An uint</returns>
public override uint ReadUInt32()
{
return EndianSupport.SwitchEndian(base.ReadUInt32());
}
/// <summary>
/// Method ReadUInt16
/// </summary>
/// <returns>An ushort</returns>
public override ushort ReadUInt16()
{
return EndianSupport.SwitchEndian(base.ReadUInt16());
}
/// <summary>
/// Method ReadInt32
/// </summary>
/// <returns>An int</returns>
public override int ReadInt32()
{
int x = base.ReadInt32();
int y = EndianSupport.SwitchEndian(x);
return y;
}
/// <summary>
/// Method ReadString
/// </summary>
/// <returns>A string</returns>
public override String ReadString()
{
short utflen = ReadInt16();
if (utflen > -1)
{
StringBuilder str = new StringBuilder(utflen);
byte[] bytearr = new byte[utflen];
int c, char2, char3;
int count = 0;
Read(bytearr, 0, utflen);
while (count < utflen)
{
c = bytearr[count] & 0xff;
switch (c >> 4)
{
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
/* 0xxxxxxx */
count++;
str.Append((char) c);
break;
case 12:
case 13:
/* 110x xxxx 10xx xxxx */
count += 2;
if (count > utflen)
{
throw CreateDataFormatException();
}
char2 = bytearr[count - 1];
if ((char2 & 0xC0) != 0x80)
{
throw CreateDataFormatException();
}
str.Append((char) (((c & 0x1F) << 6) | (char2 & 0x3F)));
break;
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
count += 3;
if (count > utflen)
{
throw CreateDataFormatException();
}
char2 = bytearr[count - 2];
char3 = bytearr[count - 1];
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
{
throw CreateDataFormatException();
}
str.Append((char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)));
break;
default :
/* 10xx xxxx, 1111 xxxx */
throw CreateDataFormatException();
}
}
// The number of chars produced may be less than utflen
return str.ToString();
}
else
{
return null;
}
}
private static Exception CreateDataFormatException()
{
// TODO: implement a better exception
return new IOException("Data format error!");
}
}
}

View File

@ -0,0 +1,194 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using ActiveMQ.Commands;
using System;
using System.Collections;
using System.IO;
using System.Text;
namespace ActiveMQ.OpenWire
{
/// <summary>
/// A BinaryWriter that switches the endian orientation of the write opperations so that they
/// are compatible with marshalling used by OpenWire.
/// </summary>
public class OpenWireBinaryWriter : BinaryWriter
{
public OpenWireBinaryWriter(Stream output) : base(output)
{
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="value">A long</param>
public override void Write(long value)
{
base.Write(EndianSupport.SwitchEndian(value));
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="value">An ushort</param>
public override void Write(ushort value)
{
base.Write(EndianSupport.SwitchEndian(value));
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="value">An int</param>
public override void Write(int value)
{
int x = EndianSupport.SwitchEndian(value);
base.Write(x);
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="chars">A char[]</param>
/// <param name="index">An int</param>
/// <param name="count">An int</param>
public override void Write(char[] chars, int index, int count)
{
char[] t = new char[count];
for( int i=0; i < count; i++ ) {
t[index+i] = EndianSupport.SwitchEndian(t[index+i]);
}
base.Write(t);
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="chars">A char[]</param>
public override void Write(char[] chars)
{
Write(chars, 0, chars.Length);
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="value">An uint</param>
public override void Write(uint value)
{
base.Write(EndianSupport.SwitchEndian(value));
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="ch">A char</param>
public override void Write(char ch)
{
base.Write(EndianSupport.SwitchEndian(ch));
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="value">An ulong</param>
public override void Write(ulong value)
{
base.Write(EndianSupport.SwitchEndian(value));
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="value">A short</param>
public override void Write(short value)
{
base.Write(EndianSupport.SwitchEndian(value));
}
/// <summary>
/// Method Write
/// </summary>
/// <param name="value">A string</param>
public override void Write(String text)
{
if (text != null)
{
if( text.Length > short.MaxValue ) {
throw new IOException("Cannot marshall string longer than: "+short.MaxValue+" characters, supplied steing was: "+text.Length+" characters");
}
short strlen = (short)text.Length;
short utflen = 0;
int c, count = 0;
char[] charr = text.ToCharArray();
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
utflen++;
}
else if (c > 0x07FF)
{
utflen += 3;
}
else
{
utflen += 2;
}
}
Write(utflen);
byte[] bytearr = new byte[utflen];
for (int i = 0; i < strlen; i++)
{
c = charr[i];
if ((c >= 0x0001) && (c <= 0x007F))
{
bytearr[count++] = (byte) c;
}
else if (c > 0x07FF)
{
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
else
{
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
}
Write(bytearr);
}
else
{
Write((short)-1);
}
}
}
}

View File

@ -72,7 +72,7 @@ namespace ActiveMQ.OpenWire
{
DataStructure c = (DataStructure) o;
byte type = c.GetDataStructureType();
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
BaseDataStreamMarshaller dsm = dataMarshallers[type & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + type);
@ -80,28 +80,28 @@ namespace ActiveMQ.OpenWire
size += dsm.TightMarshal1(this, c, bs);
size += bs.MarshalledSize();
BaseDataStreamMarshaller.WriteInt(size, ds);
BaseDataStreamMarshaller.WriteByte(type, ds);
ds.Write(size);
ds.Write(type);
bs.Marshal(ds);
dsm.TightMarshal2(this, c, ds, bs);
}
else
{
BaseDataStreamMarshaller.WriteInt(size, ds);
BaseDataStreamMarshaller.WriteByte(NULL_TYPE, ds);
ds.Write(size);
ds.Write(NULL_TYPE);
}
}
public Object Unmarshal(BinaryReader dis)
{
// lets ignore the size of the packet
BaseDataStreamMarshaller.ReadInt(dis);
dis.ReadInt32();
// first byte is the type of the packet
byte dataType = BaseDataStreamMarshaller.ReadByte(dis);
byte dataType = dis.ReadByte();
if (dataType != NULL_TYPE)
{
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
BaseDataStreamMarshaller dsm = dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
//Console.WriteLine("Parsing type: " + dataType + " with: " + dsm);
@ -151,7 +151,7 @@ namespace ActiveMQ.OpenWire
return ;
byte type = o.GetDataStructureType();
BaseDataStreamMarshaller.WriteByte(type, ds);
ds.Write(type);
if (o.IsMarshallAware() && bs.ReadBoolean())
{
@ -174,7 +174,7 @@ namespace ActiveMQ.OpenWire
if (bs.ReadBoolean())
{
byte dataType = BaseDataStreamMarshaller.ReadByte(dis);
byte dataType = dis.ReadByte();
BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
if (dsm == null)
throw new IOException("Unknown data type: " + dataType);
@ -182,8 +182,8 @@ namespace ActiveMQ.OpenWire
if (data.IsMarshallAware() && bs.ReadBoolean())
{
BaseDataStreamMarshaller.ReadInt(dis);
BaseDataStreamMarshaller.ReadByte(dis);
dis.ReadInt32();
dis.ReadByte();
BooleanStream bs2 = new BooleanStream();
bs2.Unmarshal(dis);

View File

@ -44,7 +44,7 @@ namespace ActiveMQ.OpenWire.V1
base.TightUnmarshal(wireFormat, o, dataIn, bs);
BaseCommand info = (BaseCommand)o;
info.CommandId = BaseDataStreamMarshaller.ReadShort(dataIn);
info.CommandId = dataIn.ReadInt16();
info.ResponseRequired = bs.ReadBoolean();
}
@ -69,7 +69,7 @@ namespace ActiveMQ.OpenWire.V1
base.TightMarshal2(wireFormat, o, dataOut, bs);
BaseCommand info = (BaseCommand)o;
BaseDataStreamMarshaller.WriteShort(info.CommandId, dataOut);
dataOut.Write(info.CommandId);
bs.ReadBoolean();
}

View File

@ -59,7 +59,7 @@ namespace ActiveMQ.OpenWire.V1
info.BrokerURL = TightUnmarshalString(dataIn, bs);
if (bs.ReadBoolean()) {
short size = BaseDataStreamMarshaller.ReadShort(dataIn);
short size = dataIn.ReadInt16();
BrokerInfo[] value = new BrokerInfo[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerInfo) TightUnmarshalNestedObject(wireFormat,dataIn, bs);

View File

@ -61,7 +61,7 @@ namespace ActiveMQ.OpenWire.V1
info.UserName = TightUnmarshalString(dataIn, bs);
if (bs.ReadBoolean()) {
short size = BaseDataStreamMarshaller.ReadShort(dataIn);
short size = dataIn.ReadInt16();
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) TightUnmarshalNestedObject(wireFormat,dataIn, bs);

View File

@ -58,18 +58,18 @@ namespace ActiveMQ.OpenWire.V1
info.ConsumerId = (ConsumerId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.Browser = bs.ReadBoolean();
info.Destination = (ActiveMQDestination) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.PrefetchSize = BaseDataStreamMarshaller.ReadInt(dataIn);
info.MaximumPendingMessageLimit = BaseDataStreamMarshaller.ReadInt(dataIn);
info.PrefetchSize = dataIn.ReadInt32();
info.MaximumPendingMessageLimit = dataIn.ReadInt32();
info.DispatchAsync = bs.ReadBoolean();
info.Selector = TightUnmarshalString(dataIn, bs);
info.SubcriptionName = TightUnmarshalString(dataIn, bs);
info.NoLocal = bs.ReadBoolean();
info.Exclusive = bs.ReadBoolean();
info.Retroactive = bs.ReadBoolean();
info.Priority = BaseDataStreamMarshaller.ReadByte(dataIn);
info.Priority = dataIn.ReadByte();
if (bs.ReadBoolean()) {
short size = BaseDataStreamMarshaller.ReadShort(dataIn);
short size = dataIn.ReadInt16();
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) TightUnmarshalNestedObject(wireFormat,dataIn, bs);
@ -116,15 +116,15 @@ namespace ActiveMQ.OpenWire.V1
TightMarshalCachedObject2(wireFormat, info.ConsumerId, dataOut, bs);
bs.ReadBoolean();
TightMarshalCachedObject2(wireFormat, info.Destination, dataOut, bs);
BaseDataStreamMarshaller.WriteInt(info.PrefetchSize, dataOut);
BaseDataStreamMarshaller.WriteInt(info.MaximumPendingMessageLimit, dataOut);
dataOut.Write(info.PrefetchSize);
dataOut.Write(info.MaximumPendingMessageLimit);
bs.ReadBoolean();
TightMarshalString2(info.Selector, dataOut, bs);
TightMarshalString2(info.SubcriptionName, dataOut, bs);
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
BaseDataStreamMarshaller.WriteByte(info.Priority, dataOut);
dataOut.Write(info.Priority);
TightMarshalObjectArray2(wireFormat, info.BrokerPath, dataOut, bs);
bs.ReadBoolean();

View File

@ -57,7 +57,7 @@ namespace ActiveMQ.OpenWire.V1
DataArrayResponse info = (DataArrayResponse)o;
if (bs.ReadBoolean()) {
short size = BaseDataStreamMarshaller.ReadShort(dataIn);
short size = dataIn.ReadInt16();
DataStructure[] value = new DataStructure[size];
for( int i=0; i < size; i++ ) {
value[i] = (DataStructure) TightUnmarshalNestedObject(wireFormat,dataIn, bs);

View File

@ -57,11 +57,11 @@ namespace ActiveMQ.OpenWire.V1
DestinationInfo info = (DestinationInfo)o;
info.ConnectionId = (ConnectionId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.Destination = (ActiveMQDestination) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.OperationType = BaseDataStreamMarshaller.ReadByte(dataIn);
info.OperationType = dataIn.ReadByte();
info.Timeout = TightUnmarshalLong(wireFormat, dataIn, bs);
if (bs.ReadBoolean()) {
short size = BaseDataStreamMarshaller.ReadShort(dataIn);
short size = dataIn.ReadInt16();
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) TightUnmarshalNestedObject(wireFormat,dataIn, bs);
@ -99,7 +99,7 @@ namespace ActiveMQ.OpenWire.V1
DestinationInfo info = (DestinationInfo)o;
TightMarshalCachedObject2(wireFormat, info.ConnectionId, dataOut, bs);
TightMarshalCachedObject2(wireFormat, info.Destination, dataOut, bs);
BaseDataStreamMarshaller.WriteByte(info.OperationType, dataOut);
dataOut.Write(info.OperationType);
TightMarshalLong2(wireFormat, info.Timeout, dataOut, bs);
TightMarshalObjectArray2(wireFormat, info.BrokerPath, dataOut, bs);

View File

@ -55,7 +55,7 @@ namespace ActiveMQ.OpenWire.V1
base.TightUnmarshal(wireFormat, o, dataIn, bs);
IntegerResponse info = (IntegerResponse)o;
info.Result = BaseDataStreamMarshaller.ReadInt(dataIn);
info.Result = dataIn.ReadInt32();
}
@ -78,7 +78,7 @@ namespace ActiveMQ.OpenWire.V1
base.TightMarshal2(wireFormat, o, dataOut, bs);
IntegerResponse info = (IntegerResponse)o;
BaseDataStreamMarshaller.WriteInt(info.Result, dataOut);
dataOut.Write(info.Result);
}
}

View File

@ -56,7 +56,7 @@ namespace ActiveMQ.OpenWire.V1
JournalTransaction info = (JournalTransaction)o;
info.TransactionId = (TransactionId) TightUnmarshalNestedObject(wireFormat, dataIn, bs);
info.Type = BaseDataStreamMarshaller.ReadByte(dataIn);
info.Type = dataIn.ReadByte();
info.WasPrepared = bs.ReadBoolean();
}
@ -83,7 +83,7 @@ namespace ActiveMQ.OpenWire.V1
JournalTransaction info = (JournalTransaction)o;
TightMarshalNestedObject2(wireFormat, info.TransactionId, dataOut, bs);
BaseDataStreamMarshaller.WriteByte(info.Type, dataOut);
dataOut.Write(info.Type);
bs.ReadBoolean();
}

View File

@ -58,10 +58,10 @@ namespace ActiveMQ.OpenWire.V1
info.Destination = (ActiveMQDestination) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.TransactionId = (TransactionId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.ConsumerId = (ConsumerId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.AckType = BaseDataStreamMarshaller.ReadByte(dataIn);
info.AckType = dataIn.ReadByte();
info.FirstMessageId = (MessageId) TightUnmarshalNestedObject(wireFormat, dataIn, bs);
info.LastMessageId = (MessageId) TightUnmarshalNestedObject(wireFormat, dataIn, bs);
info.MessageCount = BaseDataStreamMarshaller.ReadInt(dataIn);
info.MessageCount = dataIn.ReadInt32();
}
@ -92,10 +92,10 @@ namespace ActiveMQ.OpenWire.V1
TightMarshalCachedObject2(wireFormat, info.Destination, dataOut, bs);
TightMarshalCachedObject2(wireFormat, info.TransactionId, dataOut, bs);
TightMarshalCachedObject2(wireFormat, info.ConsumerId, dataOut, bs);
BaseDataStreamMarshaller.WriteByte(info.AckType, dataOut);
dataOut.Write(info.AckType);
TightMarshalNestedObject2(wireFormat, info.FirstMessageId, dataOut, bs);
TightMarshalNestedObject2(wireFormat, info.LastMessageId, dataOut, bs);
BaseDataStreamMarshaller.WriteInt(info.MessageCount, dataOut);
dataOut.Write(info.MessageCount);
}
}

View File

@ -58,7 +58,7 @@ namespace ActiveMQ.OpenWire.V1
info.ConsumerId = (ConsumerId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.Destination = (ActiveMQDestination) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.Message = (Message) TightUnmarshalNestedObject(wireFormat, dataIn, bs);
info.RedeliveryCounter = BaseDataStreamMarshaller.ReadInt(dataIn);
info.RedeliveryCounter = dataIn.ReadInt32();
}
@ -87,7 +87,7 @@ namespace ActiveMQ.OpenWire.V1
TightMarshalCachedObject2(wireFormat, info.ConsumerId, dataOut, bs);
TightMarshalCachedObject2(wireFormat, info.Destination, dataOut, bs);
TightMarshalNestedObject2(wireFormat, info.Message, dataOut, bs);
BaseDataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut);
dataOut.Write(info.RedeliveryCounter);
}
}

View File

@ -51,11 +51,11 @@ namespace ActiveMQ.OpenWire.V1
info.MessageId = (MessageId) TightUnmarshalNestedObject(wireFormat, dataIn, bs);
info.OriginalTransactionId = (TransactionId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.GroupID = TightUnmarshalString(dataIn, bs);
info.GroupSequence = BaseDataStreamMarshaller.ReadInt(dataIn);
info.GroupSequence = dataIn.ReadInt32();
info.CorrelationId = TightUnmarshalString(dataIn, bs);
info.Persistent = bs.ReadBoolean();
info.Expiration = TightUnmarshalLong(wireFormat, dataIn, bs);
info.Priority = BaseDataStreamMarshaller.ReadByte(dataIn);
info.Priority = dataIn.ReadByte();
info.ReplyTo = (ActiveMQDestination) TightUnmarshalNestedObject(wireFormat, dataIn, bs);
info.Timestamp = TightUnmarshalLong(wireFormat, dataIn, bs);
info.Type = TightUnmarshalString(dataIn, bs);
@ -64,10 +64,10 @@ namespace ActiveMQ.OpenWire.V1
info.DataStructure = (DataStructure) TightUnmarshalNestedObject(wireFormat, dataIn, bs);
info.TargetConsumerId = (ConsumerId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.Compressed = bs.ReadBoolean();
info.RedeliveryCounter = BaseDataStreamMarshaller.ReadInt(dataIn);
info.RedeliveryCounter = dataIn.ReadInt32();
if (bs.ReadBoolean()) {
short size = BaseDataStreamMarshaller.ReadShort(dataIn);
short size = dataIn.ReadInt16();
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) TightUnmarshalNestedObject(wireFormat,dataIn, bs);
@ -133,26 +133,26 @@ namespace ActiveMQ.OpenWire.V1
TightMarshalNestedObject2(wireFormat, info.MessageId, dataOut, bs);
TightMarshalCachedObject2(wireFormat, info.OriginalTransactionId, dataOut, bs);
TightMarshalString2(info.GroupID, dataOut, bs);
BaseDataStreamMarshaller.WriteInt(info.GroupSequence, dataOut);
dataOut.Write(info.GroupSequence);
TightMarshalString2(info.CorrelationId, dataOut, bs);
bs.ReadBoolean();
TightMarshalLong2(wireFormat, info.Expiration, dataOut, bs);
BaseDataStreamMarshaller.WriteByte(info.Priority, dataOut);
dataOut.Write(info.Priority);
TightMarshalNestedObject2(wireFormat, info.ReplyTo, dataOut, bs);
TightMarshalLong2(wireFormat, info.Timestamp, dataOut, bs);
TightMarshalString2(info.Type, dataOut, bs);
if(bs.ReadBoolean()) {
BaseDataStreamMarshaller.WriteInt(info.Content.Length, dataOut);
dataOut.Write(info.Content.Length);
dataOut.Write(info.Content);
}
if(bs.ReadBoolean()) {
BaseDataStreamMarshaller.WriteInt(info.MarshalledProperties.Length, dataOut);
dataOut.Write(info.MarshalledProperties.Length);
dataOut.Write(info.MarshalledProperties);
}
TightMarshalNestedObject2(wireFormat, info.DataStructure, dataOut, bs);
TightMarshalCachedObject2(wireFormat, info.TargetConsumerId, dataOut, bs);
bs.ReadBoolean();
BaseDataStreamMarshaller.WriteInt(info.RedeliveryCounter, dataOut);
dataOut.Write(info.RedeliveryCounter);
TightMarshalObjectArray2(wireFormat, info.BrokerPath, dataOut, bs);
TightMarshalLong2(wireFormat, info.Arrival, dataOut, bs);
TightMarshalString2(info.UserID, dataOut, bs);

View File

@ -59,7 +59,7 @@ namespace ActiveMQ.OpenWire.V1
info.Destination = (ActiveMQDestination) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
if (bs.ReadBoolean()) {
short size = BaseDataStreamMarshaller.ReadShort(dataIn);
short size = dataIn.ReadInt16();
BrokerId[] value = new BrokerId[size];
for( int i=0; i < size; i++ ) {
value[i] = (BrokerId) TightUnmarshalNestedObject(wireFormat,dataIn, bs);

View File

@ -55,7 +55,7 @@ namespace ActiveMQ.OpenWire.V1
base.TightUnmarshal(wireFormat, o, dataIn, bs);
Response info = (Response)o;
info.CorrelationId = BaseDataStreamMarshaller.ReadShort(dataIn);
info.CorrelationId = dataIn.ReadInt16();
}
@ -78,7 +78,7 @@ namespace ActiveMQ.OpenWire.V1
base.TightMarshal2(wireFormat, o, dataOut, bs);
Response info = (Response)o;
BaseDataStreamMarshaller.WriteShort(info.CorrelationId, dataOut);
dataOut.Write(info.CorrelationId);
}
}

View File

@ -57,7 +57,7 @@ namespace ActiveMQ.OpenWire.V1
TransactionInfo info = (TransactionInfo)o;
info.ConnectionId = (ConnectionId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.TransactionId = (TransactionId) TightUnmarshalCachedObject(wireFormat, dataIn, bs);
info.Type = BaseDataStreamMarshaller.ReadByte(dataIn);
info.Type = dataIn.ReadByte();
}
@ -84,7 +84,7 @@ namespace ActiveMQ.OpenWire.V1
TransactionInfo info = (TransactionInfo)o;
TightMarshalCachedObject2(wireFormat, info.ConnectionId, dataOut, bs);
TightMarshalCachedObject2(wireFormat, info.TransactionId, dataOut, bs);
BaseDataStreamMarshaller.WriteByte(info.Type, dataOut);
dataOut.Write(info.Type);
}
}

View File

@ -56,7 +56,7 @@ namespace ActiveMQ.OpenWire.V1
WireFormatInfo info = (WireFormatInfo)o;
info.Magic = ReadBytes(dataIn, 8);
info.Version = BaseDataStreamMarshaller.ReadInt(dataIn);
info.Version = dataIn.ReadInt32();
info.CacheEnabled = bs.ReadBoolean();
info.StackTraceEnabled = bs.ReadBoolean();
info.TcpNoDelayEnabled = bs.ReadBoolean();
@ -90,7 +90,7 @@ namespace ActiveMQ.OpenWire.V1
WireFormatInfo info = (WireFormatInfo)o;
dataOut.Write(info.Magic, 0, 8);
BaseDataStreamMarshaller.WriteInt(info.Version, dataOut);
dataOut.Write(info.Version);
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();

View File

@ -55,7 +55,7 @@ namespace ActiveMQ.OpenWire.V1
base.TightUnmarshal(wireFormat, o, dataIn, bs);
XATransactionId info = (XATransactionId)o;
info.FormatId = BaseDataStreamMarshaller.ReadInt(dataIn);
info.FormatId = dataIn.ReadInt32();
info.GlobalTransactionId = ReadBytes(dataIn, bs.ReadBoolean());
info.BranchQualifier = ReadBytes(dataIn, bs.ReadBoolean());
@ -84,13 +84,13 @@ namespace ActiveMQ.OpenWire.V1
base.TightMarshal2(wireFormat, o, dataOut, bs);
XATransactionId info = (XATransactionId)o;
BaseDataStreamMarshaller.WriteInt(info.FormatId, dataOut);
dataOut.Write(info.FormatId);
if(bs.ReadBoolean()) {
BaseDataStreamMarshaller.WriteInt(info.GlobalTransactionId.Length, dataOut);
dataOut.Write(info.GlobalTransactionId.Length);
dataOut.Write(info.GlobalTransactionId);
}
if(bs.ReadBoolean()) {
BaseDataStreamMarshaller.WriteInt(info.BranchQualifier.Length, dataOut);
dataOut.Write(info.BranchQualifier.Length);
dataOut.Write(info.BranchQualifier);
}

View File

@ -41,7 +41,7 @@ namespace ActiveMQ.Transport.Tcp
private bool started;
volatile private bool closed;
private CommandHandler commandHandlerHandlerHandlerHandlerHandler;
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
public TcpTransport(Socket socket)
@ -56,7 +56,7 @@ namespace ActiveMQ.Transport.Tcp
{
if (!started)
{
if( commandHandlerHandlerHandlerHandlerHandler == null )
if( commandHandler == null )
throw new InvalidOperationException ("command cannot be null when Start is called.");
if( exceptionHandler == null )
throw new InvalidOperationException ("exception cannot be null when Start is called.");
@ -64,8 +64,8 @@ namespace ActiveMQ.Transport.Tcp
started = true;
NetworkStream networkStream = new NetworkStream(socket);
socketWriter = new BinaryWriter(networkStream);
socketReader = new BinaryReader(networkStream);
socketWriter = new OpenWireBinaryWriter(networkStream);
socketReader = new OpenWireBinaryReader(networkStream);
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
@ -108,37 +108,29 @@ namespace ActiveMQ.Transport.Tcp
try
{
Command command = (Command) wireformat.Unmarshal(socketReader);
this.commandHandlerHandlerHandlerHandlerHandler(this, command);
this.commandHandler(this, command);
}
catch (ObjectDisposedException)
{
break;
}
catch ( IOException e) {
catch ( Exception e) {
if( e.GetBaseException() is ObjectDisposedException ) {
break;
}
if( !closed ) {
this.exceptionHandler(this,e);
}
break;
}
catch (Exception e)
{
if( !closed ) {
this.exceptionHandler(this,e);
}
}
}
}
// Implementation methods
public CommandHandler Command {
get { return commandHandlerHandlerHandlerHandlerHandler; }
set { this.commandHandlerHandlerHandlerHandlerHandler = value; }
get { return commandHandler; }
set { this.commandHandler = value; }
}
public ExceptionHandler Exception {

View File

@ -30,7 +30,7 @@ namespace ActiveMQ.Transport.Tcp
Socket socket = Connect(location.Host, location.Port);
ITransport rc = new TcpTransport(socket);
// TODO: use URI query string to enable the LoggingTransport
// rc = new LoggingTransport(rc);
rc = new LoggingTransport(rc);
rc = new ResponseCorrelator(rc);
rc = new MutexTransport(rc);
return rc;

View File

@ -103,7 +103,10 @@
<Compile Include="ActiveMQ\MessageProducer.cs"/>
<Compile Include="ActiveMQ\OpenWire\BaseDataStreamMarshaller.cs"/>
<Compile Include="ActiveMQ\OpenWire\BooleanStream.cs"/>
<Compile Include="ActiveMQ\OpenWire\EndianSupport.cs"/>
<Compile Include="ActiveMQ\OpenWire\MessagePropertyHelper.cs"/>
<Compile Include="ActiveMQ\OpenWire\OpenWireBinaryReader.cs"/>
<Compile Include="ActiveMQ\OpenWire\OpenWireBinaryWriter.cs"/>
<Compile Include="ActiveMQ\OpenWire\OpenWireFormat.cs"/>
<Compile Include="ActiveMQ\OpenWire\PrimitiveMap.cs"/>
<Compile Include="ActiveMQ\OpenWire\V1\ActiveMQBytesMessageMarshaller.cs"/>

View File

@ -24,7 +24,6 @@ namespace ActiveMQ.OpenWire
[TestFixture]
public class BooleanStreamTest
{
protected OpenWireFormat openWireformat;
protected int endOfStreamMarker = 0x12345678;
int numberOfBytes = 8 * 200;
@ -72,7 +71,7 @@ namespace ActiveMQ.OpenWire
protected void TestBooleanStream(int numberOfBytes, GetBooleanValueDelegate valueDelegate)
{
for (int i = 0; i < numberOfBytes; i++)
for (int i = 1017; i < numberOfBytes; i++)
{
AssertMarshalBooleans(i, valueDelegate);
}
@ -86,14 +85,14 @@ namespace ActiveMQ.OpenWire
bs.WriteBoolean(valueDelegate(i, count));
}
MemoryStream buffer = new MemoryStream();
BinaryWriter ds = new BinaryWriter(buffer);
BinaryWriter ds = new OpenWireBinaryWriter(buffer);
bs.Marshal(ds);
BaseDataStreamMarshaller.WriteInt(endOfStreamMarker, ds);
ds.Write(endOfStreamMarker);
// now lets read from the stream
MemoryStream ins = new MemoryStream(buffer.ToArray());
BinaryReader dis = new BinaryReader(ins);
BinaryReader dis = new OpenWireBinaryReader(ins);
bs = new BooleanStream();
bs.Unmarshal(dis);
@ -111,7 +110,7 @@ namespace ActiveMQ.OpenWire
Assert.Fail("Failed to parse bool: " + i + " out of: " + count + " due to: " + e);
}
}
int marker = BaseDataStreamMarshaller.ReadInt(dis);
int marker = dis.ReadInt32();
Assert.AreEqual(endOfStreamMarker, marker, "did not match: "+endOfStreamMarker+" and "+marker);
// lets try read and we should get an exception
@ -125,20 +124,6 @@ namespace ActiveMQ.OpenWire
}
}
[SetUp]
protected void SetUp()
{
openWireformat = createOpenWireFormat();
}
protected OpenWireFormat createOpenWireFormat()
{
OpenWireFormat wf = new OpenWireFormat();
// wf.setCacheEnabled(true);
// wf.setStackTraceEnabled(false);
// wf.setVersion(1);
return wf;
}
}
}

View File

@ -17,7 +17,7 @@
using ActiveMQ.OpenWire;
using NUnit.Framework;
using System;
using System.IO;
namespace ActiveMQ.OpenWire
{
@ -29,15 +29,10 @@ namespace ActiveMQ.OpenWire
public void TestLongEndian()
{
long value = 0x0102030405060708L;
long newValue = BaseDataStreamMarshaller.SwitchEndian(value);
long newValue = EndianSupport.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
Assert.AreEqual(0x0807060504030201L, newValue);
long actual = BaseDataStreamMarshaller.SwitchEndian(newValue);
long actual = EndianSupport.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
@ -45,32 +40,91 @@ namespace ActiveMQ.OpenWire
public void TestIntEndian()
{
int value = 0x12345678;
int newValue = BaseDataStreamMarshaller.SwitchEndian(value);
int newValue = EndianSupport.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
Assert.AreEqual(0x78563412, newValue);
int actual = BaseDataStreamMarshaller.SwitchEndian(newValue);
int actual = EndianSupport.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
[Test]
public void TestShortEndian()
{
short value = 0x1234;
short newValue = BaseDataStreamMarshaller.SwitchEndian(value);
short newValue = EndianSupport.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
Assert.AreEqual(0x3412, newValue);
short actual = BaseDataStreamMarshaller.SwitchEndian(newValue);
short actual = EndianSupport.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
[Test]
public void TestNegativeLongEndian()
{
long value = -0x0102030405060708L;
long newValue = EndianSupport.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
long actual = EndianSupport.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
[Test]
public void TestNegativeIntEndian()
{
int value = -0x12345678;
int newValue = EndianSupport.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
int actual = EndianSupport.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
[Test]
public void TestNegativeShortEndian()
{
short value = -0x1234;
short newValue = EndianSupport.SwitchEndian(value);
Console.WriteLine("New value: " + newValue);
short actual = EndianSupport.SwitchEndian(newValue);
Assert.AreEqual(value, actual);
}
[Test]
public void TestFloatDontNeedEndianSwitch()
{
float value = -1.223F;
Console.WriteLine("value: " + value);
// Convert to int so we can compare to Java version.
MemoryStream ms = new MemoryStream(4);
BinaryWriter bw = new BinaryWriter(ms);
bw.Write(value);
bw.Close();
ms = new MemoryStream(ms.ToArray());
BinaryReader br = new BinaryReader(ms);
// System.out.println(Integer.toString(Float.floatToIntBits(-1.223F), 16));
Assert.AreEqual(-0x406374bc, br.ReadInt32());
}
[Test]
public void TestDoublDontNeedEndianSwitch()
{
double value = -1.223D;
Console.WriteLine("New value: " + value);
// Convert to int so we can compare to Java version.
MemoryStream ms = new MemoryStream(4);
BinaryWriter bw = new BinaryWriter(ms);
bw.Write(value);
bw.Close();
ms = new MemoryStream(ms.ToArray());
BinaryReader br = new BinaryReader(ms);
long longVersion = br.ReadInt64();
// System.out.println(Long.toString(Double.doubleToLongBits(-1.223D), 16));
Assert.AreEqual(-0x400c6e978d4fdf3b, longVersion);
}
}
}

View File

@ -48,8 +48,8 @@ namespace JMS
}
//[Ignore("Not fully implemented yet.")]
[Test]
[Ignore("Not fully implemented yet.")]
public void testDurableConsumerSelectorChange()
{
@ -59,10 +59,10 @@ namespace JMS
// Send the messages
ITextMessage message = session.CreateTextMessage("1st");
//message.SetStringProperty("color", "red");
message.Properties["color"] = "red";
producer.Send(message);
IMessage m = consumer.Receive(1000);
IMessage m = consumer.Receive(receiveTimeout );
Assert.IsNotNull(m);
Assert.AreEqual("1st", ((ITextMessage)m).Text);
@ -71,10 +71,10 @@ namespace JMS
consumer = session.CreateDurableConsumer((ITopic)Destination, "test", "color='blue'", false);
message = session.CreateTextMessage("2nd");
// message.setStringProperty("color", "red");
message.Properties["color"] = "red";
producer.Send(message);
message = session.CreateTextMessage("3rd");
// message.setStringProperty("color", "blue");
message.Properties["color"] = "blue";
producer.Send(message);
// Selector should skip the 2nd message.

View File

@ -31,6 +31,11 @@ namespace JMS
int e = 0x12345678;
long f = 0x1234567812345678;
string g = "Hello World!";
bool h = false;
byte i = 0xFF;
short j = -0x1234;
int k = -0x12345678;
long l = -0x1234567812345678;
[SetUp]
override public void SetUp()
@ -61,6 +66,11 @@ namespace JMS
message.Body["e"] = e;
message.Body["f"] = f;
message.Body["g"] = g;
message.Body["h"] = h;
message.Body["i"] = i;
message.Body["j"] = j;
message.Body["k"] = k;
message.Body["l"] = l;
return message;
}
@ -83,6 +93,11 @@ namespace JMS
Assert.AreEqual(e, mapMessage.Body["e"], "generic map entry: e");
Assert.AreEqual(f, mapMessage.Body["f"], "generic map entry: f");
Assert.AreEqual(g, mapMessage.Body["g"], "generic map entry: g");
Assert.AreEqual(h, mapMessage.Body["h"], "generic map entry: h");
Assert.AreEqual(i, mapMessage.Body["i"], "generic map entry: i");
Assert.AreEqual(j, mapMessage.Body["j"], "generic map entry: j");
Assert.AreEqual(k, mapMessage.Body["k"], "generic map entry: k");
Assert.AreEqual(l, mapMessage.Body["l"], "generic map entry: l");
// use type safe APIs
Assert.AreEqual(a, mapMessage.Body.GetBool("a"), "map entry: a");
@ -92,8 +107,12 @@ namespace JMS
Assert.AreEqual(e, mapMessage.Body.GetInt("e"), "map entry: e");
Assert.AreEqual(f, mapMessage.Body.GetLong("f"), "map entry: f");
Assert.AreEqual(g, mapMessage.Body.GetString("g"), "map entry: g");
Assert.AreEqual(h, mapMessage.Body.GetBool("h"), "map entry: h");
Assert.AreEqual(i, mapMessage.Body.GetByte("i"), "map entry: i");
Assert.AreEqual(j, mapMessage.Body.GetShort("j"), "map entry: j");
Assert.AreEqual(k, mapMessage.Body.GetInt("k"), "map entry: k");
Assert.AreEqual(l, mapMessage.Body.GetLong("l"), "map entry: l");
}
protected string ToHex(long value)

View File

@ -0,0 +1,120 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using JMS;
using NUnit.Framework;
using System;
namespace tests
{
[ TestFixture ]
public class MessageTest : JMSTestSupport
{
bool a = true;
byte b = 123;
char c = 'c';
short d = 0x1234;
int e = 0x12345678;
long f = 0x1234567812345678;
string g = "Hello World!";
bool h = false;
byte i = 0xFF;
short j = -0x1234;
int k = -0x12345678;
long l = -0x1234567812345678;
[SetUp]
override public void SetUp()
{
base.SetUp();
}
[TearDown]
override public void TearDown()
{
base.TearDown();
}
[ Test ]
public override void SendAndSyncReceive()
{
base.SendAndSyncReceive();
}
protected override IMessage CreateMessage()
{
IMessage message = session.CreateMessage();
message.Properties["a"] = a;
message.Properties["b"] = b;
message.Properties["c"] = c;
message.Properties["d"] = d;
message.Properties["e"] = e;
message.Properties["f"] = f;
message.Properties["g"] = g;
message.Properties["h"] = h;
message.Properties["i"] = i;
message.Properties["j"] = j;
message.Properties["k"] = k;
message.Properties["l"] = l;
return message;
}
protected override void AssertValidMessage(IMessage message)
{
Console.WriteLine("Received message: " + message);
Console.WriteLine("Received Count: " + message.Properties.Count);
Assert.AreEqual(ToHex(f), ToHex(message.Properties.GetLong("f")), "map entry: f as hex");
// use generic API to access entries
Assert.AreEqual(a, message.Properties["a"], "generic map entry: a");
Assert.AreEqual(b, message.Properties["b"], "generic map entry: b");
Assert.AreEqual(c, message.Properties["c"], "generic map entry: c");
Assert.AreEqual(d, message.Properties["d"], "generic map entry: d");
Assert.AreEqual(e, message.Properties["e"], "generic map entry: e");
Assert.AreEqual(f, message.Properties["f"], "generic map entry: f");
Assert.AreEqual(g, message.Properties["g"], "generic map entry: g");
Assert.AreEqual(h, message.Properties["h"], "generic map entry: h");
Assert.AreEqual(i, message.Properties["i"], "generic map entry: i");
Assert.AreEqual(j, message.Properties["j"], "generic map entry: j");
Assert.AreEqual(k, message.Properties["k"], "generic map entry: k");
Assert.AreEqual(l, message.Properties["l"], "generic map entry: l");
// use type safe APIs
Assert.AreEqual(a, message.Properties.GetBool("a"), "map entry: a");
Assert.AreEqual(b, message.Properties.GetByte("b"), "map entry: b");
Assert.AreEqual(c, message.Properties.GetChar("c"), "map entry: c");
Assert.AreEqual(d, message.Properties.GetShort("d"), "map entry: d");
Assert.AreEqual(e, message.Properties.GetInt("e"), "map entry: e");
Assert.AreEqual(f, message.Properties.GetLong("f"), "map entry: f");
Assert.AreEqual(g, message.Properties.GetString("g"), "map entry: g");
Assert.AreEqual(h, message.Properties.GetBool("h"), "map entry: h");
Assert.AreEqual(i, message.Properties.GetByte("i"), "map entry: i");
Assert.AreEqual(j, message.Properties.GetShort("j"), "map entry: j");
Assert.AreEqual(k, message.Properties.GetInt("k"), "map entry: k");
Assert.AreEqual(l, message.Properties.GetLong("l"), "map entry: l");
}
protected string ToHex(long value)
{
return String.Format("{0:x}", value);
}
}
}

View File

@ -52,6 +52,7 @@
<Compile Include="JMS\JMSPropertyTest.cs"/>
<Compile Include="JMS\JMSTestSupport.cs"/>
<Compile Include="JMS\MapMessageTest.cs"/>
<Compile Include="JMS\MessageTest.cs"/>
<Compile Include="JMS\TextMessage.cs"/>
<Compile Include="JMS\TransactionTest.cs"/>
</ItemGroup>