- Enhanced the NMS test support class so that it creates the connection lazily.

- You can now get a DestinationType from a destination in case you want to know what type of destination you are working with.



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@386712 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-17 20:58:24 +00:00
parent 9224eeb0a2
commit 8b3fb9b928
20 changed files with 977 additions and 833 deletions

View File

@ -517,6 +517,13 @@ namespace ActiveMQ.Commands
* @return the created Destination
*/
public abstract ActiveMQDestination CreateDestination(String name);
public abstract DestinationType DestinationType
{
get;
}
}
}

View File

@ -34,6 +34,13 @@ namespace ActiveMQ.Commands
{
}
override public DestinationType DestinationType
{
get {
return DestinationType.Queue;
}
}
public String QueueName
{
get { return PhysicalName; }

View File

@ -17,6 +17,7 @@
using ActiveMQ.Commands;
using System;
using NMS;
//
@ -31,6 +32,33 @@ namespace ActiveMQ.Commands
{
public abstract class ActiveMQTempDestination : ActiveMQDestination
{
/// <summary>
/// Method GetDestinationType
/// </summary>
/// <returns>An int</returns>
public override int GetDestinationType()
{
// TODO: Implement this method
return 0;
}
/// <summary>
/// Method CreateDestination
/// </summary>
/// <returns>An ActiveMQDestination</returns>
/// <param name="name">A String</param>
public override ActiveMQDestination CreateDestination(String name)
{
// TODO: Implement this method
return null;
}
abstract override public DestinationType DestinationType
{
get;
}
public const byte ID_ActiveMQTempDestination = 0;
public ActiveMQTempDestination() : base()

View File

@ -35,6 +35,13 @@ namespace ActiveMQ.Commands
{
}
override public DestinationType DestinationType
{
get {
return DestinationType.TemporaryQueue;
}
}
public String GetQueueName()
{
return PhysicalName;

View File

@ -16,6 +16,7 @@
*/
using NMS;
using System;
using System.CodeDom.Compiler;
namespace ActiveMQ.Commands
{
@ -35,6 +36,14 @@ namespace ActiveMQ.Commands
{
}
override public DestinationType DestinationType
{
get {
return DestinationType.TemporaryTopic;
}
}
public String GetTopicName()
{
return PhysicalName;

View File

@ -34,6 +34,14 @@ namespace ActiveMQ.Commands
{
}
override public DestinationType DestinationType
{
get {
return DestinationType.Topic;
}
}
public String TopicName
{
get { return PhysicalName; }

View File

@ -16,12 +16,24 @@
*/
namespace NMS
{
public enum DestinationType
{
Queue,
Topic,
TemporaryQueue,
TemporaryTopic
}
/// <summary>
/// Summary description for Destination.
/// </summary>
public interface IDestination
{
DestinationType DestinationType {
get;
}
}
}

View File

@ -47,7 +47,7 @@ namespace NMS
// lets create an async consumer
// START SNIPPET: demo
IMessageConsumer consumer = session.CreateConsumer(this.Destination);
IMessageConsumer consumer = Session.CreateConsumer(this.Destination);
consumer.Listener += new MessageListener(OnMessage);
// END SNIPPET: demo

View File

@ -42,7 +42,7 @@ namespace NMS
{
try
{
IMessageConsumer consumer = session.CreateConsumer(null);
IMessageConsumer consumer = Session.CreateConsumer(null);
Console.WriteLine("Created consumer: " + consumer);
Assert.Fail("Should have thrown an exception!");
}

View File

@ -47,7 +47,7 @@ namespace NMS
protected override IMessage CreateMessage()
{
IBytesMessage request = session.CreateBytesMessage(expected);
IBytesMessage request = Session.CreateBytesMessage(expected);
return request;
}

View File

@ -23,9 +23,15 @@ namespace NMS
[TestFixture]
public class ConsumerTest : JMSTestSupport
{
public bool persistent;
public int prefetch;
public bool durableConsumer;
[SetUp]
override public void SetUp()
{
clientId = "test";
base.SetUp();
}
@ -35,30 +41,32 @@ namespace NMS
base.TearDown();
}
protected override IConnection CreateConnection()
{
IConnection connection = base.CreateConnection();
connection.ClientId = "test";
return connection;
}
protected override IDestination CreateDestination()
{
return session.GetTopic(CreateDestinationName());
}
//[Ignore("Not fully implemented yet.")]
[Test]
public void testDurableConsumerSelectorChange()
public void TestDurableConsumerSelectorChangePersistent()
{
this.destinationType = DestinationType.Topic;
this.persistent = true;
doTestDurableConsumerSelectorChange();
}
[Test]
public void TestDurableConsumerSelectorChangeNonPersistent()
{
this.destinationType = DestinationType.Topic;
this.persistent = true;
doTestDurableConsumerSelectorChange();
}
public void doTestDurableConsumerSelectorChange()
{
IMessageProducer producer = session.CreateProducer(Destination);
producer.Persistent = true;
IMessageConsumer consumer = session.CreateDurableConsumer((ITopic)Destination, "test", "color='red'", false);
IMessageProducer producer = Session.CreateProducer(Destination);
producer.Persistent = persistent;
IMessageConsumer consumer = Session.CreateDurableConsumer((ITopic)Destination, "test", "color='red'", false);
// Send the messages
ITextMessage message = session.CreateTextMessage("1st");
ITextMessage message = Session.CreateTextMessage("1st");
message.Properties["color"] = "red";
producer.Send(message);
@ -68,12 +76,12 @@ namespace NMS
// Change the subscription.
consumer.Dispose();
consumer = session.CreateDurableConsumer((ITopic)Destination, "test", "color='blue'", false);
consumer = Session.CreateDurableConsumer((ITopic)Destination, "test", "color='blue'", false);
message = session.CreateTextMessage("2nd");
message = Session.CreateTextMessage("2nd");
message.Properties["color"] = "red";
producer.Send(message);
message = session.CreateTextMessage("3rd");
message = Session.CreateTextMessage("3rd");
message.Properties["color"] = "blue";
producer.Send(message);

View File

@ -65,8 +65,8 @@ namespace NMS
protected override IMessage CreateMessage()
{
ITextMessage message = session.CreateTextMessage(expectedText);
replyTo = session.CreateTemporaryQueue();
ITextMessage message = Session.CreateTextMessage(expectedText);
replyTo = Session.CreateTemporaryQueue();
// lets set the headers
message.NMSCorrelationID = correlationID;

View File

@ -28,17 +28,19 @@ namespace NMS
public abstract class JMSTestSupport
{
protected IConnectionFactory factory;
protected IConnection connection;
protected ISession session;
private IConnectionFactory factory;
private IConnection connection;
private ISession session;
private IDestination destination;
protected int receiveTimeout = 1000;
protected string clientId;
protected DestinationType destinationType = DestinationType.Queue;
protected AcknowledgementMode acknowledgementMode = AcknowledgementMode.ClientAcknowledge;
[SetUp]
virtual public void SetUp()
{
Connect();
}
[TearDown]
@ -47,21 +49,58 @@ namespace NMS
Disconnect();
}
// Properties
public bool Connected
{
get { return connection!=null; }
set { if( value ) Connect(); else Disconnect(); }
}
public IConnectionFactory Factory
{
get {
if( factory == null ) {
factory = CreateConnectionFactory();
Assert.IsNotNull(factory, "no factory created");
}
return factory;
}
set { this.factory = value; }
}
public IConnection Connection
{
get {
if( connection == null ) {
Connect();
}
return connection;
}
set { this.connection = value; }
}
public ISession Session
{
get {
if( session == null ) {
session = Connection.CreateSession(acknowledgementMode);
Assert.IsNotNull(connection != null, "no session created");
}
return session;
}
set { this.session = value; }
}
virtual protected void Connect()
{
Console.WriteLine("Connectting...");
factory = CreateConnectionFactory();
Assert.IsNotNull(factory, "no factory created");
connection = CreateConnection();
Assert.IsNotNull(connection, "no connection created");
connection.Start();
session = connection.CreateSession(acknowledgementMode);
Assert.IsNotNull(connection != null, "no session created");
Console.WriteLine("Connected.");
Assert.IsNotNull(connection, "no connection created");
}
virtual protected void Disconnect()
{
if (connection != null)
@ -69,6 +108,7 @@ namespace NMS
Console.WriteLine("Disconnecting...");
connection.Dispose();
connection = null;
session=null;
Console.WriteLine("Disconnected.");
}
}
@ -81,7 +121,7 @@ namespace NMS
protected virtual void Drain()
{
using (ISession session = connection.CreateSession())
using (ISession session = Connection.CreateSession())
{
// Tries to consume any messages on the Destination
IMessageConsumer consumer = session.CreateConsumer(Destination);
@ -98,7 +138,7 @@ namespace NMS
public virtual void SendAndSyncReceive()
{
using (ISession session = connection.CreateSession())
using (ISession session = Connection.CreateSession())
{
IMessageConsumer consumer = session.CreateConsumer(Destination);
@ -120,24 +160,38 @@ namespace NMS
protected virtual IConnection CreateConnection()
{
return factory.CreateConnection();
IConnection connection = Factory.CreateConnection();
if( clientId!=null ) {
connection.ClientId = clientId;
}
return connection;
}
protected virtual IMessageProducer CreateProducer()
{
IMessageProducer producer = session.CreateProducer(destination);
IMessageProducer producer = Session.CreateProducer(destination);
return producer;
}
protected virtual IMessageConsumer CreateConsumer()
{
IMessageConsumer consumer = session.CreateConsumer(destination);
IMessageConsumer consumer = Session.CreateConsumer(destination);
return consumer;
}
protected virtual IDestination CreateDestination()
{
return session.GetQueue(CreateDestinationName());
if( destinationType == DestinationType.Queue ) {
return Session.GetQueue(CreateDestinationName());
} else if( destinationType == DestinationType.Topic ) {
return Session.GetTopic(CreateDestinationName());
} else if( destinationType == DestinationType.TemporaryQueue ) {
return Session.CreateTemporaryQueue();
} else if( destinationType == DestinationType.TemporaryTopic ) {
return Session.CreateTemporaryTopic();
} else {
throw new Exception("Unknown destination type: "+destinationType);
}
}
protected virtual string CreateDestinationName()
@ -147,7 +201,7 @@ namespace NMS
protected virtual IMessage CreateMessage()
{
return session.CreateMessage();
return Session.CreateMessage();
}
protected virtual void AssertValidMessage(IMessage message)

View File

@ -58,7 +58,7 @@ namespace NMS
protected override IMessage CreateMessage()
{
IMapMessage message = session.CreateMapMessage();
IMapMessage message = Session.CreateMapMessage();
message.Body["a"] = a;
message.Body["b"] = b;

View File

@ -57,7 +57,7 @@ namespace NMS
protected override IMessage CreateMessage()
{
IMessage message = session.CreateMessage();
IMessage message = Session.CreateMessage();
message.Properties["a"] = a;
message.Properties["b"] = b;

View File

@ -47,7 +47,7 @@ namespace NMS
protected override IMessage CreateMessage()
{
IMessage request = session.CreateTextMessage(expected);
IMessage request = Session.CreateTextMessage(expected);
return request;
}

View File

@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using NMS;
using NUnit.Framework;
using System;
using System.Collections;
@ -33,9 +32,11 @@ namespace NMS
[SetUp]
override public void SetUp()
{
acknowledgementMode = AcknowledgementMode.Transactional;
base.SetUp();
acknowledgementMode = AcknowledgementMode.Transactional;
Drain();
consumer = Session.CreateConsumer(Destination);
producer = Session.CreateProducer(Destination);
}
[TearDown]
@ -49,21 +50,21 @@ namespace NMS
public void TestSendRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
Session.CreateTextMessage("First Message"),
Session.CreateTextMessage("Second Message")
};
//sends a message
producer.Send(outbound[0]);
session.Commit();
Session.Commit();
//sends a message that gets rollbacked
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
session.Rollback();
producer.Send(Session.CreateTextMessage("I'm going to get rolled back."));
Session.Rollback();
//sends a message
producer.Send(outbound[1]);
session.Commit();
Session.Commit();
//receives the first message
ArrayList messages = new ArrayList();
@ -79,7 +80,7 @@ namespace NMS
Console.WriteLine("Received: " + message);
//validates that the rollbacked was not consumed
session.Commit();
Session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
@ -89,24 +90,24 @@ namespace NMS
public void TestSendSessionClose()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
Session.CreateTextMessage("First Message"),
Session.CreateTextMessage("Second Message")
};
//sends a message
producer.Send(outbound[0]);
session.Commit();
Session.Commit();
//sends a message that gets rollbacked
producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
producer.Send(Session.CreateTextMessage("I'm going to get rolled back."));
consumer.Dispose();
session.Dispose();
Session.Dispose();
Reconnect();
//sends a message
producer.Send(outbound[1]);
session.Commit();
Session.Commit();
//receives the first message
ArrayList messages = new ArrayList();
@ -122,7 +123,7 @@ namespace NMS
Console.WriteLine("Received: " + message);
//validates that the rollbacked was not consumed
session.Commit();
Session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
@ -132,14 +133,14 @@ namespace NMS
public void TestReceiveRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
Session.CreateTextMessage("First Message"),
Session.CreateTextMessage("Second Message")
};
//sent both messages
producer.Send(outbound[0]);
producer.Send(outbound[1]);
session.Commit();
Session.Commit();
Console.WriteLine("Sent 0: " + outbound[0]);
Console.WriteLine("Sent 1: " + outbound[1]);
@ -148,20 +149,20 @@ namespace NMS
IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
messages.Add(message);
Assert.AreEqual(outbound[0], message);
session.Commit();
Session.Commit();
// rollback so we can get that last message again.
message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
Assert.IsNotNull(message);
Assert.AreEqual(outbound[1], message);
session.Rollback();
Session.Rollback();
// Consume again.. the previous message should
// get redelivered.
message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
Assert.IsNotNull(message, "Should have re-received the message again!");
messages.Add(message);
session.Commit();
Session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
@ -173,13 +174,13 @@ namespace NMS
public void TestReceiveTwoThenRollback()
{
IMessage[] outbound = new IMessage[]{
session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message")
Session.CreateTextMessage("First Message"),
Session.CreateTextMessage("Second Message")
};
producer.Send(outbound[0]);
producer.Send(outbound[1]);
session.Commit();
Session.Commit();
Console.WriteLine("Sent 0: " + outbound[0]);
Console.WriteLine("Sent 1: " + outbound[1]);
@ -191,7 +192,7 @@ namespace NMS
message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
Assert.IsNotNull(message);
AssertTextMessageEqual("second message received before rollback", outbound[1], message);
session.Rollback();
Session.Rollback();
// Consume again.. the previous message should
// get redelivered.
@ -206,7 +207,7 @@ namespace NMS
AssertTextMessageEqual("second message received after rollback", outbound[1], message);
Assert.IsNull(consumer.ReceiveNoWait());
session.Commit();
Session.Commit();
IMessage[] inbound = new IMessage[messages.Count];
messages.CopyTo(inbound);
@ -240,11 +241,14 @@ namespace NMS
Assert.AreEqual(expectedText, actualText, message);
}
protected override void Connect()
/// <summary>
/// Method Reconnect
/// </summary>
protected override void Reconnect()
{
base.Connect();
consumer = session.CreateConsumer(Destination);
producer = session.CreateProducer(Destination);
base.Reconnect();
consumer = Session.CreateConsumer(Destination);
producer = Session.CreateProducer(Destination);
}
}