git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@424237 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-07-21 09:00:35 +00:00
parent 4f131fdc26
commit 8dd454ed08
2 changed files with 90 additions and 64 deletions

View File

@ -127,7 +127,9 @@ namespace ActiveMQ
IMessage message = dispatcher.DequeueNoWait();
if (message != null)
{
Listener(message);
//here we add the code that if do acknowledge action.
message = AutoAcknowledge(message);
Listener(message);
}
else
{

View File

@ -21,71 +21,95 @@ using System.Threading;
namespace NMS
{
[TestFixture]
public class AsyncConsumeTest : JMSTestSupport
{
protected Object semaphore = new Object();
protected bool received;
[SetUp]
override public void SetUp()
namespace NMS {
[ TestFixture ]
public class AsyncConsumeTest : JMSTestSupport
{
base.SetUp();
}
[TearDown]
override public void TearDown()
{
base.TearDown();
}
[Test]
public void TestAsynchronousConsume()
{
// lets create an async consumer
// START SNIPPET: demo
IMessageConsumer consumer = Session.CreateConsumer(this.Destination);
consumer.Listener += new MessageListener(OnMessage);
// END SNIPPET: demo
// now lets send a message
IMessageProducer producer = CreateProducer();
IMessage request = CreateMessage();
request.NMSCorrelationID = "abc";
request.NMSType = "Test";
producer.Send(request);
WaitForMessageToArrive();
}
protected void OnMessage(IMessage message)
{
Console.WriteLine("Received message: " + message);
lock (semaphore)
{
received = true;
Monitor.PulseAll(semaphore);
}
}
protected void WaitForMessageToArrive()
{
lock (semaphore)
{
if (!received)
protected Object semaphore = new Object();
protected bool received;
[ SetUp ]
override public void SetUp()
{
Monitor.Wait(semaphore, receiveTimeout);
base.SetUp();
}
[ TearDown ]
override public void TearDown()
{
base.TearDown();
}
[ Test ]
public void TestAsynchronousConsume()
{
// lets create an async consumer
// START SNIPPET: demo
IMessageConsumer consumer = Session.CreateConsumer(this.Destination);
consumer.Listener += new MessageListener(OnMessage);
// END SNIPPET: demo
// now lets send a message
IMessageProducer producer = CreateProducer();
IMessage request = CreateMessage();
request.NMSCorrelationID = "abc";
request.NMSType = "Test";
producer.Send(request);
WaitForMessageToArrive();
}
[ Test ]
public void textMessageSRExample()
{
using (IConnection connection = Factory.CreateConnection())
{
AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
ISession session = connection.CreateSession(acknowledgementMode);
IDestination destination = session.GetQueue("FOO.BAR");
// lets create a consumer and producer
IMessageConsumer consumer = session.CreateConsumer(destination);
consumer.Listener += new MessageListener(OnMessage);
IMessageProducer producer = session.CreateProducer(destination);
producer.Persistent = true;
// lets send a message
ITextMessage request = session.CreateTextMessage(
"HelloWorld!");
request.NMSCorrelationID = "abc";
request.Properties["JMSXGroupID"] = "cheese";
request.Properties["myHeader"] = "James";
producer.Send(request);
WaitForMessageToArrive();
}
}
protected void OnMessage(IMessage message)
{
Console.WriteLine("Received message: " + message);
lock (semaphore)
{
received = true;
Monitor.PulseAll(semaphore);
}
}
protected void WaitForMessageToArrive()
{
lock (semaphore)
{
if (!received)
{
Monitor.Wait(semaphore, receiveTimeout);
}
Assert.AreEqual(true, received, "Should have received a message by now!");
}
}
Assert.AreEqual(true, received, "Should have received a message by now!");
}
}
}
}