added support for explicit client acknowledgement of messages or for auto-acknowledge

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380656 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-02-24 12:47:14 +00:00
parent f0300f1251
commit 15dc02a88d
8 changed files with 135 additions and 72 deletions

View File

@ -22,6 +22,8 @@ using OpenWire.Client.Core;
namespace OpenWire.Client.Commands
{
public delegate void AcknowledgeHandler(ActiveMQMessage message);
public class ActiveMQMessage : Message, IMessage, MarshallAware
{
public const byte ID_ActiveMQMessage = 23;
@ -30,7 +32,7 @@ namespace OpenWire.Client.Commands
private PrimitiveMap properties;
public event AcknowledgeHandler Acknowledger;
public static ActiveMQMessage Transform(IMessage message)
{
@ -46,6 +48,15 @@ namespace OpenWire.Client.Commands
return ID_ActiveMQMessage;
}
public void Acknowledge()
{
if (Acknowledger == null){
throw new OpenWireException("No Acknowledger has been associated with this message: " + this);}
else {
Acknowledger(this);
}
}
// Properties

View File

@ -21,7 +21,7 @@ namespace OpenWire.Client
private bool transacted;
private bool connected;
private bool closed;
private AcknowledgementMode acknowledgementMode;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private long sessionCounter;
private long temporaryDestinationCounter;
private IDictionary consumers = new Hashtable(); // TODO threadsafe
@ -35,20 +35,20 @@ namespace OpenWire.Client
this.transport.Start();
}
/// <summary>
/// Starts message delivery for this connection.
/// </summary>
public void Start()
{
}
/// <summary>
/// Starts message delivery for this connection.
/// </summary>
public void Start()
{
}
/// <summary>
/// Stop message delivery for this connection.
/// </summary>
public void Stop()
{
}
/// Stop message delivery for this connection.
/// </summary>
public void Stop()
{
}
/// <summary>
/// Creates a new session to work on this connection
@ -66,7 +66,7 @@ namespace OpenWire.Client
CheckConnected();
SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode);
SyncRequest(info);
Session session = new Session(this, info);
Session session = new Session(this, info, acknowledgementMode);
sessions.Add(session);
return session;
}

View File

@ -25,6 +25,12 @@ namespace OpenWire.Client
public interface IMessage
{
/// <summary>
/// If using client acknowledgement mode on the session then this method will acknowledge that the
/// message has been processed correctly.
/// </summary>
void Acknowledge();
/// <summary>
/// Provides access to the message properties (headers)
/// </summary>

View File

@ -27,12 +27,12 @@ namespace OpenWire.Client
public interface IMessageConsumer : IDisposable
{
/// <summary>
/// Waits until a message is available and returns it
/// </summary>
IMessage Receive();
/// <summary>
/// <summary>
/// Waits until a message is available and returns it
/// </summary>
IMessage Receive();
/// <summary>
/// If a message is available within the timeout duration it is returned otherwise this method returns null
/// </summary>
IMessage Receive(long timeout);
@ -46,6 +46,5 @@ namespace OpenWire.Client
/// An asynchronous listener which can be used to consume messages asynchronously
/// </summary>
event MessageHandler Listener;
}
}

View File

@ -26,9 +26,6 @@ namespace OpenWire.Client
public interface ISession : IDisposable
{
/// <summary>
/// Creates a producer of messages
/// </summary>
@ -50,9 +47,9 @@ namespace OpenWire.Client
IMessageConsumer CreateConsumer(IDestination destination, string selector);
/// <summary>
/// Creates a named durable consumer of messages on a given destination with a selector
/// </summary>
IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal);
/// Creates a named durable consumer of messages on a given destination with a selector
/// </summary>
IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal);
/// <summary>
/// Returns the queue for the given name

View File

@ -22,6 +22,13 @@ using OpenWire.Client.Core;
namespace OpenWire.Client
{
public enum AckType {
DeliveredAck = 0, // Message delivered but not consumed
ConsumedAck = 1, // Message consumed, discard
PoisonAck = 2 // Message could not be processed due to poison pill but discard anyway
}
/// <summary>
/// An object capable of receiving messages from some destination
/// </summary>
@ -30,15 +37,17 @@ namespace OpenWire.Client
private Session session;
private ConsumerInfo info;
private AcknowledgementMode acknowledgementMode;
private bool closed;
private Dispatcher dispatcher = new Dispatcher();
public event MessageHandler Listener;
public MessageConsumer(Session session, ConsumerInfo info)
public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
{
this.session = session;
this.info = info;
this.acknowledgementMode = acknowledgementMode;
}
/// <summary>
@ -53,21 +62,23 @@ namespace OpenWire.Client
public IMessage Receive()
{
CheckClosed();
return dispatcher.Dequeue();
return AutoAcknowledge(dispatcher.Dequeue());
}
public IMessage Receive(long timeout)
{
CheckClosed();
return dispatcher.Dequeue(timeout);
return AutoAcknowledge(dispatcher.Dequeue(timeout));
}
public IMessage ReceiveNoWait()
{
CheckClosed();
return dispatcher.DequeueNoWait();
return AutoAcknowledge(dispatcher.DequeueNoWait());
}
public void Dispose()
{
session.DisposeOf(info.ConsumerId);
@ -81,5 +92,53 @@ namespace OpenWire.Client
throw new ConnectionClosedException();
}
}
protected IMessage AutoAcknowledge(IMessage message)
{
if (message is ActiveMQMessage)
{
ActiveMQMessage activeMessage = (ActiveMQMessage) message;
// lets register the handler for client acknowledgment
activeMessage.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge);
if (acknowledgementMode != AcknowledgementMode.ClientAcknowledge)
{
DoAcknowledge(activeMessage);
}
}
return message;
}
protected void DoClientAcknowledge(Message message)
{
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
{
DoAcknowledge(message);
}
}
protected void DoAcknowledge(Message message)
{
MessageAck ack = CreateMessageAck(message);
//Console.WriteLine("Sending Ack: " + ack);
session.Connection.SyncRequest(ack);
}
protected virtual MessageAck CreateMessageAck(Message message)
{
MessageAck ack = new MessageAck();
ack.AckType = (int) AckType.ConsumedAck;
ack.ConsumerId = info.ConsumerId;
ack.Destination = message.Destination;
ack.FirstMessageId = message.MessageId;
ack.LastMessageId = message.MessageId;
ack.MessageCount = 1;
ack.TransactionId = message.TransactionId;
return ack;
}
}
}

View File

@ -26,16 +26,17 @@ namespace OpenWire.Client
public class Session : ISession
{
private Connection connection;
private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
private SessionInfo info;
private AcknowledgementMode acknowledgementMode;
private long consumerCounter;
private long producerCounter;
private int prefetchSize = 1000;
public Session(Connection connection, SessionInfo info)
public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
{
this.connection = connection;
this.info = info;
this.acknowledgementMode = acknowledgementMode;
}
public void Dispose()
@ -55,14 +56,6 @@ namespace OpenWire.Client
return new MessageProducer(this, command);
}
public void Acknowledge(Message message)
{
if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge)
{
MessageAck ack = CreateMessageAck(message);
connection.SyncRequest(ack);
}
}
public IMessageConsumer CreateConsumer(IDestination destination)
@ -77,7 +70,7 @@ namespace OpenWire.Client
try
{
MessageConsumer consumer = new MessageConsumer(this, command);
MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
// lets register the consumer first in case we start dispatching messages immediately
connection.AddConsumer(consumerId, consumer);
@ -91,28 +84,28 @@ namespace OpenWire.Client
}
}
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
ConsumerInfo command = CreateConsumerInfo(destination, selector);
ConsumerId consumerId = command.ConsumerId;
command.SubcriptionName = name;
command.NoLocal = noLocal;
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
ConsumerInfo command = CreateConsumerInfo(destination, selector);
ConsumerId consumerId = command.ConsumerId;
command.SubcriptionName = name;
command.NoLocal = noLocal;
try
{
MessageConsumer consumer = new MessageConsumer(this, command);
// lets register the consumer first in case we start dispatching messages immediately
connection.AddConsumer(consumerId, consumer);
try
{
MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
// lets register the consumer first in case we start dispatching messages immediately
connection.AddConsumer(consumerId, consumer);
connection.SyncRequest(command);
return consumer;
}
catch (Exception e)
{
connection.RemoveConsumer(consumerId);
throw e;
}
}
connection.SyncRequest(command);
return consumer;
}
catch (Exception e)
{
connection.RemoveConsumer(consumerId);
throw e;
}
}
public IQueue GetQueue(string name)
{
@ -176,7 +169,12 @@ namespace OpenWire.Client
}
// Properties
public Connection Connection {
get {
return connection;
}
}
// Implementation methods
public void DoSend(IDestination destination, IMessage message)
@ -236,13 +234,6 @@ namespace OpenWire.Client
return answer;
}
protected virtual MessageAck CreateMessageAck(Message message)
{
MessageAck ack = new MessageAck();
// TODO complete packet
return ack;
}
/// <summary>
/// Configures the message command
/// </summary>

View File

@ -105,14 +105,14 @@ namespace OpenWire.Client
Assert.AreEqual(custom4, message.Properties["custom4"], "custom4");
// TODO
//Assert.AreEqual(custom5, message.Properties["custom5"], "custom5");
Assert.AreEqual(custom4, message.Properties["custom6"], "custom6");
Assert.AreEqual(custom6, message.Properties["custom6"], "custom6");
Assert.AreEqual(custom1, message.Properties.GetBool("custom1"), "custom1");
Assert.AreEqual(custom2, message.Properties.GetByte("custom2"), "custom2");
Assert.AreEqual(custom3, message.Properties.GetShort("custom3"), "custom3");
Assert.AreEqual(custom4, message.Properties.GetInt("custom4"), "custom4");
//Assert.AreEqual(custom5, message.Properties.GetLong("custom5"), "custom5");
Assert.AreEqual(custom4, message.Properties.GetChar("custom6"), "custom6");
Assert.AreEqual(custom6, message.Properties.GetChar("custom6"), "custom6");
// lets now look at some standard JMS headers
Console.WriteLine("JMSExpiration: " + message.JMSExpiration);