added support for asynchronous consumption to .Net using a MessageListener along with fixing up some nant build errors

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@381319 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-02-27 11:28:35 +00:00
parent 72fb003975
commit 81cc428b76
11 changed files with 152 additions and 24 deletions

View File

@ -41,7 +41,7 @@ namespace OpenWire.Client
} }
} }
public virtual string StackTrace public virtual string JavaStackTrace
{ {
get { get {
return brokerError.StackTrace; return brokerError.StackTrace;

View File

@ -2,6 +2,7 @@ using System;
using System.Collections; using System.Collections;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
using OpenWire.Client.Core; using OpenWire.Client.Core;
using System.Threading;
namespace OpenWire.Client namespace OpenWire.Client
{ {
@ -111,6 +112,18 @@ namespace OpenWire.Client
} }
} }
public BrokerInfo BrokerInfo {
get {
return brokerInfo;
}
}
public WireFormatInfo BrokerWireFormat {
get {
return brokerWireFormatInfo;
}
}
// Implementation methods // Implementation methods
/// <summary> /// <summary>

View File

@ -340,8 +340,6 @@ namespace OpenWire.Client.Core
/// <summary> /// <summary>
/// Switches from one endian to the other /// Switches from one endian to the other
/// </summary> /// </summary>
/// <param name="value">An int</param>
/// <returns>An int</retutns>
public static int SwitchEndian(int x) public static int SwitchEndian(int x)
{ {
return ((x << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000) >> 8) | (x >> 24)); return ((x << 24) | ((x & 0xff00) << 8) | ((x & 0xff0000) >> 8) | (x >> 24));
@ -799,8 +797,6 @@ namespace OpenWire.Client.Core
/// <summary> /// <summary>
/// Method ReadFloat /// Method ReadFloat
/// </summary> /// </summary>
/// <param name="dataIn">A BinaryReader</param>
/// <returns>An Object</retutns>
private static Object ReadFloat(BinaryReader dataIn) private static Object ReadFloat(BinaryReader dataIn)
{ {
// TODO: Implement this method // TODO: Implement this method

View File

@ -17,6 +17,7 @@
using System.Collections; using System.Collections;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
using System; using System;
using OpenWire.Client;
namespace OpenWire.Client.Core namespace OpenWire.Client.Core
{ {
@ -30,7 +31,6 @@ namespace OpenWire.Client.Core
/// <summary> /// <summary>
/// Method Enqueue /// Method Enqueue
/// </summary> /// </summary>
/// <param name="message">An ActiveMQMessage</param>
public void Enqueue(ActiveMQMessage message) public void Enqueue(ActiveMQMessage message)
{ {
queue.Enqueue(message); queue.Enqueue(message);
@ -39,7 +39,6 @@ namespace OpenWire.Client.Core
/// <summary> /// <summary>
/// Method DequeueNoWait /// Method DequeueNoWait
/// </summary> /// </summary>
/// <returns>An IMessage</retutns>
public IMessage DequeueNoWait() public IMessage DequeueNoWait()
{ {
lock (queue) lock (queue)
@ -55,8 +54,6 @@ namespace OpenWire.Client.Core
/// <summary> /// <summary>
/// Method Dequeue /// Method Dequeue
/// </summary> /// </summary>
/// <param name="timeout">A long</param>
/// <returns>An IMessage</retutns>
public IMessage Dequeue(long timeout) public IMessage Dequeue(long timeout)
{ {
// TODO // TODO
@ -66,12 +63,10 @@ namespace OpenWire.Client.Core
/// <summary> /// <summary>
/// Method Dequeue /// Method Dequeue
/// </summary> /// </summary>
/// <returns>An IMessage</retutns>
public IMessage Dequeue() public IMessage Dequeue()
{ {
return (IMessage) queue.Dequeue(); return (IMessage) queue.Dequeue();
} }
} }
} }

View File

@ -94,7 +94,10 @@ namespace OpenWire.Client.Core
public Object Unmarshal(BinaryReader dis) public Object Unmarshal(BinaryReader dis)
{ {
int size = DataStreamMarshaller.ReadInt(dis); // lets ignore the size of the packet
DataStreamMarshaller.ReadInt(dis);
// first byte is the type of the packet
byte dataType = DataStreamMarshaller.ReadByte(dis); byte dataType = DataStreamMarshaller.ReadByte(dis);
if (dataType != NULL_TYPE) if (dataType != NULL_TYPE)
{ {
@ -206,7 +209,6 @@ namespace OpenWire.Client.Core
/// <summary> /// <summary>
/// Method CreateMagicBytes /// Method CreateMagicBytes
/// </summary> /// </summary>
/// <returns>A byte[]</retutns>
private byte[] CreateMagicBytes() private byte[] CreateMagicBytes()
{ {
byte[] answer = new byte[MAGIC.Length]; byte[] answer = new byte[MAGIC.Length];

View File

@ -19,7 +19,7 @@ using OpenWire.Client.Commands;
namespace OpenWire.Client namespace OpenWire.Client
{ {
public delegate void MessageHandler(IMessage message); public delegate void MessageListener(IMessage message);
/// <summary> /// <summary>
/// A consumer of messages /// A consumer of messages
@ -45,6 +45,6 @@ namespace OpenWire.Client
/// <summary> /// <summary>
/// An asynchronous listener which can be used to consume messages asynchronously /// An asynchronous listener which can be used to consume messages asynchronously
/// </summary> /// </summary>
event MessageHandler Listener; event MessageListener Listener;
} }
} }

View File

@ -41,7 +41,7 @@ namespace OpenWire.Client
private bool closed; private bool closed;
private Dispatcher dispatcher = new Dispatcher(); private Dispatcher dispatcher = new Dispatcher();
public event MessageHandler Listener; public event MessageListener Listener;
public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode) public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
{ {
@ -50,6 +50,13 @@ namespace OpenWire.Client
this.acknowledgementMode = acknowledgementMode; this.acknowledgementMode = acknowledgementMode;
} }
public ConsumerId ConsumerId {
get {
return info.ConsumerId;
}
}
/// <summary> /// <summary>
/// Method Dispatch /// Method Dispatch
/// </summary> /// </summary>
@ -57,6 +64,11 @@ namespace OpenWire.Client
public void Dispatch(ActiveMQMessage message) public void Dispatch(ActiveMQMessage message)
{ {
dispatcher.Enqueue(message); dispatcher.Enqueue(message);
if (Listener != null) {
// lets dispatch to the thread pool for this connection for messages to be processed
ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
}
} }
public IMessage Receive() public IMessage Receive()
@ -85,6 +97,22 @@ namespace OpenWire.Client
closed = true; closed = true;
} }
/// <summary>
/// Dispatch any pending messages to the asynchronous listener
/// </summary>
public void DispatchAsyncMessages()
{
while (Listener != null) {
IMessage message = dispatcher.DequeueNoWait();
if (message != null) {
Listener(message);
}
else {
break;
}
}
}
protected void CheckClosed() protected void CheckClosed()
{ {
if (closed) if (closed)

View File

@ -17,6 +17,7 @@
using System; using System;
using OpenWire.Client.Commands; using OpenWire.Client.Commands;
using OpenWire.Client.Core; using OpenWire.Client.Core;
using System.Collections;
namespace OpenWire.Client namespace OpenWire.Client
{ {
@ -31,6 +32,7 @@ namespace OpenWire.Client
private long consumerCounter; private long consumerCounter;
private long producerCounter; private long producerCounter;
private int prefetchSize = 1000; private int prefetchSize = 1000;
private IDictionary consumers = new Hashtable();
public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
{ {
@ -75,6 +77,8 @@ namespace OpenWire.Client
connection.AddConsumer(consumerId, consumer); connection.AddConsumer(consumerId, consumer);
connection.SyncRequest(command); connection.SyncRequest(command);
consumers[consumerId] = consumer;
return consumer; return consumer;
} }
catch (Exception e) catch (Exception e)
@ -98,6 +102,8 @@ namespace OpenWire.Client
connection.AddConsumer(consumerId, consumer); connection.AddConsumer(consumerId, consumer);
connection.SyncRequest(command); connection.SyncRequest(command);
consumers[consumerId] = consumer;
return consumer; return consumer;
} }
catch (Exception e) catch (Exception e)
@ -194,12 +200,22 @@ namespace OpenWire.Client
public void DisposeOf(ConsumerId objectId) public void DisposeOf(ConsumerId objectId)
{ {
consumers.Remove(objectId);
connection.RemoveConsumer(objectId); connection.RemoveConsumer(objectId);
RemoveInfo command = new RemoveInfo(); RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId; command.ObjectId = objectId;
connection.SyncRequest(command); connection.SyncRequest(command);
} }
public void DispatchAsyncMessages(object state) {
// lets iterate through each consumer created by this session
// ensuring that they have all pending messages dispatched
foreach (MessageConsumer consumer in consumers.Values) {
consumer.DispatchAsyncMessages();
}
}
protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
{ {
ConsumerInfo answer = new ConsumerInfo(); ConsumerInfo answer = new ConsumerInfo();
@ -237,11 +253,8 @@ namespace OpenWire.Client
/// <summary> /// <summary>
/// Configures the message command /// Configures the message command
/// </summary> /// </summary>
/// <param name="activeMQMessage">An ActiveMQMessage</param>
/// <returns>An IMessage</retutns>
protected void Configure(ActiveMQMessage message) protected void Configure(ActiveMQMessage message)
{ {
} }
} }
} }

View File

@ -0,0 +1,77 @@
using System;
using System.IO;
using System.Threading;
using NUnit.Framework;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client
{
[TestFixture]
public class AsyncConsumeTest : TestSupport
{
protected Object semaphore = new Object();
protected bool received;
[Test]
public void TestAsynchronousConsume()
{
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
Assert.IsTrue(factory != null, "no factory created");
using (IConnection connection = factory.CreateConnection())
{
Assert.IsTrue(connection != null, "no connection created");
Console.WriteLine("Connected to ActiveMQ!");
ISession session = connection.CreateSession();
IDestination destination = CreateDestination(session);
Assert.IsTrue(destination != null, "No queue available!");
// lets create an async consumer
// START SNIPPET: demo
IMessageConsumer consumer = session.CreateConsumer(destination);
consumer.Listener += new MessageListener(OnMessage);
// END SNIPPET: demo
// now lets send a message
session = connection.CreateSession();
IMessageProducer producer = session.CreateProducer(destination);
IMessage request = CreateMessage(session);
request.JMSCorrelationID = "abc";
request.JMSType = "Test";
producer.Send(request);
WaitForMessageToArrive();
}
}
protected void OnMessage(IMessage message)
{
Console.WriteLine("Received message: " + message);
lock (semaphore)
{
received = true;
Monitor.PulseAll(semaphore);
}
}
protected void WaitForMessageToArrive()
{
lock (semaphore)
{
if (!received)
{
Monitor.Wait(semaphore, 10000);
}
}
Assert.AreEqual(true, received, "Should have received a message by now!");
}
}
}

View File

@ -17,13 +17,15 @@ namespace OpenWire.Client
try try
{ {
IMessageConsumer consumer = session.CreateConsumer(null); IMessageConsumer consumer = session.CreateConsumer(null);
Console.WriteLine("Created consumer: " + consumer);
Assert.Fail("Should have thrown an exception!"); Assert.Fail("Should have thrown an exception!");
} }
catch (BrokerException e) catch (BrokerException e)
{ {
Console.WriteLine("Caught expected exception: " + e); Console.WriteLine("Caught expected exception: " + e);
Console.WriteLine("Stack: " + e.StackTrace); Console.WriteLine("Stack: " + e.StackTrace);
Console.WriteLine("BrokerStrack: " + e.BrokerError.StackTrace); Console.WriteLine("Java Stack: " + e.JavaStackTrace);
} }
} }
} }

View File

@ -80,11 +80,13 @@ namespace OpenWire.Client
return destination; return destination;
} }
protected virtual IMessage CreateMessage(ISession session) { protected virtual IMessage CreateMessage(ISession session)
{
return session.CreateMessage(); return session.CreateMessage();
} }
protected virtual void AssertValidMessage(IMessage message) { protected virtual void AssertValidMessage(IMessage message)
{
Assert.IsNotNull(message, "Null Message!"); Assert.IsNotNull(message, "Null Message!");
} }
} }