This closes #1247
This commit is contained in:
commit
dee143fd3d
|
@ -214,6 +214,8 @@ public class QueueImpl implements Queue {
|
|||
|
||||
// We cache the consumers here since we don't want to include the redistributor
|
||||
|
||||
private final AtomicInteger consumersCount = new AtomicInteger();
|
||||
|
||||
private final Set<Consumer> consumerSet = new HashSet<>();
|
||||
|
||||
private final Map<SimpleString, Consumer> groups = new HashMap<>();
|
||||
|
@ -807,7 +809,9 @@ public class QueueImpl implements Queue {
|
|||
|
||||
consumerList.add(new ConsumerHolder(consumer));
|
||||
|
||||
consumerSet.add(consumer);
|
||||
if (consumerSet.add(consumer)) {
|
||||
consumersCount.incrementAndGet();
|
||||
}
|
||||
|
||||
if (refCountForConsumers != null) {
|
||||
refCountForConsumers.increment();
|
||||
|
@ -837,7 +841,9 @@ public class QueueImpl implements Queue {
|
|||
pos = consumerList.size() - 1;
|
||||
}
|
||||
|
||||
consumerSet.remove(consumer);
|
||||
if (consumerSet.remove(consumer)) {
|
||||
consumersCount.decrementAndGet();
|
||||
}
|
||||
|
||||
LinkedList<SimpleString> groupsToRemove = null;
|
||||
|
||||
|
@ -924,8 +930,8 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getConsumerCount() {
|
||||
return consumerSet.size();
|
||||
public int getConsumerCount() {
|
||||
return consumersCount.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1011,16 +1017,14 @@ public class QueueImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public long getMessageCount() {
|
||||
synchronized (this) {
|
||||
if (pageSubscription != null) {
|
||||
// messageReferences will have depaged messages which we need to discount from the counter as they are
|
||||
// counted on the pageSubscription as well
|
||||
return messageReferences.size() + getScheduledCount() +
|
||||
deliveringCount.get() +
|
||||
pageSubscription.getMessageCount();
|
||||
} else {
|
||||
return messageReferences.size() + getScheduledCount() + deliveringCount.get();
|
||||
}
|
||||
if (pageSubscription != null) {
|
||||
// messageReferences will have depaged messages which we need to discount from the counter as they are
|
||||
// counted on the pageSubscription as well
|
||||
return messageReferences.size() + getScheduledCount() +
|
||||
deliveringCount.get() +
|
||||
pageSubscription.getMessageCount();
|
||||
} else {
|
||||
return messageReferences.size() + getScheduledCount() + deliveringCount.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue