diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index e151bc4ca1..be4a69e739 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -104,7 +104,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC private final LinkedList deliveredMessages = new LinkedList(); private int deliveredCounter; private int additionalWindowSize; - private int rollbackCounter; private long redeliveryDelay; private int ackCounter; private int dispatchedCount; @@ -627,6 +626,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC public void dispose() throws JMSException { if (!unconsumedMessages.isClosed()) { + +// if ( !deliveredMessages.isEmpty() ) { +// // We need to let the broker know how many times that message +// // was rolled back. +// rollbackCounter++; +// MessageDispatch lastMd = deliveredMessages.get(0); +// } + // Do we have any acks we need to send out before closing? // Ack any delivered messages now. (session may still // commit/rollback the acks). @@ -829,7 +836,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC public void commit() throws JMSException { deliveredMessages.clear(); - rollbackCounter = 0; redeliveryDelay = 0; } @@ -851,31 +857,39 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } // Only increase the redlivery delay after the first redelivery.. - if (rollbackCounter > 0) { + MessageDispatch lastMd = deliveredMessages.getFirst(); + if (lastMd.getMessage().getRedeliveryCounter() > 0) { redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); } - rollbackCounter++; + + for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { + MessageDispatch md = (MessageDispatch)iter.next(); + md.getMessage().onMessageRolledBack(); + } + if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES - && rollbackCounter > redeliveryPolicy.getMaximumRedeliveries()) { + && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { // We need to NACK the messages so that they get sent to the // DLQ. // Acknowledge the last message. - MessageDispatch lastMd = deliveredMessages.get(0); + MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); session.asyncSendPacket(ack); // ensure we don't filter this as a duplicate session.connection.rollbackDuplicate(this, lastMd.getMessage()); // Adjust the window size. additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); - rollbackCounter = 0; redeliveryDelay = 0; } else { + + MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size()); + session.asyncSendPacket(ack); + // stop the delivery of messages. unconsumedMessages.stop(); for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { MessageDispatch md = (MessageDispatch)iter.next(); - md.getMessage().onMessageRolledBack(); unconsumedMessages.enqueueFirst(md); } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 879363f9e8..d4dbbf43f9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -755,6 +755,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta ack.setFirstMessageId(md.getMessage().getMessageId()); asyncSendPacket(ack); } else { + + MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); + ack.setFirstMessageId(md.getMessage().getMessageId()); + asyncSendPacket(ack); + // Figure out how long we should wait to resend // this message. long redeliveryDelay = 0; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index e9e4e32981..8cc2761757 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -230,6 +230,28 @@ public abstract class PrefetchSubscription extends AbstractSubscription { if (!callDispatchMatched) { throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack); } + } else if (ack.isRedeliveredAck() ) { + // Message was re-delivered but it was not yet considered to be a DLQ message. + // Acknowledge all dispatched messages up till the message id of the + // acknowledgment. + boolean inAckRange = false; + for (Iterator iter = dispatched.iterator(); iter.hasNext();) { + final MessageReference node = iter.next(); + MessageId messageId = node.getMessageId(); + if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) { + inAckRange = true; + } + if (inAckRange) { + node.incrementRedeliveryCounter(); + if (ack.getLastMessageId().equals(messageId)) { + callDispatchMatched = true; + break; + } + } + } + if (!callDispatchMatched) { + throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack); + } } else if (ack.isPoisonAck()) { // TODO: what if the message is already in a DLQ??? // Handle the poison ACK case: we need to send the message to a DLQ diff --git a/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java b/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java index e30c124416..fba68f829e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java @@ -46,6 +46,14 @@ public class MessageAck extends BaseCommand { */ public static final byte POSION_ACK_TYPE = 1; + /** + * In case the client want's to explicitly let the broker know that a + * message was not processed and it was re-delivered to the consumer + * but it was not yet considered to be a poison message. The messageCount + * field will hold the number of times the message was re-delivered. + */ + public static final byte REDELIVERED_ACK_TYPE = 3; + protected byte ackType; protected ConsumerId consumerId; protected MessageId firstMessageId; @@ -96,6 +104,10 @@ public class MessageAck extends BaseCommand { public boolean isDeliveredAck() { return ackType == DELIVERED_ACK_TYPE; } + + public boolean isRedeliveredAck() { + return ackType == REDELIVERED_ACK_TYPE; + } /** * @openwire:property version=1 cache=true