Switching to a List to track dispatched messages in a TopicSubscription
to be consistent with a PrefetchSubscription and to prevent an error
in case acks come back out of order.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-08-06 17:12:18 +00:00
parent 9827427f46
commit 2b7bb6f81b
1 changed files with 42 additions and 39 deletions

View File

@ -17,11 +17,9 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -76,19 +74,9 @@ public class TopicSubscription extends AbstractSubscription {
protected boolean active = false;
protected boolean discarding = false;
/**
* This Map is used to keep track of messages that have been dispatched in sorted order to
* optimize message acknowledgement
*/
private NavigableMap<MessageId, MessageReference> dispatched = new ConcurrentSkipListMap<>(
new Comparator<MessageId>() {
@Override
public int compare(MessageId m1, MessageId m2) {
return m1 == null ? (m2 == null ? 0 : -1) : (m2 == null ? 1
: Long.compare(m1.getBrokerSequenceId(), m2.getBrokerSequenceId()));
}
});
//Used for inflight message size calculations
protected final Object dispatchLock = new Object();
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
@ -267,11 +255,13 @@ public class TopicSubscription extends AbstractSubscription {
MessageReference node = matched.next();
node.decrementReferenceCount();
if (node.getMessageId().equals(mdn.getMessageId())) {
matched.remove();
getSubscriptionStatistics().getDispatched().increment();
dispatched.put(node.getMessageId(), node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
node.decrementReferenceCount();
synchronized(dispatchLock) {
matched.remove();
getSubscriptionStatistics().getDispatched().increment();
dispatched.add(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
node.decrementReferenceCount();
}
break;
}
}
@ -403,23 +393,31 @@ public class TopicSubscription extends AbstractSubscription {
}
/**
* Update the inflight statistics on message ack. Since a message ack could be a range,
* we need to grab a subtree of the dispatched map to acknowledge messages. Finding the
* subMap is an O(log n) operation.
* Update the inflight statistics on message ack.
* @param ack
*/
private void updateInflightMessageSizeOnAck(final MessageAck ack) {
if (ack.getFirstMessageId() != null) {
NavigableMap<MessageId, MessageReference> acked = dispatched
.subMap(ack.getFirstMessageId(), true, ack.getLastMessageId(), true);
Iterator<MessageId> i = acked.keySet().iterator();
while (i.hasNext()) {
getSubscriptionStatistics().getInflightMessageSize().addSize(-acked.get(i.next()).getSize());
i.remove();
synchronized(dispatchLock) {
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
removeList.add(node);
if (ack.getLastMessageId().equals(messageId)) {
break;
}
}
}
for (final MessageReference node : removeList) {
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
}
} else {
getSubscriptionStatistics().getInflightMessageSize().addSize(-dispatched.get(ack.getLastMessageId()).getSize());
dispatched.remove(ack.getLastMessageId());
}
}
@ -645,9 +643,12 @@ public class TopicSubscription extends AbstractSubscription {
md.setConsumerId(info.getConsumerId());
if (node != null) {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
getSubscriptionStatistics().getDispatched().increment();
dispatched.put(node.getMessageId(), node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
synchronized(dispatchLock) {
getSubscriptionStatistics().getDispatched().increment();
dispatched.add(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
// Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
if (destination == null) {
@ -729,7 +730,9 @@ public class TopicSubscription extends AbstractSubscription {
}
}
setSlowConsumer(false);
dispatched.clear();
synchronized(dispatchLock) {
dispatched.clear();
}
}
@Override