We now send REDELIVERY acks to the broker when a message is redelivered.  This allows the broker to update the message with the number of times redelivery has occured so that if the message is delivered to another consumer it can DQL the message when max redeliveries have occured across consumers.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@581747 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-10-03 23:32:29 +00:00
parent 1401e69956
commit c256dcf9e8
4 changed files with 61 additions and 8 deletions

View File

@ -104,7 +104,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>(); private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
private int deliveredCounter; private int deliveredCounter;
private int additionalWindowSize; private int additionalWindowSize;
private int rollbackCounter;
private long redeliveryDelay; private long redeliveryDelay;
private int ackCounter; private int ackCounter;
private int dispatchedCount; private int dispatchedCount;
@ -627,6 +626,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void dispose() throws JMSException { public void dispose() throws JMSException {
if (!unconsumedMessages.isClosed()) { if (!unconsumedMessages.isClosed()) {
// if ( !deliveredMessages.isEmpty() ) {
// // We need to let the broker know how many times that message
// // was rolled back.
// rollbackCounter++;
// MessageDispatch lastMd = deliveredMessages.get(0);
// }
// Do we have any acks we need to send out before closing? // Do we have any acks we need to send out before closing?
// Ack any delivered messages now. (session may still // Ack any delivered messages now. (session may still
// commit/rollback the acks). // commit/rollback the acks).
@ -829,7 +836,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void commit() throws JMSException { public void commit() throws JMSException {
deliveredMessages.clear(); deliveredMessages.clear();
rollbackCounter = 0;
redeliveryDelay = 0; redeliveryDelay = 0;
} }
@ -851,31 +857,39 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
// Only increase the redlivery delay after the first redelivery.. // Only increase the redlivery delay after the first redelivery..
if (rollbackCounter > 0) { MessageDispatch lastMd = deliveredMessages.getFirst();
if (lastMd.getMessage().getRedeliveryCounter() > 0) {
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
} }
rollbackCounter++;
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch)iter.next();
md.getMessage().onMessageRolledBack();
}
if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
&& rollbackCounter > redeliveryPolicy.getMaximumRedeliveries()) { && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) {
// We need to NACK the messages so that they get sent to the // We need to NACK the messages so that they get sent to the
// DLQ. // DLQ.
// Acknowledge the last message. // Acknowledge the last message.
MessageDispatch lastMd = deliveredMessages.get(0);
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size()); MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
session.asyncSendPacket(ack); session.asyncSendPacket(ack);
// ensure we don't filter this as a duplicate // ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, lastMd.getMessage()); session.connection.rollbackDuplicate(this, lastMd.getMessage());
// Adjust the window size. // Adjust the window size.
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
rollbackCounter = 0;
redeliveryDelay = 0; redeliveryDelay = 0;
} else { } else {
MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
session.asyncSendPacket(ack);
// stop the delivery of messages. // stop the delivery of messages.
unconsumedMessages.stop(); unconsumedMessages.stop();
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch)iter.next(); MessageDispatch md = (MessageDispatch)iter.next();
md.getMessage().onMessageRolledBack();
unconsumedMessages.enqueueFirst(md); unconsumedMessages.enqueueFirst(md);
} }

View File

@ -755,6 +755,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
ack.setFirstMessageId(md.getMessage().getMessageId()); ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack); asyncSendPacket(ack);
} else { } else {
MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
asyncSendPacket(ack);
// Figure out how long we should wait to resend // Figure out how long we should wait to resend
// this message. // this message.
long redeliveryDelay = 0; long redeliveryDelay = 0;

View File

@ -230,6 +230,28 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (!callDispatchMatched) { if (!callDispatchMatched) {
throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack); throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
} }
} else if (ack.isRedeliveredAck() ) {
// Message was re-delivered but it was not yet considered to be a DLQ message.
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
boolean inAckRange = false;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
final MessageReference node = iter.next();
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
node.incrementRedeliveryCounter();
if (ack.getLastMessageId().equals(messageId)) {
callDispatchMatched = true;
break;
}
}
}
if (!callDispatchMatched) {
throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
}
} else if (ack.isPoisonAck()) { } else if (ack.isPoisonAck()) {
// TODO: what if the message is already in a DLQ??? // TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a DLQ // Handle the poison ACK case: we need to send the message to a DLQ

View File

@ -46,6 +46,14 @@ public class MessageAck extends BaseCommand {
*/ */
public static final byte POSION_ACK_TYPE = 1; public static final byte POSION_ACK_TYPE = 1;
/**
* In case the client want's to explicitly let the broker know that a
* message was not processed and it was re-delivered to the consumer
* but it was not yet considered to be a poison message. The messageCount
* field will hold the number of times the message was re-delivered.
*/
public static final byte REDELIVERED_ACK_TYPE = 3;
protected byte ackType; protected byte ackType;
protected ConsumerId consumerId; protected ConsumerId consumerId;
protected MessageId firstMessageId; protected MessageId firstMessageId;
@ -97,6 +105,10 @@ public class MessageAck extends BaseCommand {
return ackType == DELIVERED_ACK_TYPE; return ackType == DELIVERED_ACK_TYPE;
} }
public boolean isRedeliveredAck() {
return ackType == REDELIVERED_ACK_TYPE;
}
/** /**
* @openwire:property version=1 cache=true * @openwire:property version=1 cache=true
*/ */