moved the Transport API into the Core package and put an initial JMS-like API in place

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@367583 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-01-10 12:26:05 +00:00
parent 6dacb8eced
commit 89000d1b43
16 changed files with 226 additions and 23 deletions

View File

@ -0,0 +1,56 @@
using System;
using System.Collections;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client {
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public class Connection : IConnection {
private Transport transport;
IList sessions = new ArrayList();
private bool transacted;
private AcknowledgementMode acknowledgementMode;
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
public ISession CreateSession() {
return CreateSession(transacted, acknowledgementMode);
}
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
public ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode) {
Session session = new Session(this, acknowledgementMode);
sessions.Add(session);
return session;
}
public void Dispose() {
foreach (Session session in sessions) {
session.Dispose();
}
}
// Properties
public Transport Transport {
get { return transport; }
set { this.transport = value; }
}
public bool Transacted {
get { return transacted; }
set { this.transacted = value; }
}
public AcknowledgementMode AcknowledgementMode {
get { return acknowledgementMode; }
set { this.acknowledgementMode = value; }
}
}
}

View File

@ -7,7 +7,7 @@ namespace OpenWire.Client.Core {
/// <summary>
/// Summary description for ActiveMQDestination.
/// </summary>
public abstract class ActiveMQDestination : AbstractCommand, Destination {
public abstract class ActiveMQDestination : AbstractCommand, IDestination {
/**
* Topic Destination object
@ -165,7 +165,7 @@ namespace OpenWire.Client.Core {
* @return a descriptive string for this queue or topic
*/
public static String Inspect(ActiveMQDestination destination) {
if (destination is Topic) {
if (destination is ITopic) {
return "Topic(" + destination.ToString() + ")";
} else {
return "Queue(" + destination.ToString() + ")";
@ -177,20 +177,20 @@ namespace OpenWire.Client.Core {
* @return @throws JMSException
* @throws javax.jms.JMSException
*/
public static ActiveMQDestination transformDestination(Destination destination) {
public static ActiveMQDestination transformDestination(IDestination destination) {
ActiveMQDestination result = null;
if (destination != null) {
if (destination is ActiveMQDestination) {
result = (ActiveMQDestination) destination;
} else {
if (destination is TemporaryQueue) {
result = new ActiveMQTempQueue(((Queue) destination).QueueName);
} else if (destination is TemporaryTopic) {
result = new ActiveMQTempTopic(((Topic) destination).TopicName);
} else if (destination is Queue) {
result = new ActiveMQQueue(((Queue) destination).QueueName);
} else if (destination is Topic) {
result = new ActiveMQTopic(((Topic) destination).TopicName);
if (destination is ITemporaryQueue) {
result = new ActiveMQTempQueue(((IQueue) destination).QueueName);
} else if (destination is ITemporaryTopic) {
result = new ActiveMQTempTopic(((ITopic) destination).TopicName);
} else if (destination is IQueue) {
result = new ActiveMQQueue(((IQueue) destination).QueueName);
} else if (destination is ITopic) {
result = new ActiveMQTopic(((ITopic) destination).TopicName);
}
}
}

View File

@ -7,7 +7,7 @@ namespace OpenWire.Client.Core {
/// <summary>
/// Summary description for ActiveMQQueue.
/// </summary>
public class ActiveMQQueue : ActiveMQDestination, Queue {
public class ActiveMQQueue : ActiveMQDestination, IQueue {
public const byte ID_ActiveMQQueue = 100;
public ActiveMQQueue() : base() {

View File

@ -7,7 +7,7 @@ namespace OpenWire.Client.Core {
/// <summary>
/// Summary description for ActiveMQTempQueue.
/// </summary>
public class ActiveMQTempQueue : ActiveMQDestination, TemporaryQueue {
public class ActiveMQTempQueue : ActiveMQDestination, ITemporaryQueue {
public const byte ID_ActiveMQTempQueue = 102;
public ActiveMQTempQueue() : base() {

View File

@ -7,7 +7,7 @@ namespace OpenWire.Client.Core {
/// <summary>
/// Summary description for ActiveMQTempTopic.
/// </summary>
public class ActiveMQTempTopic : ActiveMQDestination, TemporaryTopic {
public class ActiveMQTempTopic : ActiveMQDestination, ITemporaryTopic {
public const byte ID_ActiveMQTempTopic = 103;
public ActiveMQTempTopic() : base() {

View File

@ -7,7 +7,7 @@ namespace OpenWire.Client.Core {
/// <summary>
/// Summary description for ActiveMQTopic.
/// </summary>
public class ActiveMQTopic : ActiveMQDestination, Topic {
public class ActiveMQTopic : ActiveMQDestination, ITopic {
public const byte ID_ActiveMQTopic = 101;
public ActiveMQTopic() : base() {

View File

@ -0,0 +1,44 @@
using System;
using System.Threading;
using OpenWire.Client;
using OpenWire.Client.Commands;
namespace OpenWire.Client.Core {
/// <summary>
/// Handles asynchronous responses
/// </summary>
public class FutureResponse : IAsyncResult {
private Response response;
private Mutex asyncWaitHandle = new Mutex();
private bool isCompleted;
public WaitHandle AsyncWaitHandle {
get { return asyncWaitHandle; }
}
public object AsyncState {
get { return response; }
set { response = (Response) value; }
}
public bool IsCompleted {
get { return isCompleted; }
}
public bool CompletedSynchronously {
get { return false; }
}
public Response Response {
get { return response; }
set {
asyncWaitHandle.WaitOne();
response = value;
isCompleted = true;
asyncWaitHandle.ReleaseMutex();
}
}
}
}

View File

@ -0,0 +1,25 @@
using System;
using OpenWire.Client;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client.Core {
public delegate void CommandHandler(Transport sender, Command command);
public delegate void ExceptionHandler(Transport sender, Exception command);
/// <summary>
/// Represents the logical networking transport layer.
/// </summary>
public interface Transport {
void Oneway(Command command);
FutureResponse AsyncRequest(Command command);
Response Request(Command command);
event CommandHandler Command;
event ExceptionHandler Exception;
}
}

View File

@ -0,0 +1,39 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
public enum AcknowledgementMode {
Unknown, AutoAcknowledge, ClientAcknowledge, Transactional
}
/// <summary>
/// Represents a connection with a message broker
/// </summary>
public interface IConnection : IDisposable {
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
ISession CreateSession();
/// <summary>
/// Creates a new session to work on this connection
/// </summary>
ISession CreateSession(bool transacted, AcknowledgementMode acknowledgementMode);
// Properties
bool Transacted {
get;
set;
}
AcknowledgementMode AcknowledgementMode {
get;
set;
}
}
}

View File

@ -5,6 +5,6 @@ namespace OpenWire.Client {
/// <summary>
/// Summary description for Destination.
/// </summary>
public interface Destination {
public interface IDestination {
}
}

View File

@ -3,9 +3,9 @@ using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// Summary description for Queue.
/// Summary description for IQueue.
/// </summary>
public interface Queue : Destination {
public interface IQueue : IDestination {
String QueueName {
get;

View File

@ -0,0 +1,11 @@
using System;
using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// Represents a single unit of work on an IConnection.
/// So the ISession can be used to perform transactional receive and sends
/// </summary>
public interface ISession {
}
}

View File

@ -3,8 +3,8 @@ using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// Summary description for TemporaryQueue.
/// Summary description for ITemporaryQueue.
/// </summary>
public interface TemporaryQueue : Destination {
public interface ITemporaryQueue : IDestination {
}
}

View File

@ -5,6 +5,6 @@ namespace OpenWire.Client {
/// <summary>
/// Summary description for TemporaryTopic.
/// </summary>
public interface TemporaryTopic : Destination {
public interface ITemporaryTopic : IDestination {
}
}

View File

@ -3,9 +3,9 @@ using OpenWire.Client.Commands;
namespace OpenWire.Client {
/// <summary>
/// Summary description for Topic.
/// Summary description for ITopic.
/// </summary>
public interface Topic : Destination {
public interface ITopic : IDestination {
String TopicName {
get;

View File

@ -0,0 +1,28 @@
using System;
using OpenWire.Client.Commands;
using OpenWire.Client.Core;
namespace OpenWire.Client {
/// <summary>
/// Default provider of ISession
/// </summary>
public class Session : ISession, IDisposable {
private Connection connection;
private AcknowledgementMode acknowledgementMode;
public Session(Connection connection, AcknowledgementMode acknowledgementMode) {
this.connection = connection;
this.acknowledgementMode = acknowledgementMode;
}
public void Dispose() {
}
public void Acknowledge(Message message) {
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) {
MessageAck ack = new MessageAck();
connection.Transport.Request(ack);
}
}
}
}