diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 6b61379823..8db7c62423 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -106,26 +106,26 @@ public class TopicSubscription extends AbstractSubscription { // locator /w the message. node = new IndirectMessageReference(node.getMessage()); enqueueCounter.incrementAndGet(); - if (!isFull() && matched.isEmpty()) { - // if maximumPendingMessages is set we will only discard messages which - // have not been dispatched (i.e. we allow the prefetch buffer to be filled) - dispatch(node); - setSlowConsumer(false); - } else { - if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { - // Slow consumers should log and set their state as such. - if (!isSlowConsumer()) { - LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); - setSlowConsumer(true); - for (Destination dest: destinations) { - dest.slowConsumer(getContext(), this); + synchronized (matchedListMutex) { + if (!isFull() && matched.isEmpty()) { + // if maximumPendingMessages is set we will only discard messages which + // have not been dispatched (i.e. we allow the prefetch buffer to be filled) + dispatch(node); + setSlowConsumer(false); + } else { + if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { + // Slow consumers should log and set their state as such. + if (!isSlowConsumer()) { + LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); + setSlowConsumer(true); + for (Destination dest: destinations) { + dest.slowConsumer(getContext(), this); + } } } - } - if (maximumPendingMessages != 0) { - boolean warnedAboutWait = false; - while (active) { - synchronized (matchedListMutex) { + if (maximumPendingMessages != 0) { + boolean warnedAboutWait = false; + while (active) { while (matched.isFull()) { if (getContext().getStopping().get()) { LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId()); @@ -150,9 +150,6 @@ public class TopicSubscription extends AbstractSubscription { break; } } - } - synchronized (matchedListMutex) { - // NOTE - be careful about the slaveBroker! if (maximumPendingMessages > 0) { // calculate the high water mark from which point we // will eagerly evict expired messages @@ -195,8 +192,8 @@ public class TopicSubscription extends AbstractSubscription { } } } + dispatchMatched(); } - dispatchMatched(); } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java index 8b8643a535..f05f993c98 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java @@ -21,7 +21,7 @@ import javax.jms.JMSException; import org.apache.activemq.ActiveMQConnectionFactory; /** - * + * reproduced: https://issues.apache.org/jira/browse/AMQ-4107 */ public class TwoMulticastDiscoveryBrokerTopicSendReceiveTest extends TwoBrokerTopicSendReceiveTest {