AMQ-7228 - Avoid unnecessary lock contention when getting pending

metrics
This commit is contained in:
Christopher L. Shannon (cshannon) 2019-06-14 10:46:21 -04:00
parent 814a286dfe
commit dc56fa3f6e
2 changed files with 15 additions and 31 deletions

View File

@ -532,10 +532,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
@Override
public long getPendingMessageSize() {
synchronized (pendingLock) {
return pending.messageSize();
}
}
@Override
public int getDispatchedQueueSize() {

View File

@ -16,14 +16,6 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@ -32,15 +24,7 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.*;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
@ -48,6 +32,14 @@ import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class TopicSubscription extends AbstractSubscription {
private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
@ -61,7 +53,7 @@ public class TopicSubscription extends AbstractSubscription {
private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private int discarded;
private final AtomicInteger discarded = new AtomicInteger();
private final Object matchedListMutex = new Object();
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
@ -448,10 +440,8 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public long getPendingMessageSize() {
synchronized (matchedListMutex) {
return matched.messageSize();
}
}
@Override
public int getDispatchedQueueSize() {
@ -482,9 +472,7 @@ public class TopicSubscription extends AbstractSubscription {
* @return the number of messages discarded due to being a slow consumer
*/
public int discarded() {
synchronized (matchedListMutex) {
return discarded;
}
return discarded.get();
}
/**
@ -493,10 +481,8 @@ public class TopicSubscription extends AbstractSubscription {
* prefetch buffer being full).
*/
public int matched() {
synchronized (matchedListMutex) {
return matched.size();
}
}
/**
* Sets the maximum number of pending messages that can be matched against
@ -727,7 +713,7 @@ public class TopicSubscription extends AbstractSubscription {
try {
message.decrementReferenceCount();
matched.remove(message);
discarded++;
discarded.incrementAndGet();
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
}