From 611e335cfc9effd701578ac10bcf566f5cddd947 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Tue, 10 Jan 2006 13:56:34 +0000 Subject: [PATCH] added first cut of a near complete OpenWire.Net API with a shell of an implementation git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@367605 13f79535-47bb-0310-9956-ffa450edef68 --- .../{Core => Commands}/ActiveMQDestination.cs | 4 +- .../Commands/ActiveMQMessage.cs | 44 ++++++----- .../{Core => Commands}/ActiveMQQueue.cs | 2 +- .../{Core => Commands}/ActiveMQTempQueue.cs | 2 +- .../{Core => Commands}/ActiveMQTempTopic.cs | 2 +- .../Commands/ActiveMQTextMessage.cs | 38 +++++---- .../{Core => Commands}/ActiveMQTopic.cs | 2 +- .../src/OpenWire.Client/Connection.cs | 50 +++++++++++- .../ConnectionClosedException.cs | 14 ++++ .../ConsumerClosedException.cs | 14 ++++ .../src/OpenWire.Client/IMessage.cs | 14 ++++ .../src/OpenWire.Client/IMessageConsumer.cs | 21 +++++ .../src/OpenWire.Client/IMessageProducer.cs | 20 +++++ .../src/OpenWire.Client/ISession.cs | 35 ++++++++- .../src/OpenWire.Client/ITextMessage.cs | 15 ++++ .../src/OpenWire.Client/MessageConsumer.cs | 42 ++++++++++ .../src/OpenWire.Client/MessageProducer.cs | 30 ++++++++ .../src/OpenWire.Client/OpenWireException.cs | 14 ++++ .../src/OpenWire.Client/Session.cs | 77 +++++++++++++++++-- 19 files changed, 390 insertions(+), 50 deletions(-) rename openwire-dotnet/src/OpenWire.Client/{Core => Commands}/ActiveMQDestination.cs (99%) rename openwire-dotnet/src/OpenWire.Client/{Core => Commands}/ActiveMQQueue.cs (95%) rename openwire-dotnet/src/OpenWire.Client/{Core => Commands}/ActiveMQTempQueue.cs (96%) rename openwire-dotnet/src/OpenWire.Client/{Core => Commands}/ActiveMQTempTopic.cs (96%) rename openwire-dotnet/src/OpenWire.Client/{Core => Commands}/ActiveMQTopic.cs (95%) create mode 100755 openwire-dotnet/src/OpenWire.Client/ConnectionClosedException.cs create mode 100755 openwire-dotnet/src/OpenWire.Client/ConsumerClosedException.cs create mode 100755 openwire-dotnet/src/OpenWire.Client/IMessage.cs create mode 100755 openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs create mode 100755 openwire-dotnet/src/OpenWire.Client/IMessageProducer.cs create mode 100755 openwire-dotnet/src/OpenWire.Client/ITextMessage.cs create mode 100755 openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs create mode 100755 openwire-dotnet/src/OpenWire.Client/MessageProducer.cs create mode 100755 openwire-dotnet/src/OpenWire.Client/OpenWireException.cs diff --git a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQDestination.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQDestination.cs similarity index 99% rename from openwire-dotnet/src/OpenWire.Client/Core/ActiveMQDestination.cs rename to openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQDestination.cs index a1901bbdee..1d03e94089 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQDestination.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQDestination.cs @@ -2,7 +2,7 @@ using System; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Core { +namespace OpenWire.Client.Commands { /// /// Summary description for ActiveMQDestination. @@ -177,7 +177,7 @@ namespace OpenWire.Client.Core { * @return @throws JMSException * @throws javax.jms.JMSException */ - public static ActiveMQDestination transformDestination(IDestination destination) { + public static ActiveMQDestination Transform(IDestination destination) { ActiveMQDestination result = null; if (destination != null) { if (destination is ActiveMQDestination) { diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs index d5296bc1b7..c00d6ece3e 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs @@ -13,26 +13,32 @@ using System.Collections; using OpenWire.Client; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands -{ - public class ActiveMQMessage : Message - { - public const byte ID_ActiveMQMessage = 23; - +namespace OpenWire.Client.Commands { + public class ActiveMQMessage : Message, IMessage { + public const byte ID_ActiveMQMessage = 23; + + public static ActiveMQMessage Transform(IMessage message) { + return (ActiveMQMessage) message; + } + + // TODO generate Equals method + // TODO generate GetHashCode method + // TODO generate ToString method + + public override byte GetCommandType() { + return ID_ActiveMQMessage; + } + // Properties + public IDestination FromDestination { + get { + return Destination; + } + set { + this.Destination = ActiveMQDestination.Transform(value); + } + } - // TODO generate Equals method - // TODO generate GetHashCode method - // TODO generate ToString method - - - public override byte GetCommandType() { - return ID_ActiveMQMessage; - } - - - // Properties - - } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQQueue.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs similarity index 95% rename from openwire-dotnet/src/OpenWire.Client/Core/ActiveMQQueue.cs rename to openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs index 12e391e689..c367491c46 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQQueue.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQQueue.cs @@ -3,7 +3,7 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Core { +namespace OpenWire.Client.Commands { /// /// Summary description for ActiveMQQueue. /// diff --git a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTempQueue.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs similarity index 96% rename from openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTempQueue.cs rename to openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs index b3e0694074..da25a2c895 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTempQueue.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempQueue.cs @@ -3,7 +3,7 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Core { +namespace OpenWire.Client.Commands { /// /// Summary description for ActiveMQTempQueue. /// diff --git a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTempTopic.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs similarity index 96% rename from openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTempTopic.cs rename to openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs index a6ea40a255..d330cd30da 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTempTopic.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTempTopic.cs @@ -3,7 +3,7 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Core { +namespace OpenWire.Client.Commands { /// /// Summary description for ActiveMQTempTopic. /// diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTextMessage.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTextMessage.cs index df71ce1402..be1d8fc857 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTextMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTextMessage.cs @@ -13,26 +13,34 @@ using System.Collections; using OpenWire.Client; using OpenWire.Client.Core; -namespace OpenWire.Client.Commands -{ - public class ActiveMQTextMessage : ActiveMQMessage - { - public const byte ID_ActiveMQTextMessage = 28; - +namespace OpenWire.Client.Commands { + public class ActiveMQTextMessage : ActiveMQMessage, ITextMessage { + public const byte ID_ActiveMQTextMessage = 28; - - // TODO generate Equals method - // TODO generate GetHashCode method - // TODO generate ToString method + private String text; - public override byte GetCommandType() { - return ID_ActiveMQTextMessage; - } + // TODO generate Equals method + // TODO generate GetHashCode method + // TODO generate ToString method - // Properties + public override byte GetCommandType() { + return ID_ActiveMQTextMessage; + } - } + + // Properties + + public string Text { + get { + if (text == null) { + // TODO parse from the content + } + return text; + } + set { this.text = value; } + } + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTopic.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs similarity index 95% rename from openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTopic.cs rename to openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs index 063fe2c0da..46dcddd55e 100755 --- a/openwire-dotnet/src/OpenWire.Client/Core/ActiveMQTopic.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQTopic.cs @@ -3,7 +3,7 @@ using OpenWire.Client; using OpenWire.Client.Commands; using OpenWire.Client.Core; -namespace OpenWire.Client.Core { +namespace OpenWire.Client.Commands { /// /// Summary description for ActiveMQTopic. /// diff --git a/openwire-dotnet/src/OpenWire.Client/Connection.cs b/openwire-dotnet/src/OpenWire.Client/Connection.cs index 486f021ba9..0eec268ca6 100755 --- a/openwire-dotnet/src/OpenWire.Client/Connection.cs +++ b/openwire-dotnet/src/OpenWire.Client/Connection.cs @@ -9,10 +9,18 @@ namespace OpenWire.Client { /// public class Connection : IConnection { + private ConnectionInfo info; private Transport transport; IList sessions = new ArrayList(); private bool transacted; + private bool closed; private AcknowledgementMode acknowledgementMode; + private long sessionCounter; + + public Connection(Transport transport, ConnectionInfo info) { + this.transport = transport; + this.info = info; + } /// /// Creates a new session to work on this connection @@ -25,15 +33,19 @@ namespace OpenWire.Client { /// Creates a new session to work on this connection /// public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode) { - Session session = new Session(this, acknowledgementMode); + CheckClosed(); + SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode); + Session session = new Session(this, info); sessions.Add(session); return session; } public void Dispose() { foreach (Session session in sessions) { - session.Dispose(); + session.Dispose(); } + sessions.Clear(); + closed = true; } // Properties @@ -51,6 +63,40 @@ namespace OpenWire.Client { public AcknowledgementMode AcknowledgementMode { get { return acknowledgementMode; } set { this.acknowledgementMode = value; } + } + + // Implementation methods + + /// + /// Performs a synchronous request-response with the broker + /// + public Response SyncRequest(Command command) { + CheckClosed(); + Response response = Transport.Request(command); + if (response is ExceptionResponse) { + ExceptionResponse exceptionResponse = (ExceptionResponse) response; + // TODO include stack trace + throw new OpenWireException("Request failed: " + exceptionResponse); + } + return response; + } + + + protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode) { + SessionInfo answer = new SessionInfo(); + SessionId sessionId = new SessionId(); + sessionId.ConnectionId = info.ConnectionId.Value; + lock (this) { + sessionId.Value = ++sessionCounter; + } + answer.SessionId = sessionId; + return answer; + } + + protected void CheckClosed() { + if (closed) { + throw new ConnectionClosedException(); + } } } } diff --git a/openwire-dotnet/src/OpenWire.Client/ConnectionClosedException.cs b/openwire-dotnet/src/OpenWire.Client/ConnectionClosedException.cs new file mode 100755 index 0000000000..11c97beed2 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/ConnectionClosedException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections; +using OpenWire.Client.Commands; +using OpenWire.Client.Core; + +namespace OpenWire.Client { + /// + /// Exception thrown when a connection is used that it already closed + /// + public class ConnectionClosedException : OpenWireException { + public ConnectionClosedException() : base("The connection is already closed!") { + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/ConsumerClosedException.cs b/openwire-dotnet/src/OpenWire.Client/ConsumerClosedException.cs new file mode 100755 index 0000000000..50a429c4d2 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/ConsumerClosedException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections; +using OpenWire.Client.Commands; +using OpenWire.Client.Core; + +namespace OpenWire.Client { + /// + /// Exception thrown when a consumer is used that it already closed + /// + public class ConsumerClosedException : OpenWireException { + public ConsumerClosedException() : base("The consumer is already closed!") { + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/IMessage.cs b/openwire-dotnet/src/OpenWire.Client/IMessage.cs new file mode 100755 index 0000000000..5b2cb75bd5 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/IMessage.cs @@ -0,0 +1,14 @@ +using System; +using OpenWire.Client.Commands; + +namespace OpenWire.Client { + /// + /// Represents a message either to be sent to a message broker or received from a message broker + /// + public interface IMessage { + + IDestination FromDestination { + get; + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs new file mode 100755 index 0000000000..86870fc60b --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs @@ -0,0 +1,21 @@ +using System; +using OpenWire.Client.Commands; + +namespace OpenWire.Client { + /// + /// A consumer of messages + /// + public interface IMessageConsumer : IDisposable { + + /// + /// Waits until a message is available and returns it + /// + IMessage Receive(); + + /// + /// If a message is available immediately it is returned otherwise this method returns null + /// + IMessage ReceiveNoWait(); + + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/IMessageProducer.cs b/openwire-dotnet/src/OpenWire.Client/IMessageProducer.cs new file mode 100755 index 0000000000..8ca6b6a019 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/IMessageProducer.cs @@ -0,0 +1,20 @@ +using System; +using OpenWire.Client.Commands; + +namespace OpenWire.Client { + /// + /// An object capable of sending messages to some destination + /// + public interface IMessageProducer : IDisposable { + + /// + /// Sends the message to the default destination for this producer + /// + void Send(IMessage message); + + /// + /// Sends the message to the given destination + /// + void Send(IDestination destination, IMessage message); + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/ISession.cs b/openwire-dotnet/src/OpenWire.Client/ISession.cs index ff81479404..92bb54e2df 100755 --- a/openwire-dotnet/src/OpenWire.Client/ISession.cs +++ b/openwire-dotnet/src/OpenWire.Client/ISession.cs @@ -6,6 +6,39 @@ namespace OpenWire.Client { /// Represents a single unit of work on an IConnection. /// So the ISession can be used to perform transactional receive and sends /// - public interface ISession { + public interface ISession : IDisposable { + + + /// + /// Creates a producer of messages + /// + IMessageProducer CreateProducer(); + + /// + /// Creates a producer of messages on a given destination + /// + IMessageProducer CreateProducer(IDestination destination); + + /// + /// Creates a cpmsi,er of messages on a given destination + /// + IMessageConsumer CreateConsumer(IDestination destination); + + /// + /// Creates a cpmsi,er of messages on a given destination with a selector + /// + IMessageConsumer CreateConsumer(IDestination destination, string selector); + + /// + /// Returns the queue for the given name + /// + IQueue GetQueue(string name); + + /// + /// Returns the topic for the given name + /// + ITopic GetTopic(string name); + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/ITextMessage.cs b/openwire-dotnet/src/OpenWire.Client/ITextMessage.cs new file mode 100755 index 0000000000..778808f571 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/ITextMessage.cs @@ -0,0 +1,15 @@ +using System; +using OpenWire.Client.Commands; + +namespace OpenWire.Client { + /// + /// Represents a text based message + /// + public interface ITextMessage : IMessage { + + string Text { + get; + set; + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs new file mode 100755 index 0000000000..3ec1f4aa6b --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs @@ -0,0 +1,42 @@ +using System; +using OpenWire.Client.Commands; + +namespace OpenWire.Client { + /// + /// An object capable of receiving messages from some destination + /// + public class MessageConsumer : IMessageConsumer { + + private Session session; + private ConsumerInfo info; + private bool closed; + + public MessageConsumer(Session session, ConsumerInfo info) { + this.session = session; + this.info = info; + } + + public IMessage Receive() { + CheckClosed(); + // TODO + return null; + } + + public IMessage ReceiveNoWait() { + CheckClosed(); + // TODO + return null; + } + + public void Dispose() { + session.DisposeOf(info.ConsumerId); + closed = true; + } + + protected void CheckClosed() { + if (closed) { + throw new ConnectionClosedException(); + } + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs new file mode 100755 index 0000000000..03a029d23f --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs @@ -0,0 +1,30 @@ +using System; +using OpenWire.Client.Commands; + +namespace OpenWire.Client { + /// + /// An object capable of sending messages to some destination + /// + public class MessageProducer : IMessageProducer { + + private Session session; + private ProducerInfo info; + + public MessageProducer(Session session, ProducerInfo info) { + this.session = session; + this.info = info; + } + + public void Send(IMessage message) { + Send(info.Destination, message); + } + + public void Send(IDestination destination, IMessage message) { + session.DoSend(destination, message); + } + + public void Dispose() { + session.DisposeOf(info.ProducerId); + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/OpenWireException.cs b/openwire-dotnet/src/OpenWire.Client/OpenWireException.cs new file mode 100755 index 0000000000..d48d7a52a0 --- /dev/null +++ b/openwire-dotnet/src/OpenWire.Client/OpenWireException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections; +using OpenWire.Client.Commands; +using OpenWire.Client.Core; + +namespace OpenWire.Client { + /// + /// Represents an OpenWire exception + /// + public class OpenWireException : Exception { + public OpenWireException(string message) : base(message) { + } + } +} diff --git a/openwire-dotnet/src/OpenWire.Client/Session.cs b/openwire-dotnet/src/OpenWire.Client/Session.cs index f6131b8651..1fabdf33c1 100755 --- a/openwire-dotnet/src/OpenWire.Client/Session.cs +++ b/openwire-dotnet/src/OpenWire.Client/Session.cs @@ -6,23 +6,86 @@ namespace OpenWire.Client { /// /// Default provider of ISession /// - public class Session : ISession, IDisposable { + public class Session : ISession { private Connection connection; private AcknowledgementMode acknowledgementMode; + private SessionInfo info; + private long consumerCounter; - public Session(Connection connection, AcknowledgementMode acknowledgementMode) { + public Session(Connection connection, SessionInfo info) { this.connection = connection; - this.acknowledgementMode = acknowledgementMode; + this.info = info; } public void Dispose() { - } - + DisposeOf(info.SessionId); + } + + public IMessageProducer CreateProducer() { + return CreateProducer(null); + } + + public IMessageProducer CreateProducer(IDestination destination) { + ProducerInfo command = CreateProducerInfo(destination); + connection.SyncRequest(command); + return new MessageProducer(this, command); + } + public void Acknowledge(Message message) { if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) { MessageAck ack = new MessageAck(); - connection.Transport.Request(ack); - } + // TODO complete packet + connection.SyncRequest(ack); + } } + + public IMessageConsumer CreateConsumer(IDestination destination) { + return CreateConsumer(destination, null); + } + + public IMessageConsumer CreateConsumer(IDestination destination, string selector) { + ConsumerInfo command = CreateConsumerInfo(destination, selector); + connection.SyncRequest(command); + return new MessageConsumer(this, command); + } + + public IQueue GetQueue(string name) { + return new ActiveMQQueue(name); + } + + public ITopic GetTopic(string name) { + return new ActiveMQTopic(name); + } + + // Implementation methods + public void DoSend(IDestination destination, IMessage message) { + ActiveMQMessage command = ActiveMQMessage.Transform(message); + // TODO complete packet + connection.SyncRequest(command); + } + + public void DisposeOf(DataStructure objectId) { + RemoveInfo command = new RemoveInfo(); + command.ObjectId = objectId; + connection.SyncRequest(command); + } + + protected ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) { + ConsumerInfo answer = new ConsumerInfo(); + ConsumerId consumerId = new ConsumerId(); + consumerId.SessionId = info.SessionId.Value; + lock (this) { + consumerId.Value = ++consumerCounter; + } + // TODO complete packet + answer.ConsumerId = consumerId; + return answer; + } + + protected ProducerInfo CreateProducerInfo(IDestination destination) { + ProducerInfo info = new ProducerInfo(); + // TODO complete packet + return info; + } } }