diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index a265570c94..a3b9a4d980 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -532,9 +532,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { @Override public long getPendingMessageSize() { - synchronized (pendingLock) { - return pending.messageSize(); - } + return pending.messageSize(); } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index bf3f97bebf..0bf1c4e19e 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -16,14 +16,6 @@ */ package org.apache.activemq.broker.region; -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.JMSException; - import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -32,15 +24,7 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.policy.MessageEvictionStrategy; import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; -import org.apache.activemq.command.ConsumerControl; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageDispatchNotification; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.MessagePull; -import org.apache.activemq.command.Response; +import org.apache.activemq.command.*; import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transport.TransmitCallback; @@ -48,6 +32,14 @@ import org.apache.activemq.usage.SystemUsage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.JMSException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + public class TopicSubscription extends AbstractSubscription { private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class); @@ -61,7 +53,7 @@ public class TopicSubscription extends AbstractSubscription { private int maximumPendingMessages = -1; private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); - private int discarded; + private final AtomicInteger discarded = new AtomicInteger(); private final Object matchedListMutex = new Object(); private int memoryUsageHighWaterMark = 95; // allow duplicate suppression in a ring network of brokers @@ -448,9 +440,7 @@ public class TopicSubscription extends AbstractSubscription { @Override public long getPendingMessageSize() { - synchronized (matchedListMutex) { - return matched.messageSize(); - } + return matched.messageSize(); } @Override @@ -482,9 +472,7 @@ public class TopicSubscription extends AbstractSubscription { * @return the number of messages discarded due to being a slow consumer */ public int discarded() { - synchronized (matchedListMutex) { - return discarded; - } + return discarded.get(); } /** @@ -493,9 +481,7 @@ public class TopicSubscription extends AbstractSubscription { * prefetch buffer being full). */ public int matched() { - synchronized (matchedListMutex) { - return matched.size(); - } + return matched.size(); } /** @@ -727,7 +713,7 @@ public class TopicSubscription extends AbstractSubscription { try { message.decrementReferenceCount(); matched.remove(message); - discarded++; + discarded.incrementAndGet(); if (destination != null) { destination.getDestinationStatistics().getDequeues().increment(); }