added support for the latest wireformat and clean stack trace support

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380771 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-02-24 18:39:16 +00:00
parent fbb26ba7d7
commit baabdde9f2
12 changed files with 341 additions and 153 deletions

View File

@ -19,15 +19,34 @@ using System.Collections;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
using OpenWire.Client.Core; using OpenWire.Client.Core;
namespace OpenWire.Client { namespace OpenWire.Client
{
/// <summary> /// <summary>
/// Exception thrown when the broker returns an error /// Exception thrown when the broker returns an error
/// </summary> /// </summary>
public class BrokerException : OpenWireException { public class BrokerException : OpenWireException
public BrokerException(BrokerError cause) : base("The operation failed: Type: " {
+ cause.ExceptionClass
+ " stack: " private BrokerError brokerError;
+ cause.StackTrace) {
public BrokerException(BrokerError brokerError) : base(
brokerError.ExceptionClass + " : " + brokerError.Message)
{
this.brokerError = brokerError;
} }
public BrokerError BrokerError {
get {
return brokerError;
}
}
public virtual string StackTrace
{
get {
return brokerError.StackTrace;
}
}
} }
} }

View File

@ -1,19 +1,19 @@
/* /*
* Copyright 2006 The Apache Software Foundation or its licensors, as * Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable. * applicable.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
using System; using System;
using System.Collections; using System.Collections;

View File

@ -1,19 +1,19 @@
/* /*
* Copyright 2006 The Apache Software Foundation or its licensors, as * Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable. * applicable.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
using System; using System;
using System.Collections; using System.Collections;
@ -36,32 +36,27 @@ namespace OpenWire.Client.Commands
public const byte ID_TransactionId = 0; public const byte ID_TransactionId = 0;
public override int GetHashCode() public override int GetHashCode() {
{
int answer = 0; int answer = 0;
return answer; return answer;
} }
public override bool Equals(object that) public override bool Equals(object that) {
{ if (that is TransactionId) {
if (that is TransactionId)
{
return Equals((TransactionId) that); return Equals((TransactionId) that);
} }
return false; return false;
} }
public virtual bool Equals(TransactionId that) public virtual bool Equals(TransactionId that) {
{
return true; return true;
} }
public override string ToString() public override string ToString() {
{
return GetType().Name + "[" return GetType().Name + "["
+ " ]"; + " ]";
@ -69,9 +64,12 @@ namespace OpenWire.Client.Commands
public override byte GetDataStructureType() public override byte GetDataStructureType() {
{
return ID_TransactionId; return ID_TransactionId;
} }
// Properties
} }
} }

View File

@ -37,13 +37,19 @@ namespace OpenWire.Client.Commands
byte[] magic; byte[] magic;
int version; int version;
int options; bool cacheEnabled;
bool compressionEnabled;
bool stackTraceEnabled;
bool tcpNoDelayEnabled;
public override string ToString() { public override string ToString() {
return GetType().Name + "[" return GetType().Name + "["
+ " Magic=" + Magic + " Magic=" + Magic
+ " Version=" + Version + " Version=" + Version
+ " Options=" + Options + " CacheEnabled=" + CacheEnabled
+ " CompressionEnabled=" + CompressionEnabled
+ " StackTraceEnabled=" + StackTraceEnabled
+ " TcpNoDelayEnabled=" + TcpNoDelayEnabled
+ " ]"; + " ]";
} }
@ -69,10 +75,28 @@ namespace OpenWire.Client.Commands
set { this.version = value; } set { this.version = value; }
} }
public int Options public bool CacheEnabled
{ {
get { return options; } get { return cacheEnabled; }
set { this.options = value; } set { this.cacheEnabled = value; }
}
public bool CompressionEnabled
{
get { return compressionEnabled; }
set { this.compressionEnabled = value; }
}
public bool StackTraceEnabled
{
get { return stackTraceEnabled; }
set { this.stackTraceEnabled = value; }
}
public bool TcpNoDelayEnabled
{
get { return tcpNoDelayEnabled; }
set { this.tcpNoDelayEnabled = value; }
} }
} }

View File

@ -10,11 +10,8 @@ namespace OpenWire.Client
/// </summary> /// </summary>
public class Connection : IConnection public class Connection : IConnection
{ {
static private char[] MAGIC = new char[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
private ITransport transport; private ITransport transport;
private ConnectionInfo info; private ConnectionInfo info;
private WireFormatInfo wireFormatInfo = new WireFormatInfo();
private BrokerInfo brokerInfo; // from broker private BrokerInfo brokerInfo; // from broker
private WireFormatInfo brokerWireFormatInfo; // from broker private WireFormatInfo brokerWireFormatInfo; // from broker
private IList sessions = new ArrayList(); private IList sessions = new ArrayList();
@ -165,11 +162,6 @@ namespace OpenWire.Client
} }
if (!connected) if (!connected)
{ {
// lets configure the wire format
wireFormatInfo.Magic = CreateMagicBytes();
wireFormatInfo.Version = 1;
transport.Oneway(wireFormatInfo);
// now lets send the connection and see if we get an ack/nak // now lets send the connection and see if we get an ack/nak
SyncRequest(info); SyncRequest(info);
connected = true; connected = true;
@ -231,18 +223,5 @@ namespace OpenWire.Client
} }
} }
/// <summary>
/// Method CreateMagicBytes
/// </summary>
/// <returns>A byte[]</retutns>
private byte[] CreateMagicBytes()
{
byte[] answer = new byte[MAGIC.Length];
for (int i = 0; i < answer.Length; i++)
{
answer[i] = (byte) MAGIC[i];
}
return answer;
}
} }
} }

View File

@ -15,30 +15,76 @@
* limitations under the License. * limitations under the License.
*/ */
using System; using System;
using System.Text;
using OpenWire.Client.Core; using OpenWire.Client.Core;
using System.IO;
namespace OpenWire.Client.Core
{
public struct StackTraceElement
{
public string ClassName;
public string FileName;
public string MethodName;
public int LineNumber;
}
namespace OpenWire.Client.Core {
/// <summary> /// <summary>
/// Represents an exception on the broker /// Represents an exception on the broker
/// </summary> /// </summary>
public class BrokerError : AbstractCommand { public class BrokerError : AbstractCommand
{
private string message; private string message;
private string exceptionClass; private string exceptionClass;
private string stackTrace; private StackTraceElement[] stackTraceElements = {};
private BrokerError cause;
public string Message { public string Message
{
get { return message; } get { return message; }
set { message = value; } set { message = value; }
} }
public string ExceptionClass { public string ExceptionClass
{
get { return exceptionClass; } get { return exceptionClass; }
set { exceptionClass = value; } set { exceptionClass = value; }
} }
public string StackTrace { public StackTraceElement[] StackTraceElements
get { return stackTrace; } {
set { stackTrace = value; } get { return stackTraceElements; }
set { stackTraceElements = value; }
}
public BrokerError Cause
{
get { return cause; }
set { cause = value; }
}
public String StackTrace {
get {
StringWriter writer = new StringWriter();
PrintStackTrace(writer);
return writer.ToString();
}
}
public void PrintStackTrace(TextWriter writer)
{
writer.WriteLine(exceptionClass + ": " + message);
for (int i = 0; i < stackTraceElements.Length; i++)
{
StackTraceElement element = stackTraceElements[i];
writer.WriteLine(" at " + element.ClassName + "." + element.MethodName + "(" + element.FileName + ":" + element.LineNumber + ")");
}
if (cause != null)
{
writer.WriteLine("Nested Exception:");
cause.PrintStackTrace(writer);
}
} }
} }
} }

View File

@ -525,12 +525,26 @@ namespace OpenWire.Client.Core
{ {
if (bs.ReadBoolean()) if (bs.ReadBoolean())
{ {
String clazz = ReadString(dataIn, bs);
String message = ReadString(dataIn, bs);
BrokerError answer = new BrokerError(); BrokerError answer = new BrokerError();
answer.ExceptionClass = clazz;
answer.Message = message; answer.ExceptionClass = ReadString(dataIn, bs);
answer.Message = ReadString(dataIn, bs);
if (wireFormat.StackTraceEnabled)
{
short length = ReadShort(dataIn);
StackTraceElement[] stackTrace = new StackTraceElement[length];
for (int i = 0; i < stackTrace.Length; i++)
{
StackTraceElement element = new StackTraceElement();
element.ClassName = ReadString(dataIn, bs);
element.MethodName = ReadString(dataIn, bs);
element.FileName = ReadString(dataIn, bs);
element.LineNumber = ReadInt(dataIn);
stackTrace[i] = element;
}
answer.StackTraceElements = stackTrace;
answer.Cause = UnmarshalBrokerError(wireFormat, dataIn, bs);
}
return answer; return answer;
} }
else else
@ -552,6 +566,21 @@ namespace OpenWire.Client.Core
bs.WriteBoolean(true); bs.WriteBoolean(true);
rc += WriteString(o.ExceptionClass, bs); rc += WriteString(o.ExceptionClass, bs);
rc += WriteString(o.Message, bs); rc += WriteString(o.Message, bs);
if (wireFormat.StackTraceEnabled)
{
rc += 2;
StackTraceElement[] stackTrace = o.StackTraceElements;
for (int i = 0; i < stackTrace.Length; i++)
{
StackTraceElement element = stackTrace[i];
rc += WriteString(element.ClassName, bs);
rc += WriteString(element.MethodName, bs);
rc += WriteString(element.FileName, bs);
rc += 4;
}
rc += MarshalBrokerError(wireFormat, o.Cause, bs);
}
return rc; return rc;
} }
} }
@ -566,6 +595,21 @@ namespace OpenWire.Client.Core
{ {
WriteString(o.ExceptionClass, dataOut, bs); WriteString(o.ExceptionClass, dataOut, bs);
WriteString(o.Message, dataOut, bs); WriteString(o.Message, dataOut, bs);
if (wireFormat.StackTraceEnabled)
{
StackTraceElement[] stackTrace = o.StackTraceElements;
WriteShort((short) stackTrace.Length, dataOut);
for (int i = 0; i < stackTrace.Length; i++)
{
StackTraceElement element = stackTrace[i];
WriteString(element.ClassName, dataOut, bs);
WriteString(element.MethodName, bs);
WriteString(element.FileName, bs);
WriteInt(element.LineNumber, dataOut);
}
MarshalBrokerError(wireFormat, o.Cause, dataOut, bs);
}
} }
} }

View File

@ -28,17 +28,37 @@ namespace OpenWire.Client.Core
/// </summary> /// </summary>
public class OpenWireFormat public class OpenWireFormat
{ {
static private char[] MAGIC = new char[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
private DataStreamMarshaller[] dataMarshallers; private DataStreamMarshaller[] dataMarshallers;
private const byte NULL_TYPE = 0; private const byte NULL_TYPE = 0;
private WireFormatInfo wireFormatInfo = new WireFormatInfo();
public OpenWireFormat() public OpenWireFormat()
{ {
// lets configure the wire format
wireFormatInfo.Magic = CreateMagicBytes();
wireFormatInfo.Version = 1;
wireFormatInfo.StackTraceEnabled = true;
wireFormatInfo.TcpNoDelayEnabled = true;
dataMarshallers = new DataStreamMarshaller[256]; dataMarshallers = new DataStreamMarshaller[256];
MarshallerFactory factory = new MarshallerFactory(); MarshallerFactory factory = new MarshallerFactory();
factory.configure(this); factory.configure(this);
} }
public WireFormatInfo WireFormatInfo {
get {
return wireFormatInfo;
}
}
public bool StackTraceEnabled {
get {
return wireFormatInfo.StackTraceEnabled;
}
}
public void addMarshaller(DataStreamMarshaller marshaller) public void addMarshaller(DataStreamMarshaller marshaller)
{ {
byte type = marshaller.GetDataStructureType(); byte type = marshaller.GetDataStructureType();
@ -182,5 +202,19 @@ namespace OpenWire.Client.Core
return null; return null;
} }
} }
/// <summary>
/// Method CreateMagicBytes
/// </summary>
/// <returns>A byte[]</retutns>
private byte[] CreateMagicBytes()
{
byte[] answer = new byte[MAGIC.Length];
for (int i = 0; i < answer.Length; i++)
{
answer[i] = (byte) MAGIC[i];
}
return answer;
}
} }
} }

View File

@ -70,14 +70,13 @@ namespace OpenWire.Client.Core
NetworkStream networkStream = new NetworkStream(socket); NetworkStream networkStream = new NetworkStream(socket);
socketWriter = new BinaryWriter(networkStream); socketWriter = new BinaryWriter(networkStream);
socketReader = new BinaryReader(networkStream); socketReader = new BinaryReader(networkStream);
/*
socketWriter = new BinaryWriter(new NetworkStream(socket));
socketReader = new BinaryReader(new NetworkStream(socket));
*/
// now lets create the background read thread // now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop)); readThread = new Thread(new ThreadStart(ReadLoop));
readThread.Start(); readThread.Start();
// lets send the wireformat we're using
Oneway(wireformat.WireFormatInfo);
} }
} }
@ -101,8 +100,16 @@ namespace OpenWire.Client.Core
public Response Request(Command command) public Response Request(Command command)
{ {
FutureResponse response = AsyncRequest(command); FutureResponse future = AsyncRequest(command);
return response.Response; Response response = future.Response;
if (response is ExceptionResponse)
{
ExceptionResponse er = (ExceptionResponse) response;
BrokerError brokerError = er.Exception;
throw new BrokerException(brokerError);
}
return response;
} }
public void Dispose() public void Dispose()
@ -149,21 +156,14 @@ namespace OpenWire.Client.Core
if (response is ExceptionResponse) if (response is ExceptionResponse)
{ {
ExceptionResponse er = (ExceptionResponse) response; ExceptionResponse er = (ExceptionResponse) response;
Exception e = new BrokerException(er.Exception); BrokerError brokerError = er.Exception;
if (this.Exception != null) if (this.Exception != null)
{ {
this.Exception(this, e); this.Exception(this, new BrokerException(brokerError));
}
else
{
throw e;
} }
} }
else
{
future.Response = response; future.Response = response;
} }
}
else else
{ {
Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response); Console.WriteLine("ERROR: Unknown response ID: " + response.CommandId + " for response: " + response);

View File

@ -58,7 +58,10 @@ namespace OpenWire.Client.IO
WireFormatInfo info = (WireFormatInfo)o; WireFormatInfo info = (WireFormatInfo)o;
info.Magic = ReadBytes(dataIn, 8); info.Magic = ReadBytes(dataIn, 8);
info.Version = DataStreamMarshaller.ReadInt(dataIn); info.Version = DataStreamMarshaller.ReadInt(dataIn);
info.Options = DataStreamMarshaller.ReadInt(dataIn); info.CacheEnabled = bs.ReadBoolean();
info.CompressionEnabled = bs.ReadBoolean();
info.StackTraceEnabled = bs.ReadBoolean();
info.TcpNoDelayEnabled = bs.ReadBoolean();
} }
@ -70,8 +73,12 @@ namespace OpenWire.Client.IO
WireFormatInfo info = (WireFormatInfo)o; WireFormatInfo info = (WireFormatInfo)o;
int rc = base.Marshal1(wireFormat, info, bs); int rc = base.Marshal1(wireFormat, info, bs);
bs.WriteBoolean(info.CacheEnabled);
bs.WriteBoolean(info.CompressionEnabled);
bs.WriteBoolean(info.StackTraceEnabled);
bs.WriteBoolean(info.TcpNoDelayEnabled);
return rc + 10; return rc + 9;
} }
// //
@ -83,7 +90,10 @@ namespace OpenWire.Client.IO
WireFormatInfo info = (WireFormatInfo)o; WireFormatInfo info = (WireFormatInfo)o;
dataOut.Write(info.Magic, 0, 8); dataOut.Write(info.Magic, 0, 8);
DataStreamMarshaller.WriteInt(info.Version, dataOut); DataStreamMarshaller.WriteInt(info.Version, dataOut);
DataStreamMarshaller.WriteInt(info.Options, dataOut); bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
bs.ReadBoolean();
} }
} }

View File

@ -0,0 +1,31 @@
using NUnit.Framework;
using System;
namespace OpenWire.Client
{
[TestFixture]
public class BadConsumeTest : TestSupport
{
[Test]
public void TestBadConsumeOperationToTestExceptions()
{
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
using (IConnection connection = factory.CreateConnection())
{
ISession session = connection.CreateSession();
try
{
IMessageConsumer consumer = session.CreateConsumer(null);
Assert.Fail("Should have thrown an exception!");
}
catch (BrokerException e)
{
Console.WriteLine("Caught expected exception: " + e);
Console.WriteLine("Stack: " + e.StackTrace);
Console.WriteLine("BrokerStrack: " + e.BrokerError.StackTrace);
}
}
}
}
}

View File

@ -33,7 +33,6 @@ namespace OpenWire.Client
public abstract class TestSupport public abstract class TestSupport
{ {
[ Test ]
public virtual void SendAndSyncReceive() public virtual void SendAndSyncReceive()
{ {
IConnectionFactory factory = new ConnectionFactory("localhost", 61616); IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
@ -81,8 +80,12 @@ namespace OpenWire.Client
return destination; return destination;
} }
protected abstract IMessage CreateMessage(ISession session); protected virtual IMessage CreateMessage(ISession session) {
return session.CreateMessage();
}
protected abstract void AssertValidMessage(IMessage message); protected virtual void AssertValidMessage(IMessage message) {
Assert.IsNotNull(message, "Null Message!");
}
} }
} }