ARTEMIS-1140 Avoid lock on queue for message counts
(cherry picked from commit 33f2ad65c9
)
This commit is contained in:
parent
6f0babb368
commit
534fd8093d
|
@ -208,6 +208,8 @@ public class QueueImpl implements Queue {
|
|||
|
||||
// We cache the consumers here since we don't want to include the redistributor
|
||||
|
||||
private final AtomicInteger consumersCount = new AtomicInteger();
|
||||
|
||||
private final Set<Consumer> consumerSet = new HashSet<>();
|
||||
|
||||
private final Map<SimpleString, Consumer> groups = new HashMap<>();
|
||||
|
@ -717,7 +719,9 @@ public class QueueImpl implements Queue {
|
|||
|
||||
consumerList.add(new ConsumerHolder(consumer));
|
||||
|
||||
consumerSet.add(consumer);
|
||||
if (consumerSet.add(consumer)) {
|
||||
consumersCount.incrementAndGet();
|
||||
}
|
||||
|
||||
if (refCountForConsumers != null) {
|
||||
refCountForConsumers.increment();
|
||||
|
@ -745,7 +749,9 @@ public class QueueImpl implements Queue {
|
|||
pos = consumerList.size() - 1;
|
||||
}
|
||||
|
||||
consumerSet.remove(consumer);
|
||||
if (consumerSet.remove(consumer)) {
|
||||
consumersCount.decrementAndGet();
|
||||
}
|
||||
|
||||
LinkedList<SimpleString> groupsToRemove = null;
|
||||
|
||||
|
@ -830,8 +836,8 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getConsumerCount() {
|
||||
return consumerSet.size();
|
||||
public int getConsumerCount() {
|
||||
return consumersCount.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -917,16 +923,14 @@ public class QueueImpl implements Queue {
|
|||
|
||||
@Override
|
||||
public long getMessageCount() {
|
||||
synchronized (this) {
|
||||
if (pageSubscription != null) {
|
||||
// messageReferences will have depaged messages which we need to discount from the counter as they are
|
||||
// counted on the pageSubscription as well
|
||||
return messageReferences.size() + getScheduledCount() +
|
||||
deliveringCount.get() +
|
||||
pageSubscription.getMessageCount();
|
||||
} else {
|
||||
return messageReferences.size() + getScheduledCount() + deliveringCount.get();
|
||||
}
|
||||
if (pageSubscription != null) {
|
||||
// messageReferences will have depaged messages which we need to discount from the counter as they are
|
||||
// counted on the pageSubscription as well
|
||||
return messageReferences.size() + getScheduledCount() +
|
||||
deliveringCount.get() +
|
||||
pageSubscription.getMessageCount();
|
||||
} else {
|
||||
return messageReferences.size() + getScheduledCount() + deliveringCount.get();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue