added first cut of a near complete OpenWire.Net API with a shell of an implementation

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@367605 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-01-10 13:56:34 +00:00
parent fb53ad9cf3
commit 611e335cfc
19 changed files with 390 additions and 50 deletions

View File

@ -2,7 +2,7 @@ using System;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQDestination.
@ -177,7 +177,7 @@ namespace OpenWire.Client.Core {
* @return @throws JMSException
* @throws javax.jms.JMSException
*/
public static ActiveMQDestination transformDestination(IDestination destination) {
public static ActiveMQDestination Transform(IDestination destination) {
ActiveMQDestination result = null;
if (destination != null) {
if (destination is ActiveMQDestination) {

View File

@ -13,26 +13,32 @@ using System.Collections;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client.Commands
{
public class ActiveMQMessage : Message
{
public const byte ID_ActiveMQMessage = 23;
namespace OpenWire.Client.Commands {
public class ActiveMQMessage : Message, IMessage {
public const byte ID_ActiveMQMessage = 23;
public static ActiveMQMessage Transform(IMessage message) {
return (ActiveMQMessage) message;
}
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
public override byte GetCommandType() {
return ID_ActiveMQMessage;
}
// Properties
public IDestination FromDestination {
get {
return Destination;
}
set {
this.Destination = ActiveMQDestination.Transform(value);
}
}
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
public override byte GetCommandType() {
return ID_ActiveMQMessage;
}
// Properties
}
}
}

View File

@ -3,7 +3,7 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQQueue.
/// </summary>

View File

@ -3,7 +3,7 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQTempQueue.
/// </summary>

View File

@ -3,7 +3,7 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQTempTopic.
/// </summary>

View File

@ -13,26 +13,34 @@ using System.Collections;
using OpenWire.Client;
using OpenWire.Client.Core;
namespace OpenWire.Client.Commands
{
public class ActiveMQTextMessage : ActiveMQMessage
{
public const byte ID_ActiveMQTextMessage = 28;
namespace OpenWire.Client.Commands {
public class ActiveMQTextMessage : ActiveMQMessage, ITextMessage {
public const byte ID_ActiveMQTextMessage = 28;
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
private String text;
public override byte GetCommandType() {
return ID_ActiveMQTextMessage;
}
// TODO generate Equals method
// TODO generate GetHashCode method
// TODO generate ToString method
// Properties
public override byte GetCommandType() {
return ID_ActiveMQTextMessage;
}
}
// Properties
public string Text {
get {
if (text == null) {
// TODO parse from the content
}
return text;
}
set { this.text = value; }
}
}
}

View File

@ -3,7 +3,7 @@ using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
namespace OpenWire.Client.Commands {
/// <summary>
/// Summary description for ActiveMQTopic.
/// </summary>

View File

@ -9,10 +9,18 @@ namespace OpenWire.Client {
/// </summary>
public class Connection : IConnection {
private ConnectionInfo info;
private Transport transport;
IList sessions = new ArrayList();
private bool transacted;
private bool closed;
private AcknowledgementMode acknowledgementMode;
private long sessionCounter;
public Connection(Transport transport, ConnectionInfo info) {
this.transport = transport;
this.info = info;
}
/// <summary>
/// Creates a new session to work on this connection
@ -25,15 +33,19 @@ namespace OpenWire.Client {
/// Creates a new session to work on this connection
/// </summary>
public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode) {
Session session = new Session(this, acknowledgementMode);
CheckClosed();
SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode);
Session session = new Session(this, info);
sessions.Add(session);
return session;
}
public void Dispose() {
foreach (Session session in sessions) {
session.Dispose();
session.Dispose();
}
sessions.Clear();
closed = true;
}
// Properties
@ -51,6 +63,40 @@ namespace OpenWire.Client {
public AcknowledgementMode AcknowledgementMode {
get { return acknowledgementMode; }
set { this.acknowledgementMode = value; }
}
// Implementation methods
/// <summary>
/// Performs a synchronous request-response with the broker
/// </summary>
public Response SyncRequest(Command command) {
CheckClosed();
Response response = Transport.Request(command);
if (response is ExceptionResponse) {
ExceptionResponse exceptionResponse = (ExceptionResponse) response;
// TODO include stack trace
throw new OpenWireException("Request failed: " + exceptionResponse);
}
return response;
}
protected SessionInfo CreateSessionInfo(bool transacted, AcknowledgementMode acknowledgementMode) {
SessionInfo answer = new SessionInfo();
SessionId sessionId = new SessionId();
sessionId.ConnectionId = info.ConnectionId.Value;
lock (this) {
sessionId.Value = ++sessionCounter;
}
answer.SessionId = sessionId;
return answer;
}
protected void CheckClosed() {
if (closed) {
throw new ConnectionClosedException();
}
}
}
}

View File

@ -0,0 +1,14 @@
using System;
using System.Collections;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client {
/// <summary>
/// Exception thrown when a connection is used that it already closed
/// </summary>
public class ConnectionClosedException : OpenWireException {
public ConnectionClosedException() : base("The connection is already closed!") {
}
}
}

View File

@ -0,0 +1,14 @@
using System;
using System.Collections;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client {
/// <summary>
/// Exception thrown when a consumer is used that it already closed
/// </summary>
public class ConsumerClosedException : OpenWireException {
public ConsumerClosedException() : base("The consumer is already closed!") {
}
}
}

View File

@ -0,0 +1,14 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// Represents a message either to be sent to a message broker or received from a message broker
/// </summary>
public interface IMessage {
IDestination FromDestination {
get;
}
}
}

View File

@ -0,0 +1,21 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// A consumer of messages
/// </summary>
public interface IMessageConsumer : IDisposable {
/// <summary>
/// Waits until a message is available and returns it
/// </summary>
IMessage Receive();
/// <summary>
/// If a message is available immediately it is returned otherwise this method returns null
/// </summary>
IMessage ReceiveNoWait();
}
}

View File

@ -0,0 +1,20 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// An object capable of sending messages to some destination
/// </summary>
public interface IMessageProducer : IDisposable {
/// <summary>
/// Sends the message to the default destination for this producer
/// </summary>
void Send(IMessage message);
/// <summary>
/// Sends the message to the given destination
/// </summary>
void Send(IDestination destination, IMessage message);
}
}

View File

@ -6,6 +6,39 @@ namespace OpenWire.Client {
/// Represents a single unit of work on an IConnection.
/// So the ISession can be used to perform transactional receive and sends
/// </summary>
public interface ISession {
public interface ISession : IDisposable {
/// <summary>
/// Creates a producer of messages
/// </summary>
IMessageProducer CreateProducer();
/// <summary>
/// Creates a producer of messages on a given destination
/// </summary>
IMessageProducer CreateProducer(IDestination destination);
/// <summary>
/// Creates a cpmsi,er of messages on a given destination
/// </summary>
IMessageConsumer CreateConsumer(IDestination destination);
/// <summary>
/// Creates a cpmsi,er of messages on a given destination with a selector
/// </summary>
IMessageConsumer CreateConsumer(IDestination destination, string selector);
/// <summary>
/// Returns the queue for the given name
/// </summary>
IQueue GetQueue(string name);
/// <summary>
/// Returns the topic for the given name
/// </summary>
ITopic GetTopic(string name);
}
}

View File

@ -0,0 +1,15 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// Represents a text based message
/// </summary>
public interface ITextMessage : IMessage {
string Text {
get;
set;
}
}
}

View File

@ -0,0 +1,42 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// An object capable of receiving messages from some destination
/// </summary>
public class MessageConsumer : IMessageConsumer {
private Session session;
private ConsumerInfo info;
private bool closed;
public MessageConsumer(Session session, ConsumerInfo info) {
this.session = session;
this.info = info;
}
public IMessage Receive() {
CheckClosed();
// TODO
return null;
}
public IMessage ReceiveNoWait() {
CheckClosed();
// TODO
return null;
}
public void Dispose() {
session.DisposeOf(info.ConsumerId);
closed = true;
}
protected void CheckClosed() {
if (closed) {
throw new ConnectionClosedException();
}
}
}
}

View File

@ -0,0 +1,30 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// An object capable of sending messages to some destination
/// </summary>
public class MessageProducer : IMessageProducer {
private Session session;
private ProducerInfo info;
public MessageProducer(Session session, ProducerInfo info) {
this.session = session;
this.info = info;
}
public void Send(IMessage message) {
Send(info.Destination, message);
}
public void Send(IDestination destination, IMessage message) {
session.DoSend(destination, message);
}
public void Dispose() {
session.DisposeOf(info.ProducerId);
}
}
}

View File

@ -0,0 +1,14 @@
using System;
using System.Collections;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client {
/// <summary>
/// Represents an OpenWire exception
/// </summary>
public class OpenWireException : Exception {
public OpenWireException(string message) : base(message) {
}
}
}

View File

@ -6,23 +6,86 @@ namespace OpenWire.Client {
/// <summary>
/// Default provider of ISession
/// </summary>
public class Session : ISession, IDisposable {
public class Session : ISession {
private Connection connection;
private AcknowledgementMode acknowledgementMode;
private SessionInfo info;
private long consumerCounter;
public Session(Connection connection, AcknowledgementMode acknowledgementMode) {
public Session(Connection connection, SessionInfo info) {
this.connection = connection;
this.acknowledgementMode = acknowledgementMode;
this.info = info;
}
public void Dispose() {
}
DisposeOf(info.SessionId);
}
public IMessageProducer CreateProducer() {
return CreateProducer(null);
}
public IMessageProducer CreateProducer(IDestination destination) {
ProducerInfo command = CreateProducerInfo(destination);
connection.SyncRequest(command);
return new MessageProducer(this, command);
}
public void Acknowledge(Message message) {
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) {
MessageAck ack = new MessageAck();
connection.Transport.Request(ack);
}
// TODO complete packet
connection.SyncRequest(ack);
}
}
public IMessageConsumer CreateConsumer(IDestination destination) {
return CreateConsumer(destination, null);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector) {
ConsumerInfo command = CreateConsumerInfo(destination, selector);
connection.SyncRequest(command);
return new MessageConsumer(this, command);
}
public IQueue GetQueue(string name) {
return new ActiveMQQueue(name);
}
public ITopic GetTopic(string name) {
return new ActiveMQTopic(name);
}
// Implementation methods
public void DoSend(IDestination destination, IMessage message) {
ActiveMQMessage command = ActiveMQMessage.Transform(message);
// TODO complete packet
connection.SyncRequest(command);
}
public void DisposeOf(DataStructure objectId) {
RemoveInfo command = new RemoveInfo();
command.ObjectId = objectId;
connection.SyncRequest(command);
}
protected ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) {
ConsumerInfo answer = new ConsumerInfo();
ConsumerId consumerId = new ConsumerId();
consumerId.SessionId = info.SessionId.Value;
lock (this) {
consumerId.Value = ++consumerCounter;
}
// TODO complete packet
answer.ConsumerId = consumerId;
return answer;
}
protected ProducerInfo CreateProducerInfo(IDestination destination) {
ProducerInfo info = new ProducerInfo();
// TODO complete packet
return info;
}
}
}