https://issues.apache.org/jira/browse/AMQ-6016 - rework fix for https://issues.apache.org/jira/browse/AMQ-2106 - account group assignment on a per destination basis to prevent modification during consumer ordering

This commit is contained in:
gtully 2015-10-21 14:02:56 +01:00
parent 92d5efc32c
commit 5d697cff3b
3 changed files with 33 additions and 16 deletions
activemq-broker/src/main/java/org/apache/activemq/broker/region
activemq-client/src/main/java/org/apache/activemq/command

View File

@ -221,8 +221,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (val == 0 && messageGroupOwners != null) {
// then ascending order of assigned message groups to favour less loaded consumers
// Long.compare in jdk7
long x = s1.getConsumerInfo().getAssignedGroupCount();
long y = s2.getConsumerInfo().getAssignedGroupCount();
long x = s1.getConsumerInfo().getAssignedGroupCount(destination);
long y = s2.getConsumerInfo().getAssignedGroupCount(destination);
val = (x < y) ? -1 : ((x == y) ? 0 : 1);
}
return val;
@ -498,7 +498,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
getDestinationStatistics().getDequeues().getCount(),
getDestinationStatistics().getDispatched().getCount(),
getDestinationStatistics().getInflight().getCount(),
sub.getConsumerInfo().getAssignedGroupCount()
sub.getConsumerInfo().getAssignedGroupCount(destination)
});
consumersLock.writeLock().lock();
try {
@ -2099,7 +2099,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// A group sequence < 1 is an end of group signal.
if (sequence < 0) {
messageGroupOwners.removeGroup(groupId);
subscription.getConsumerInfo().decrementAssignedGroupCount();
subscription.getConsumerInfo().decrementAssignedGroupCount(destination);
}
} else {
result = false;
@ -2115,7 +2115,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
Message message = n.getMessage();
message.setJMSXGroupFirstForConsumer(true);
subs.getConsumerInfo().incrementAssignedGroupCount();
subs.getConsumerInfo().incrementAssignedGroupCount(destination);
}
protected void pageInMessages(boolean force) throws Exception {

View File

@ -44,7 +44,7 @@ public class CachedMessageGroupMap implements MessageGroupMap {
if (destination != null) {
for (Subscription s : destination.getConsumers()) {
if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) {
s.getConsumerInfo().decrementAssignedGroupCount();
s.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination());
break;
}
}
@ -90,7 +90,7 @@ public class CachedMessageGroupMap implements MessageGroupMap {
cache.clear();
if (destination != null) {
for (Subscription s : destination.getConsumers()) {
s.getConsumerInfo().clearAssignedGroupCount();
s.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination());
}
}
}

View File

@ -17,7 +17,11 @@
package org.apache.activemq.command;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.state.CommandVisitor;
@ -63,7 +67,7 @@ public class ConsumerInfo extends BaseCommand {
// not marshalled, populated from RemoveInfo, the last message delivered, used
// to suppress redelivery on prefetched messages after close
private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
private transient long assignedGroupCount;
private transient Map<ActiveMQDestination, AtomicLong> assignedGroupCount = new ConcurrentHashMap<>();
// originated from a
// network connection
@ -494,20 +498,33 @@ public class ConsumerInfo extends BaseCommand {
return lastDeliveredSequenceId;
}
public void incrementAssignedGroupCount() {
this.assignedGroupCount++;
public void incrementAssignedGroupCount(final ActiveMQDestination dest) {
AtomicLong value = assignedGroupCount.get(dest);
if (value == null) {
value = new AtomicLong(0);
assignedGroupCount.put(dest, value);
}
value.incrementAndGet();
}
public void clearAssignedGroupCount() {
this.assignedGroupCount=0;
public void clearAssignedGroupCount(final ActiveMQDestination dest) {
assignedGroupCount.remove(dest);
}
public void decrementAssignedGroupCount() {
this.assignedGroupCount--;
public void decrementAssignedGroupCount(final ActiveMQDestination dest) {
AtomicLong value = assignedGroupCount.get(dest);
if (value != null) {
value.decrementAndGet();
}
}
public long getAssignedGroupCount() {
return assignedGroupCount;
public long getAssignedGroupCount(final ActiveMQDestination dest) {
long result = 0l;
AtomicLong value = assignedGroupCount.get(dest);
if (value != null) {
result = value.longValue();
}
return result;
}
}