diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index b0b609b9e3..f6fff6853d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -221,8 +221,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (val == 0 && messageGroupOwners != null) { // then ascending order of assigned message groups to favour less loaded consumers // Long.compare in jdk7 - long x = s1.getConsumerInfo().getAssignedGroupCount(); - long y = s2.getConsumerInfo().getAssignedGroupCount(); + long x = s1.getConsumerInfo().getAssignedGroupCount(destination); + long y = s2.getConsumerInfo().getAssignedGroupCount(destination); val = (x < y) ? -1 : ((x == y) ? 0 : 1); } return val; @@ -498,7 +498,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index getDestinationStatistics().getDequeues().getCount(), getDestinationStatistics().getDispatched().getCount(), getDestinationStatistics().getInflight().getCount(), - sub.getConsumerInfo().getAssignedGroupCount() + sub.getConsumerInfo().getAssignedGroupCount(destination) }); consumersLock.writeLock().lock(); try { @@ -2099,7 +2099,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // A group sequence < 1 is an end of group signal. if (sequence < 0) { messageGroupOwners.removeGroup(groupId); - subscription.getConsumerInfo().decrementAssignedGroupCount(); + subscription.getConsumerInfo().decrementAssignedGroupCount(destination); } } else { result = false; @@ -2115,7 +2115,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId()); Message message = n.getMessage(); message.setJMSXGroupFirstForConsumer(true); - subs.getConsumerInfo().incrementAssignedGroupCount(); + subs.getConsumerInfo().incrementAssignedGroupCount(destination); } protected void pageInMessages(boolean force) throws Exception { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java index b17f8cef52..2d5bd20fd4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/group/CachedMessageGroupMap.java @@ -44,7 +44,7 @@ public class CachedMessageGroupMap implements MessageGroupMap { if (destination != null) { for (Subscription s : destination.getConsumers()) { if (s.getConsumerInfo().getConsumerId().equals(eldest.getValue())) { - s.getConsumerInfo().decrementAssignedGroupCount(); + s.getConsumerInfo().decrementAssignedGroupCount(destination.getActiveMQDestination()); break; } } @@ -90,7 +90,7 @@ public class CachedMessageGroupMap implements MessageGroupMap { cache.clear(); if (destination != null) { for (Subscription s : destination.getConsumers()) { - s.getConsumerInfo().clearAssignedGroupCount(); + s.getConsumerInfo().clearAssignedGroupCount(destination.getActiveMQDestination()); } } } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java index 0c1e6913e6..ed97a48b68 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java @@ -17,7 +17,11 @@ package org.apache.activemq.command; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.state.CommandVisitor; @@ -63,7 +67,7 @@ public class ConsumerInfo extends BaseCommand { // not marshalled, populated from RemoveInfo, the last message delivered, used // to suppress redelivery on prefetched messages after close private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET; - private transient long assignedGroupCount; + private transient Map assignedGroupCount = new ConcurrentHashMap<>(); // originated from a // network connection @@ -494,20 +498,33 @@ public class ConsumerInfo extends BaseCommand { return lastDeliveredSequenceId; } - public void incrementAssignedGroupCount() { - this.assignedGroupCount++; + public void incrementAssignedGroupCount(final ActiveMQDestination dest) { + AtomicLong value = assignedGroupCount.get(dest); + if (value == null) { + value = new AtomicLong(0); + assignedGroupCount.put(dest, value); + } + value.incrementAndGet(); } - public void clearAssignedGroupCount() { - this.assignedGroupCount=0; + public void clearAssignedGroupCount(final ActiveMQDestination dest) { + assignedGroupCount.remove(dest); } - public void decrementAssignedGroupCount() { - this.assignedGroupCount--; + public void decrementAssignedGroupCount(final ActiveMQDestination dest) { + AtomicLong value = assignedGroupCount.get(dest); + if (value != null) { + value.decrementAndGet(); + } } - public long getAssignedGroupCount() { - return assignedGroupCount; + public long getAssignedGroupCount(final ActiveMQDestination dest) { + long result = 0l; + AtomicLong value = assignedGroupCount.get(dest); + if (value != null) { + result = value.longValue(); + } + return result; } }