https://issues.apache.org/jira/browse/AMQ-5513 - have lastDeliveredSequence -1 indicate nothing was received, respect this value in queue sub remove and durable sub deactivate. 0 still constitutes no info - so old clients see same behaviour as before

This commit is contained in:
gtully 2015-01-09 13:28:58 +00:00
parent 05f6cd6cfc
commit dbb1d8b83d
6 changed files with 61 additions and 12 deletions

View File

@ -184,7 +184,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
public void deactivate(boolean keepDurableSubsActive) throws Exception {
public void deactivate(boolean keepDurableSubsActive, long lastDeliveredSequenceId) throws Exception {
LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, this);
active.set(false);
offlineTimestamp.set(System.currentTimeMillis());
@ -214,12 +214,14 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
for (final MessageReference node : dispatched) {
// Mark the dispatched messages as redelivered for next time.
if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
Integer count = redeliveredMessages.get(node.getMessageId());
if (count != null) {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
} else {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
}
}
if (keepDurableSubsActive && pending.isTransient()) {
pending.addMessageFirst(node);
pending.rollback(node.getMessageId());

View File

@ -547,12 +547,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
List<MessageReference> unAckedMessages = sub.remove(context, this);
// locate last redelivered in unconsumed list (list in delivery rather than seq order)
if (lastDeiveredSequenceId != 0) {
if (lastDeiveredSequenceId > 0) {
for (MessageReference ref : unAckedMessages) {
if (ref.getMessageId().getBrokerSequenceId() == lastDeiveredSequenceId) {
lastDeliveredRef = ref;
markAsRedelivered = true;
LOG.debug("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId());
LOG.error("found lastDeliveredSeqID: {}, message reference: {}", lastDeiveredSequenceId, ref.getMessageId());
break;
}
}

View File

@ -203,7 +203,7 @@ public class Topic extends BaseDestination implements Task {
if (removed != null) {
destinationStatistics.getConsumers().decrement();
// deactivate and remove
removed.deactivate(false);
removed.deactivate(false, 0l);
consumers.remove(removed);
}
}

View File

@ -155,7 +155,7 @@ public class TopicRegion extends AbstractRegion {
if ((sub.context != context) || (sub.info != info)) {
sub.info = info;
sub.context = context;
sub.deactivate(keepDurableSubsActive);
sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
}
subscriptions.put(info.getConsumerId(), sub);
}
@ -185,7 +185,7 @@ public class TopicRegion extends AbstractRegion {
// as what is in the sub. otherwise, during linksteal
// sub will get new context, but will be removed here
if (sub.getContext() == context)
sub.deactivate(keepDurableSubsActive);
sub.deactivate(keepDurableSubsActive, info.getLastDeliveredSequenceId());
}
} else {
super.removeConsumer(context, info);

View File

@ -147,7 +147,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
private MessageAck pendingAck;
private long lastDeliveredSequenceId;
private long lastDeliveredSequenceId = -1;
private IOException failureError;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@ -400,6 +401,52 @@ public class JmsRedeliveredTest extends TestCase {
session.close();
}
public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws Exception {
connection.setClientID(getName());
connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName());
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = createProducer(session, queue);
producer.send(createTextMessage(session));
session.commit();
TimeUnit.SECONDS.sleep(1);
consumer.close();
consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.close();
}
public void testNoReceiveDurableConsumerDoesNotIncrementRedelivery() throws Exception {
connection.setClientID(getName());
connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Topic topic = session.createTopic("topic-" + getName());
MessageConsumer consumer = session.createDurableSubscriber(topic, "sub");
MessageProducer producer = createProducer(session, topic);
producer.send(createTextMessage(session));
session.commit();
TimeUnit.SECONDS.sleep(1);
consumer.close();
consumer = session.createDurableSubscriber(topic, "sub");
Message msg = consumer.receive(1000);
assertNotNull(msg);
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.close();
}
/**
* Creates a text message.
*