diff --git a/openwire-dotnet/src/OpenWire.Client/BrokerException.cs b/openwire-dotnet/src/OpenWire.Client/BrokerException.cs index e250c6921a..61d608b15b 100755 --- a/openwire-dotnet/src/OpenWire.Client/BrokerException.cs +++ b/openwire-dotnet/src/OpenWire.Client/BrokerException.cs @@ -41,7 +41,7 @@ namespace OpenWire.Client } } - public virtual string StackTrace + public virtual string JavaStackTrace { get { return brokerError.StackTrace; diff --git a/openwire-dotnet/src/OpenWire.Client/Connection.cs b/openwire-dotnet/src/OpenWire.Client/Connection.cs index d6b60c0f8e..a9785f9fec 100755 --- a/openwire-dotnet/src/OpenWire.Client/Connection.cs +++ b/openwire-dotnet/src/OpenWire.Client/Connection.cs @@ -2,6 +2,7 @@ using System; using System.Collections; using OpenWire.Client.Commands; using OpenWire.Client.Core; +using System.Threading; namespace OpenWire.Client { @@ -111,6 +112,18 @@ namespace OpenWire.Client } } + public BrokerInfo BrokerInfo { + get { + return brokerInfo; + } + } + + public WireFormatInfo BrokerWireFormat { + get { + return brokerWireFormatInfo; + } + } + // Implementation methods /// diff --git a/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs b/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs index 5878c9a53e..7918cb0951 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/DataStreamMarshaller.cs @@ -340,8 +340,6 @@ namespace OpenWire.Client.Core /// /// Switches from one endian to the other /// - /// An int - /// An int public static int SwitchEndian(int x) { return ((x << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000) >> 8) | (x >> 24)); @@ -799,8 +797,6 @@ namespace OpenWire.Client.Core /// /// Method ReadFloat /// - /// A BinaryReader - /// An Object private static Object ReadFloat(BinaryReader dataIn) { // TODO: Implement this method diff --git a/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs b/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs index 2255d17bcb..a31719f93a 100644 --- a/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/Dispatcher.cs @@ -17,6 +17,7 @@ using System.Collections; using OpenWire.Client.Commands; using System; +using OpenWire.Client; namespace OpenWire.Client.Core { @@ -26,11 +27,10 @@ namespace OpenWire.Client.Core public class Dispatcher { Queue queue = Queue.Synchronized( new Queue() ); - + /// /// Method Enqueue /// - /// An ActiveMQMessage public void Enqueue(ActiveMQMessage message) { queue.Enqueue(message); @@ -39,7 +39,6 @@ namespace OpenWire.Client.Core /// /// Method DequeueNoWait /// - /// An IMessage public IMessage DequeueNoWait() { lock (queue) @@ -55,8 +54,6 @@ namespace OpenWire.Client.Core /// /// Method Dequeue /// - /// A long - /// An IMessage public IMessage Dequeue(long timeout) { // TODO @@ -66,12 +63,10 @@ namespace OpenWire.Client.Core /// /// Method Dequeue /// - /// An IMessage public IMessage Dequeue() { return (IMessage) queue.Dequeue(); } - } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs b/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs index 3bd24f3437..5eaa0b19e9 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs +++ b/openwire-dotnet/src/OpenWire.Client/Core/OpenWireFormat.cs @@ -94,7 +94,10 @@ namespace OpenWire.Client.Core public Object Unmarshal(BinaryReader dis) { - int size = DataStreamMarshaller.ReadInt(dis); + // lets ignore the size of the packet + DataStreamMarshaller.ReadInt(dis); + + // first byte is the type of the packet byte dataType = DataStreamMarshaller.ReadByte(dis); if (dataType != NULL_TYPE) { @@ -206,7 +209,6 @@ namespace OpenWire.Client.Core /// /// Method CreateMagicBytes /// - /// A byte[] private byte[] CreateMagicBytes() { byte[] answer = new byte[MAGIC.Length]; diff --git a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs index fe4f94dbc9..ac583a6f2a 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs @@ -19,7 +19,7 @@ using OpenWire.Client.Commands; namespace OpenWire.Client { - public delegate void MessageHandler(IMessage message); + public delegate void MessageListener(IMessage message); /// /// A consumer of messages @@ -45,6 +45,6 @@ namespace OpenWire.Client /// /// An asynchronous listener which can be used to consume messages asynchronously /// - event MessageHandler Listener; + event MessageListener Listener; } } diff --git a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs index 6d20133ae4..d3d928c770 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs @@ -41,7 +41,7 @@ namespace OpenWire.Client private bool closed; private Dispatcher dispatcher = new Dispatcher(); - public event MessageHandler Listener; + public event MessageListener Listener; public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode) { @@ -50,6 +50,13 @@ namespace OpenWire.Client this.acknowledgementMode = acknowledgementMode; } + public ConsumerId ConsumerId { + get { + return info.ConsumerId; + } + } + + /// /// Method Dispatch /// @@ -57,6 +64,11 @@ namespace OpenWire.Client public void Dispatch(ActiveMQMessage message) { dispatcher.Enqueue(message); + + if (Listener != null) { + // lets dispatch to the thread pool for this connection for messages to be processed + ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages)); + } } public IMessage Receive() @@ -85,6 +97,22 @@ namespace OpenWire.Client closed = true; } + /// + /// Dispatch any pending messages to the asynchronous listener + /// + public void DispatchAsyncMessages() + { + while (Listener != null) { + IMessage message = dispatcher.DequeueNoWait(); + if (message != null) { + Listener(message); + } + else { + break; + } + } + } + protected void CheckClosed() { if (closed) diff --git a/openwire-dotnet/src/OpenWire.Client/Session.cs b/openwire-dotnet/src/OpenWire.Client/Session.cs index 98ce841a8f..8dfb235069 100755 --- a/openwire-dotnet/src/OpenWire.Client/Session.cs +++ b/openwire-dotnet/src/OpenWire.Client/Session.cs @@ -17,6 +17,7 @@ using System; using OpenWire.Client.Commands; using OpenWire.Client.Core; +using System.Collections; namespace OpenWire.Client { @@ -31,6 +32,7 @@ namespace OpenWire.Client private long consumerCounter; private long producerCounter; private int prefetchSize = 1000; + private IDictionary consumers = new Hashtable(); public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) { @@ -75,6 +77,8 @@ namespace OpenWire.Client connection.AddConsumer(consumerId, consumer); connection.SyncRequest(command); + + consumers[consumerId] = consumer; return consumer; } catch (Exception e) @@ -98,6 +102,8 @@ namespace OpenWire.Client connection.AddConsumer(consumerId, consumer); connection.SyncRequest(command); + + consumers[consumerId] = consumer; return consumer; } catch (Exception e) @@ -194,12 +200,22 @@ namespace OpenWire.Client public void DisposeOf(ConsumerId objectId) { + consumers.Remove(objectId); connection.RemoveConsumer(objectId); RemoveInfo command = new RemoveInfo(); command.ObjectId = objectId; connection.SyncRequest(command); } + public void DispatchAsyncMessages(object state) { + // lets iterate through each consumer created by this session + // ensuring that they have all pending messages dispatched + foreach (MessageConsumer consumer in consumers.Values) { + consumer.DispatchAsyncMessages(); + } + } + + protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) { ConsumerInfo answer = new ConsumerInfo(); @@ -237,11 +253,8 @@ namespace OpenWire.Client /// /// Configures the message command /// - /// An ActiveMQMessage - /// An IMessage protected void Configure(ActiveMQMessage message) { } - } } diff --git a/openwire-dotnet/tests/OpenWire.Client/AsyncConsumeTest.cs b/openwire-dotnet/tests/OpenWire.Client/AsyncConsumeTest.cs new file mode 100644 index 0000000000..c97dbfd849 --- /dev/null +++ b/openwire-dotnet/tests/OpenWire.Client/AsyncConsumeTest.cs @@ -0,0 +1,77 @@ +using System; +using System.IO; +using System.Threading; + +using NUnit.Framework; + +using OpenWire.Client; +using OpenWire.Client.Core; +namespace OpenWire.Client +{ + [TestFixture] + public class AsyncConsumeTest : TestSupport + { + protected Object semaphore = new Object(); + protected bool received; + + [Test] + public void TestAsynchronousConsume() + { + IConnectionFactory factory = new ConnectionFactory("localhost", 61616); + Assert.IsTrue(factory != null, "no factory created"); + + using (IConnection connection = factory.CreateConnection()) + { + Assert.IsTrue(connection != null, "no connection created"); + Console.WriteLine("Connected to ActiveMQ!"); + + ISession session = connection.CreateSession(); + IDestination destination = CreateDestination(session); + Assert.IsTrue(destination != null, "No queue available!"); + + // lets create an async consumer + // START SNIPPET: demo + IMessageConsumer consumer = session.CreateConsumer(destination); + consumer.Listener += new MessageListener(OnMessage); + // END SNIPPET: demo + + + // now lets send a message + session = connection.CreateSession(); + IMessageProducer producer = session.CreateProducer(destination); + IMessage request = CreateMessage(session); + request.JMSCorrelationID = "abc"; + request.JMSType = "Test"; + producer.Send(request); + + + WaitForMessageToArrive(); + } + + } + + protected void OnMessage(IMessage message) + { + Console.WriteLine("Received message: " + message); + lock (semaphore) + { + received = true; + Monitor.PulseAll(semaphore); + } + + } + + protected void WaitForMessageToArrive() + { + lock (semaphore) + { + if (!received) + { + Monitor.Wait(semaphore, 10000); + } + } + Assert.AreEqual(true, received, "Should have received a message by now!"); + } + + } +} diff --git a/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs b/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs index 5a6b81c9f1..1234250a94 100644 --- a/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs +++ b/openwire-dotnet/tests/OpenWire.Client/BadConsumeTest.cs @@ -17,13 +17,15 @@ namespace OpenWire.Client try { IMessageConsumer consumer = session.CreateConsumer(null); + Console.WriteLine("Created consumer: " + consumer); + Assert.Fail("Should have thrown an exception!"); } catch (BrokerException e) { Console.WriteLine("Caught expected exception: " + e); Console.WriteLine("Stack: " + e.StackTrace); - Console.WriteLine("BrokerStrack: " + e.BrokerError.StackTrace); + Console.WriteLine("Java Stack: " + e.JavaStackTrace); } } } diff --git a/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs b/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs index 56ade1e710..05a23851ef 100644 --- a/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs +++ b/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs @@ -70,7 +70,7 @@ namespace OpenWire.Client } } } - + protected virtual IDestination CreateDestination(ISession session) { string name = "Test.DotNet." + GetType().Name; @@ -80,11 +80,13 @@ namespace OpenWire.Client return destination; } - protected virtual IMessage CreateMessage(ISession session) { + protected virtual IMessage CreateMessage(ISession session) + { return session.CreateMessage(); } - protected virtual void AssertValidMessage(IMessage message) { + protected virtual void AssertValidMessage(IMessage message) + { Assert.IsNotNull(message, "Null Message!"); } }