From 15dc02a88db974005ab48d282101a38899496a6a Mon Sep 17 00:00:00 2001 From: James Strachan Date: Fri, 24 Feb 2006 12:47:14 +0000 Subject: [PATCH] added support for explicit client acknowledgement of messages or for auto-acknowledge git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@380656 13f79535-47bb-0310-9956-ffa450edef68 --- .../Commands/ActiveMQMessage.cs | 13 +++- .../src/OpenWire.Client/Connection.cs | 26 +++---- .../src/OpenWire.Client/IMessage.cs | 6 ++ .../src/OpenWire.Client/IMessageConsumer.cs | 13 ++-- .../src/OpenWire.Client/ISession.cs | 9 +-- .../src/OpenWire.Client/MessageConsumer.cs | 67 ++++++++++++++++-- .../src/OpenWire.Client/Session.cs | 69 ++++++++----------- .../tests/OpenWire.Client/JMSPropertyTest.cs | 4 +- 8 files changed, 135 insertions(+), 72 deletions(-) diff --git a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs index 9e8a05b8f1..a3714e3bdc 100644 --- a/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/Commands/ActiveMQMessage.cs @@ -22,6 +22,8 @@ using OpenWire.Client.Core; namespace OpenWire.Client.Commands { + public delegate void AcknowledgeHandler(ActiveMQMessage message); + public class ActiveMQMessage : Message, IMessage, MarshallAware { public const byte ID_ActiveMQMessage = 23; @@ -30,7 +32,7 @@ namespace OpenWire.Client.Commands private PrimitiveMap properties; - + public event AcknowledgeHandler Acknowledger; public static ActiveMQMessage Transform(IMessage message) { @@ -46,6 +48,15 @@ namespace OpenWire.Client.Commands return ID_ActiveMQMessage; } + public void Acknowledge() + { + if (Acknowledger == null){ + throw new OpenWireException("No Acknowledger has been associated with this message: " + this);} + else { + Acknowledger(this); + } + } + // Properties diff --git a/openwire-dotnet/src/OpenWire.Client/Connection.cs b/openwire-dotnet/src/OpenWire.Client/Connection.cs index ce725e065a..330a636006 100755 --- a/openwire-dotnet/src/OpenWire.Client/Connection.cs +++ b/openwire-dotnet/src/OpenWire.Client/Connection.cs @@ -21,7 +21,7 @@ namespace OpenWire.Client private bool transacted; private bool connected; private bool closed; - private AcknowledgementMode acknowledgementMode; + private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; private long sessionCounter; private long temporaryDestinationCounter; private IDictionary consumers = new Hashtable(); // TODO threadsafe @@ -35,20 +35,20 @@ namespace OpenWire.Client this.transport.Start(); } - /// - /// Starts message delivery for this connection. - /// - public void Start() - { - } + /// + /// Starts message delivery for this connection. + /// + public void Start() + { + } /// - /// Stop message delivery for this connection. - /// - public void Stop() - { - } + /// Stop message delivery for this connection. + /// + public void Stop() + { + } /// /// Creates a new session to work on this connection @@ -66,7 +66,7 @@ namespace OpenWire.Client CheckConnected(); SessionInfo info = CreateSessionInfo(transacted, acknowledgementMode); SyncRequest(info); - Session session = new Session(this, info); + Session session = new Session(this, info, acknowledgementMode); sessions.Add(session); return session; } diff --git a/openwire-dotnet/src/OpenWire.Client/IMessage.cs b/openwire-dotnet/src/OpenWire.Client/IMessage.cs index 37d838defb..38f4ea4b4c 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessage.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessage.cs @@ -25,6 +25,12 @@ namespace OpenWire.Client public interface IMessage { + /// + /// If using client acknowledgement mode on the session then this method will acknowledge that the + /// message has been processed correctly. + /// + void Acknowledge(); + /// /// Provides access to the message properties (headers) /// diff --git a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs index cbfe5f85be..fe4f94dbc9 100755 --- a/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/IMessageConsumer.cs @@ -27,12 +27,12 @@ namespace OpenWire.Client public interface IMessageConsumer : IDisposable { - /// - /// Waits until a message is available and returns it - /// - IMessage Receive(); - - /// + /// + /// Waits until a message is available and returns it + /// + IMessage Receive(); + + /// /// If a message is available within the timeout duration it is returned otherwise this method returns null /// IMessage Receive(long timeout); @@ -46,6 +46,5 @@ namespace OpenWire.Client /// An asynchronous listener which can be used to consume messages asynchronously /// event MessageHandler Listener; - } } diff --git a/openwire-dotnet/src/OpenWire.Client/ISession.cs b/openwire-dotnet/src/OpenWire.Client/ISession.cs index a8b27c709e..216b9bdd67 100755 --- a/openwire-dotnet/src/OpenWire.Client/ISession.cs +++ b/openwire-dotnet/src/OpenWire.Client/ISession.cs @@ -26,9 +26,6 @@ namespace OpenWire.Client public interface ISession : IDisposable { - - - /// /// Creates a producer of messages /// @@ -50,9 +47,9 @@ namespace OpenWire.Client IMessageConsumer CreateConsumer(IDestination destination, string selector); /// - /// Creates a named durable consumer of messages on a given destination with a selector - /// - IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal); + /// Creates a named durable consumer of messages on a given destination with a selector + /// + IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal); /// /// Returns the queue for the given name diff --git a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs index 49ee118323..6d20133ae4 100755 --- a/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs +++ b/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs @@ -22,6 +22,13 @@ using OpenWire.Client.Core; namespace OpenWire.Client { + public enum AckType { + DeliveredAck = 0, // Message delivered but not consumed + ConsumedAck = 1, // Message consumed, discard + PoisonAck = 2 // Message could not be processed due to poison pill but discard anyway + } + + /// /// An object capable of receiving messages from some destination /// @@ -30,15 +37,17 @@ namespace OpenWire.Client private Session session; private ConsumerInfo info; + private AcknowledgementMode acknowledgementMode; private bool closed; private Dispatcher dispatcher = new Dispatcher(); public event MessageHandler Listener; - public MessageConsumer(Session session, ConsumerInfo info) + public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode) { this.session = session; this.info = info; + this.acknowledgementMode = acknowledgementMode; } /// @@ -53,21 +62,23 @@ namespace OpenWire.Client public IMessage Receive() { CheckClosed(); - return dispatcher.Dequeue(); + return AutoAcknowledge(dispatcher.Dequeue()); } public IMessage Receive(long timeout) { CheckClosed(); - return dispatcher.Dequeue(timeout); + return AutoAcknowledge(dispatcher.Dequeue(timeout)); } public IMessage ReceiveNoWait() { CheckClosed(); - return dispatcher.DequeueNoWait(); + return AutoAcknowledge(dispatcher.DequeueNoWait()); } + + public void Dispose() { session.DisposeOf(info.ConsumerId); @@ -81,5 +92,53 @@ namespace OpenWire.Client throw new ConnectionClosedException(); } } + + protected IMessage AutoAcknowledge(IMessage message) + { + if (message is ActiveMQMessage) + { + ActiveMQMessage activeMessage = (ActiveMQMessage) message; + + // lets register the handler for client acknowledgment + activeMessage.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge); + + if (acknowledgementMode != AcknowledgementMode.ClientAcknowledge) + { + DoAcknowledge(activeMessage); + } + } + return message; + } + + protected void DoClientAcknowledge(Message message) + { + if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) + { + DoAcknowledge(message); + } + } + + protected void DoAcknowledge(Message message) + { + MessageAck ack = CreateMessageAck(message); + //Console.WriteLine("Sending Ack: " + ack); + session.Connection.SyncRequest(ack); + } + + + protected virtual MessageAck CreateMessageAck(Message message) + { + MessageAck ack = new MessageAck(); + ack.AckType = (int) AckType.ConsumedAck; + ack.ConsumerId = info.ConsumerId; + ack.Destination = message.Destination; + ack.FirstMessageId = message.MessageId; + ack.LastMessageId = message.MessageId; + ack.MessageCount = 1; + ack.TransactionId = message.TransactionId; + return ack; + } + + } } diff --git a/openwire-dotnet/src/OpenWire.Client/Session.cs b/openwire-dotnet/src/OpenWire.Client/Session.cs index 7f1b57bc76..98ce841a8f 100755 --- a/openwire-dotnet/src/OpenWire.Client/Session.cs +++ b/openwire-dotnet/src/OpenWire.Client/Session.cs @@ -26,16 +26,17 @@ namespace OpenWire.Client public class Session : ISession { private Connection connection; - private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; private SessionInfo info; + private AcknowledgementMode acknowledgementMode; private long consumerCounter; private long producerCounter; private int prefetchSize = 1000; - public Session(Connection connection, SessionInfo info) + public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) { this.connection = connection; this.info = info; + this.acknowledgementMode = acknowledgementMode; } public void Dispose() @@ -55,14 +56,6 @@ namespace OpenWire.Client return new MessageProducer(this, command); } - public void Acknowledge(Message message) - { - if (acknowledgementMode == AcknowledgementMode.ClientAcknowledge) - { - MessageAck ack = CreateMessageAck(message); - connection.SyncRequest(ack); - } - } public IMessageConsumer CreateConsumer(IDestination destination) @@ -77,7 +70,7 @@ namespace OpenWire.Client try { - MessageConsumer consumer = new MessageConsumer(this, command); + MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode); // lets register the consumer first in case we start dispatching messages immediately connection.AddConsumer(consumerId, consumer); @@ -91,28 +84,28 @@ namespace OpenWire.Client } } - public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) - { - ConsumerInfo command = CreateConsumerInfo(destination, selector); - ConsumerId consumerId = command.ConsumerId; - command.SubcriptionName = name; - command.NoLocal = noLocal; + public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) + { + ConsumerInfo command = CreateConsumerInfo(destination, selector); + ConsumerId consumerId = command.ConsumerId; + command.SubcriptionName = name; + command.NoLocal = noLocal; - try - { - MessageConsumer consumer = new MessageConsumer(this, command); - // lets register the consumer first in case we start dispatching messages immediately - connection.AddConsumer(consumerId, consumer); + try + { + MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode); + // lets register the consumer first in case we start dispatching messages immediately + connection.AddConsumer(consumerId, consumer); - connection.SyncRequest(command); - return consumer; - } - catch (Exception e) - { - connection.RemoveConsumer(consumerId); - throw e; - } - } + connection.SyncRequest(command); + return consumer; + } + catch (Exception e) + { + connection.RemoveConsumer(consumerId); + throw e; + } + } public IQueue GetQueue(string name) { @@ -176,7 +169,12 @@ namespace OpenWire.Client } - + // Properties + public Connection Connection { + get { + return connection; + } + } // Implementation methods public void DoSend(IDestination destination, IMessage message) @@ -236,13 +234,6 @@ namespace OpenWire.Client return answer; } - protected virtual MessageAck CreateMessageAck(Message message) - { - MessageAck ack = new MessageAck(); - // TODO complete packet - return ack; - } - /// /// Configures the message command /// diff --git a/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs b/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs index cb5815dbe9..8d1304a7df 100644 --- a/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs +++ b/openwire-dotnet/tests/OpenWire.Client/JMSPropertyTest.cs @@ -105,14 +105,14 @@ namespace OpenWire.Client Assert.AreEqual(custom4, message.Properties["custom4"], "custom4"); // TODO //Assert.AreEqual(custom5, message.Properties["custom5"], "custom5"); - Assert.AreEqual(custom4, message.Properties["custom6"], "custom6"); + Assert.AreEqual(custom6, message.Properties["custom6"], "custom6"); Assert.AreEqual(custom1, message.Properties.GetBool("custom1"), "custom1"); Assert.AreEqual(custom2, message.Properties.GetByte("custom2"), "custom2"); Assert.AreEqual(custom3, message.Properties.GetShort("custom3"), "custom3"); Assert.AreEqual(custom4, message.Properties.GetInt("custom4"), "custom4"); //Assert.AreEqual(custom5, message.Properties.GetLong("custom5"), "custom5"); - Assert.AreEqual(custom4, message.Properties.GetChar("custom6"), "custom6"); + Assert.AreEqual(custom6, message.Properties.GetChar("custom6"), "custom6"); // lets now look at some standard JMS headers Console.WriteLine("JMSExpiration: " + message.JMSExpiration);