added support for transactions together with support for consumer.Receive(timeout)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@381643 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-02-28 12:36:22 +00:00
parent 695a766b5c
commit 8ee9b45484
50 changed files with 1120 additions and 443 deletions

View File

@ -13,16 +13,16 @@ namespace OpenWire.Client
{
private ITransport transport;
private ConnectionInfo info;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private BrokerInfo brokerInfo; // from broker
private WireFormatInfo brokerWireFormatInfo; // from broker
private IList sessions = new ArrayList();
private bool transacted;
private IDictionary consumers = new Hashtable(); // TODO threadsafe
private bool connected;
private bool closed;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private long sessionCounter;
private long temporaryDestinationCounter;
private IDictionary consumers = new Hashtable(); // TODO threadsafe
private long localTransactionCounter;
public Connection(ITransport transport, ConnectionInfo info)
@ -53,16 +53,15 @@ namespace OpenWire.Client
/// </summary>
public ISession CreateSession()
{
return CreateSession(transacted, acknowledgementMode);
return CreateSession(acknowledgementMode);
}
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode)
public ISession CreateSession(AcknowledgementMode acknowledgementMode)
{
CheckConnected();
SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode);
SessionInfo info = CreateSessionInfo(acknowledgementMode);
SyncRequest(info);
Session session = new Session(this, info, acknowledgementMode);
sessions.Add(session);
@ -71,10 +70,13 @@ namespace OpenWire.Client
public void Dispose()
{
/*
foreach (Session session in sessions)
{
session.Dispose();
}
*/
DisposeOf(ConnectionId);
sessions.Clear();
transport.Dispose();
closed = true;
@ -88,11 +90,6 @@ namespace OpenWire.Client
set { this.transport = value; }
}
public bool Transacted
{
get { return transacted; }
set { this.transacted = value; }
}
public AcknowledgementMode AcknowledgementMode
{
@ -112,13 +109,22 @@ namespace OpenWire.Client
}
}
public BrokerInfo BrokerInfo {
public ConnectionId ConnectionId
{
get {
return info.ConnectionId;
}
}
public BrokerInfo BrokerInfo
{
get {
return brokerInfo;
}
}
public WireFormatInfo BrokerWireFormat {
public WireFormatInfo BrokerWireFormat
{
get {
return brokerWireFormatInfo;
}
@ -131,6 +137,7 @@ namespace OpenWire.Client
/// </summary>
public Response SyncRequest(Command command)
{
CheckConnected();
Response response = transport.Request(command);
if (response is ExceptionResponse)
{
@ -141,18 +148,17 @@ namespace OpenWire.Client
return response;
}
protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode)
public void OneWay(Command command)
{
SessionInfo answer = new SessionInfo();
SessionId sessionId = new SessionId();
sessionId.ConnectionId = info.ConnectionId.Value;
lock (this)
{
sessionId.Value = ++sessionCounter;
CheckConnected();
transport.Oneway(command);
}
answer.SessionId = sessionId;
return answer;
public void DisposeOf(DataStructure objectId)
{
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
SyncRequest(command);
}
@ -167,6 +173,20 @@ namespace OpenWire.Client
}
}
/// <summary>
/// Creates a new local transaction ID
/// </summary>
public LocalTransactionId CreateLocalTransactionId()
{
LocalTransactionId id= new LocalTransactionId();
id.ConnectionId = ConnectionId;
lock (this)
{
id.Value = (++localTransactionCounter);
}
return id;
}
protected void CheckConnected()
{
if (closed)
@ -175,9 +195,9 @@ namespace OpenWire.Client
}
if (!connected)
{
connected = true;
// now lets send the connection and see if we get an ack/nak
SyncRequest(info);
connected = true;
}
}
@ -224,10 +244,12 @@ namespace OpenWire.Client
consumer.Dispatch(message);
}
}
else if (command is WireFormatInfo) {
else if (command is WireFormatInfo)
{
this.brokerWireFormatInfo = (WireFormatInfo) command;
}
else if (command is BrokerInfo) {
else if (command is BrokerInfo)
{
this.brokerInfo = (BrokerInfo) command;
}
else
@ -236,5 +258,18 @@ namespace OpenWire.Client
}
}
protected SessionInfo CreateSessionInfo(AcknowledgementMode acknowledgementMode)
{
SessionInfo answer = new SessionInfo();
SessionId sessionId = new SessionId();
sessionId.ConnectionId = info.ConnectionId.Value;
lock (this)
{
sessionId.Value = ++sessionCounter;
}
answer.SessionId = sessionId;
return answer;
}
}
}

View File

@ -18,6 +18,7 @@ using System.Collections;
using OpenWire.Client.Commands;
using System;
using OpenWire.Client;
using System.Threading;
namespace OpenWire.Client.Core
{
@ -26,14 +27,52 @@ namespace OpenWire.Client.Core
/// </summary>
public class Dispatcher
{
Queue queue = Queue.Synchronized( new Queue() );
Queue queue = new Queue();
Object semaphore = new Object();
ArrayList messagesToRedeliver = new ArrayList();
/// <summary>
/// Whem we start a transaction we must redeliver any rolled back messages
/// </summary>
public void RedeliverRolledBackMessages() {
lock (semaphore)
{
Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count);
foreach (ActiveMQMessage element in messagesToRedeliver) {
replacement.Enqueue(element);
}
messagesToRedeliver.Clear();
while (queue.Count > 0)
{
ActiveMQMessage element = (ActiveMQMessage) queue.Dequeue();
replacement.Enqueue(element);
}
queue = replacement;
Monitor.PulseAll(semaphore);
}
}
/// <summary>
/// Redeliver the given message, putting it at the head of the queue
/// </summary>
public void Redeliver(ActiveMQMessage message)
{
lock (semaphore) {
messagesToRedeliver.Add(message);
}
}
/// <summary>
/// Method Enqueue
/// </summary>
public void Enqueue(ActiveMQMessage message)
{
lock (semaphore)
{
queue.Enqueue(message);
Monitor.PulseAll(semaphore);
}
}
/// <summary>
@ -41,9 +80,9 @@ namespace OpenWire.Client.Core
/// </summary>
public IMessage DequeueNoWait()
{
lock (queue)
lock (semaphore)
{
if (queue.Peek() != null)
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
@ -54,19 +93,32 @@ namespace OpenWire.Client.Core
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue(long timeout)
public IMessage Dequeue(int timeout)
{
// TODO
throw new Exception("Not implemented yet");
lock (semaphore)
{
if (queue.Count == 0)
{
Monitor.Wait(semaphore, timeout);
}
if (queue.Count > 0)
{
return (IMessage) queue.Dequeue();
}
}
return null;
}
/// <summary>
/// Method Dequeue
/// </summary>
public IMessage Dequeue()
{
lock (semaphore)
{
return (IMessage) queue.Dequeue();
}
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace OpenWire.Client.Core
{
public interface ISynchronization
{
/// <summary>
/// Called before a commit
/// </summary>
void BeforeCommit();
/// <summary>
/// Called after a commit
/// </summary>
void AfterCommit();
/// <summary>
/// Called after a transaction rollback
/// </summary>
void AfterRollback();
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections;
using OpenWire.Client.Commands;
using System;
using OpenWire.Client;
namespace OpenWire.Client.Core
{
public enum TransactionType
{
Begin = 0, Prepare = 1, CommitOnePhase = 2, CommitTwoPhase = 3, Rollback = 4, Recover=5, Forget = 6, End = 7
}
public class TransactionContext
{
private TransactionId transactionId;
private Session session;
private ArrayList synchronizations = new ArrayList();
public TransactionContext(Session session) {
this.session = session;
}
public TransactionId TransactionId
{
get { return transactionId; }
}
/// <summary>
/// Method AddSynchronization
/// </summary>
public void AddSynchronization(ISynchronization synchronization)
{
synchronizations.Add(synchronization);
}
public void Begin()
{
if (transactionId == null)
{
transactionId = session.Connection.CreateLocalTransactionId();
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.Begin;
session.Connection.OneWay(info);
}
}
public void Rollback()
{
if (transactionId != null)
{
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.Rollback;
transactionId = null;
session.Connection.OneWay(info);
}
foreach (ISynchronization synchronization in synchronizations) {
synchronization.AfterRollback();
}
synchronizations.Clear();
}
public void Commit()
{
foreach (ISynchronization synchronization in synchronizations) {
synchronization.BeforeCommit();
}
if (transactionId != null)
{
TransactionInfo info = new TransactionInfo();
info.ConnectionId = session.Connection.ConnectionId;
info.TransactionId = transactionId;
info.Type = (int) TransactionType.CommitOnePhase;
transactionId = null;
session.Connection.OneWay(info);
}
foreach (ISynchronization synchronization in synchronizations) {
synchronization.AfterCommit();
}
synchronizations.Clear();
}
}
}

View File

@ -17,9 +17,11 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
namespace OpenWire.Client
{
public enum AcknowledgementMode {
public enum AcknowledgementMode
{
Unknown, AutoAcknowledge, ClientAcknowledge, Transactional
}
@ -27,7 +29,8 @@ namespace OpenWire.Client {
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public interface IConnection : IDisposable, IStartable {
public interface IConnection : IDisposable, IStartable
{
/// <summary>
/// Creates a new session to work on this connection
@ -37,17 +40,13 @@ namespace OpenWire.Client {
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode);
ISession CreateSession(AcknowledgementMode acknowledgementMode);
// Properties
bool Transacted {
get;
set;
}
AcknowledgementMode AcknowledgementMode {
AcknowledgementMode AcknowledgementMode
{
get;
set;
}

View File

@ -35,7 +35,7 @@ namespace OpenWire.Client
/// <summary>
/// If a message is available within the timeout duration it is returned otherwise this method returns null
/// </summary>
IMessage Receive(long timeout);
IMessage Receive(int timeout);
/// <summary>
/// If a message is available immediately it is returned otherwise this method returns null

View File

@ -105,5 +105,20 @@ namespace OpenWire.Client
/// </summary>
IBytesMessage CreateBytesMessage(byte[] body);
// Transaction methods
/// <summary>
/// If this is a transactional session then commit all message
/// send and acknowledgements for producers and consumers in this session
/// </summary>
void Commit();
/// <summary>
/// If this is a transactional session then rollback all message
/// send and acknowledgements for producers and consumers in this session
/// </summary>
void Rollback();
}
}

View File

@ -22,10 +22,11 @@ using OpenWire.Client.Core;
namespace OpenWire.Client
{
public enum AckType {
public enum AckType
{
DeliveredAck = 0, // Message delivered but not consumed
ConsumedAck = 1, // Message consumed, discard
PoisonAck = 2 // Message could not be processed due to poison pill but discard anyway
PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway
ConsumedAck = 2 // Message consumed, discard
}
@ -40,9 +41,12 @@ namespace OpenWire.Client
private AcknowledgementMode acknowledgementMode;
private bool closed;
private Dispatcher dispatcher = new Dispatcher();
private int maximumRedeliveryCount = 10;
private int redeliveryTimeout = 500;
public event MessageListener Listener;
public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
{
this.session = session;
@ -50,12 +54,29 @@ namespace OpenWire.Client
this.acknowledgementMode = acknowledgementMode;
}
public ConsumerId ConsumerId {
public ConsumerId ConsumerId
{
get {
return info.ConsumerId;
}
}
public int MaximumRedeliveryCount
{
get { return maximumRedeliveryCount; }
set { maximumRedeliveryCount = value; }
}
public int RedeliveryTimeout
{
get { return redeliveryTimeout; }
set { redeliveryTimeout = value; }
}
public void RedeliverRolledBackMessages()
{
dispatcher.RedeliverRolledBackMessages();
}
/// <summary>
/// Method Dispatch
@ -65,7 +86,8 @@ namespace OpenWire.Client
{
dispatcher.Enqueue(message);
if (Listener != null) {
if (Listener != null)
{
// lets dispatch to the thread pool for this connection for messages to be processed
ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
}
@ -77,7 +99,7 @@ namespace OpenWire.Client
return AutoAcknowledge(dispatcher.Dequeue());
}
public IMessage Receive(long timeout)
public IMessage Receive(int timeout)
{
CheckClosed();
return AutoAcknowledge(dispatcher.Dequeue(timeout));
@ -102,12 +124,15 @@ namespace OpenWire.Client
/// </summary>
public void DispatchAsyncMessages()
{
while (Listener != null) {
while (Listener != null)
{
IMessage message = dispatcher.DequeueNoWait();
if (message != null) {
if (message != null)
{
Listener(message);
}
else {
else
{
break;
}
}
@ -163,10 +188,70 @@ namespace OpenWire.Client
ack.FirstMessageId = message.MessageId;
ack.LastMessageId = message.MessageId;
ack.MessageCount = 1;
ack.TransactionId = message.TransactionId;
if (session.Transacted)
{
session.DoStartTransaction();
ack.TransactionId = session.TransactionContext.TransactionId;
session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this, message));
}
return ack;
}
public void AfterRollback(ActiveMQMessage message)
{
// lets redeliver the message again
message.RedeliveryCounter += 1;
if (message.RedeliveryCounter > MaximumRedeliveryCount)
{
// lets send back a poisoned pill
MessageAck ack = new MessageAck();
ack.AckType = (int) AckType.PoisonAck;
ack.ConsumerId = info.ConsumerId;
ack.Destination = message.Destination;
ack.FirstMessageId = message.MessageId;
ack.LastMessageId = message.MessageId;
ack.MessageCount = 1;
session.Connection.OneWay(ack);
}
else
{
dispatcher.Redeliver(message);
if (Listener != null)
{
// lets re-dispatch the message at some point in the future
Thread.Sleep(RedeliveryTimeout);
ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
}
}
}
}
// TODO maybe there's a cleaner way of creating stateful delegates to make this code neater
class MessageConsumerSynchronization : ISynchronization
{
private MessageConsumer consumer;
private Message message;
public MessageConsumerSynchronization(MessageConsumer consumer, Message message)
{
this.message = message;
this.consumer = consumer;
}
public void BeforeCommit()
{
}
public void AfterCommit()
{
}
public void AfterRollback()
{
consumer.AfterRollback((ActiveMQMessage) message);
}
}
}

View File

@ -59,12 +59,18 @@ namespace OpenWire.Client
activeMessage.ProducerId = info.ProducerId;
activeMessage.Destination = ActiveMQDestination.Transform(destination);
if (session.Transacted)
{
session.DoStartTransaction();
activeMessage.TransactionId = session.TransactionContext.TransactionId;
}
session.DoSend(destination, message);
}
public void Dispose()
{
session.DisposeOf(info.ProducerId);
session.Connection.DisposeOf(info.ProducerId);
}
public bool Persistent

View File

@ -33,17 +33,20 @@ namespace OpenWire.Client
private long producerCounter;
private int prefetchSize = 1000;
private IDictionary consumers = new Hashtable();
private TransactionContext transactionContext;
public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
{
this.connection = connection;
this.info = info;
this.acknowledgementMode = acknowledgementMode;
transactionContext = new TransactionContext(this);
}
public void Dispose()
{
DisposeOf(info.SessionId);
connection.DisposeOf(info.SessionId);
}
public IMessageProducer CreateProducer()
@ -174,13 +177,52 @@ namespace OpenWire.Client
return answer;
}
public void Commit()
{
if (! Transacted)
{
throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
}
transactionContext.Commit();
}
public void Rollback()
{
if (! Transacted)
{
throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
}
transactionContext.Rollback();
// lets ensure all the consumers redeliver any rolled back messages
foreach (MessageConsumer consumer in consumers.Values)
{
consumer.RedeliverRolledBackMessages();
}
}
// Properties
public Connection Connection
{
get {
return connection;
get { return connection; }
}
public SessionId SessionId
{
get { return info.SessionId; }
}
public bool Transacted
{
get { return acknowledgementMode == AcknowledgementMode.Transactional; }
}
public TransactionContext TransactionContext
{
get { return transactionContext; }
}
// Implementation methods
@ -191,21 +233,22 @@ namespace OpenWire.Client
connection.SyncRequest(command);
}
public void DisposeOf(DataStructure objectId)
/// <summary>
/// Ensures that a transaction is started
/// </summary>
public void DoStartTransaction()
{
// TODO dispose of all the session first?
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
if (Transacted)
{
transactionContext.Begin();
}
}
public void DisposeOf(ConsumerId objectId)
{
consumers.Remove(objectId);
connection.RemoveConsumer(objectId);
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
connection.DisposeOf(objectId);
}
public void DispatchAsyncMessages(object state)

View File

@ -53,7 +53,7 @@ namespace OpenWire.Client
connection.ClientId="test";
connection.Start();
ISession session = connection.CreateSession(false, AcknowledgementMode.AutoAcknowledge);
ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
destination = session.GetTopic("foo");
IMessageProducer producer = session.CreateProducer(destination);
producer.Persistent = true;

View File

@ -73,13 +73,19 @@ namespace OpenWire.Client
protected virtual IDestination CreateDestination(ISession session)
{
string name = "Test.DotNet." + GetType().Name;
string name = CreateDestinationName();
IDestination destination = session.GetQueue(name);
Console.WriteLine("Using queue: " + destination);
return destination;
}
protected virtual string CreateDestinationName()
{
string name = "Test.DotNet." + GetType().Name;
return name;
}
protected virtual IMessage CreateMessage(ISession session)
{
return session.CreateMessage();

View File

@ -0,0 +1,290 @@
/*
* Copyright 2006 The Apache Software Foundation or its licensors, as
* applicable.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using NUnit.Framework;
using OpenWire.Client;
using OpenWire.Client.Core;
using System.Collections;
namespace OpenWire.Client
{
[TestFixture]
public class TransactionTest : TestSupport
{
private static int destinationCounter;
IDestination destination;
IConnection connection;
ISession session;
IMessageProducer producer;
IMessageConsumer consumer;
[SetUp]
public void SetUp()
{
Connect();
// lets consume any outstanding messages from previous test runs
while (consumer.Receive(1000) != null)
{
}
session.Commit();
}
[TearDown]
public void TearDown()
{
Disconnect();
}
[Test]
public void TestSendRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sends a message
producer.Send(outbound[0]);
session.Commit();
//sends a message that gets rollbacked
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
session.Rollback();
//sends a message
producer.Send(outbound[1]);
session.Commit();
//receives the first message
ArrayList messages = new ArrayList();
Console.WriteLine("About to consume message 1");
IMessage message = consumer.Receive(1000);
messages.Add(message);
Console.WriteLine("Received: " + message);
//receives the second message
Console.WriteLine("About to consume message 2");
message = consumer.Receive(4000);
messages.Add(message);
Console.WriteLine("Received: " + message);
//validates that the rollbacked was not consumed
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
}
[Test]
public void TestSendSessionClose()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sends a message
producer.Send(outbound[0]);
session.Commit();
//sends a message that gets rollbacked
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
consumer.Dispose();
session.Dispose();
Reconnect();
//sends a message
producer.Send(outbound[1]);
session.Commit();
//receives the first message
ArrayList messages = new ArrayList();
Console.WriteLine("About to consume message 1");
IMessage message = consumer.Receive(1000);
messages.Add(message);
Console.WriteLine("Received: " + message);
//receives the second message
Console.WriteLine("About to consume message 2");
message = consumer.Receive(4000);
messages.Add(message);
Console.WriteLine("Received: " + message);
//validates that the rollbacked was not consumed
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
}
[Test]
public void TestReceiveRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
//sent both messages
producer.Send(outbound[0]);
producer.Send(outbound[1]);
session.Commit();
Console.WriteLine("Sent 0: " + outbound[0]);
Console.WriteLine("Sent 1: " + outbound[1]);
ArrayList messages = new ArrayList();
IMessage message = consumer.Receive(1000);
messages.Add(message);
Assert.AreEqual(outbound[0], message);
session.Commit();
// rollback so we can get that last message again.
message = consumer.Receive(1000);
Assert.IsNotNull(message);
Assert.AreEqual(outbound[1], message);
session.Rollback();
// Consume again.. the previous message should
// get redelivered.
message = consumer.Receive(5000);
Assert.IsNotNull(message, "Should have re-received the message again!");
messages.Add(message);
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work", outbound, inbound);
}
[Test]
public void TestReceiveTwoThenRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
};
producer.Send(outbound[0]);
producer.Send(outbound[1]);
session.Commit();
Console.WriteLine("Sent 0: " + outbound[0]);
Console.WriteLine("Sent 1: " + outbound[1]);
ArrayList messages = new ArrayList();
IMessage message = consumer.Receive(1000);
AssertTextMessageEqual("first mesage received before rollback", outbound[0], message);
message = consumer.Receive(1000);
Assert.IsNotNull(message);
AssertTextMessageEqual("second message received before rollback", outbound[1], message);
session.Rollback();
// Consume again.. the previous message should
// get redelivered.
message = consumer.Receive(5000);
Assert.IsNotNull(message, "Should have re-received the first message again!");
messages.Add(message);
AssertTextMessageEqual("first message received after rollback", outbound[0], message);
message = consumer.Receive(5000);
Assert.IsNotNull(message, "Should have re-received the second message again!");
messages.Add(message);
AssertTextMessageEqual("second message received after rollback", outbound[1], message);
Assert.IsNull(consumer.ReceiveNoWait());
session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work", outbound, inbound);
}
protected override string CreateDestinationName()
{
// TODO - how can we get the test name?
return base.CreateDestinationName() + (++destinationCounter);
}
protected void AssertTextMessagesEqual(String message, IMessage[] expected, IMessage[] actual)
{
Assert.AreEqual(expected.Length, actual.Length, "Incorrect number of messages received");
for (int i = 0; i < expected.Length; i++)
{
AssertTextMessageEqual(message + ". Index: " + i, expected[i], actual[i]);
}
}
protected void AssertTextMessageEqual(String message, IMessage expected, IMessage actual)
{
Assert.IsTrue(expected is ITextMessage, "expected object not a text message");
Assert.IsTrue(actual is ITextMessage, "actual object not a text message");
String expectedText = ((ITextMessage) expected).Text;
String actualText = ((ITextMessage) actual).Text;
Assert.AreEqual(expectedText, actualText, message);
}
protected void Connect()
{
IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
connection = factory.CreateConnection();
session = connection.CreateSession(AcknowledgementMode.Transactional);
// reuse the same destination if we reconnect
if (destination == null)
{
destination = CreateDestination(session);
}
consumer = session.CreateConsumer(destination);
producer = session.CreateProducer(destination);
}
protected void Disconnect()
{
if (connection != null)
{
connection.Dispose();
connection = null;
}
}
protected void Reconnect()
{
Disconnect();
Connect();
}
}
}