From 2d9959a6f6f33f7138606073e425a74261ec3125 Mon Sep 17 00:00:00 2001 From: gtully Date: Mon, 15 Dec 2014 14:12:08 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5476 - ZeroPrefetchConsumerTest regression - fix default in connection factory and refactor prefetchExtension support - https://issues.apache.org/activemq/browse/AMQ-2560 --- .../broker/region/PrefetchSubscription.java | 60 +++++++++---------- .../activemq/ActiveMQConnectionFactory.java | 2 +- .../activemq/ZeroPrefetchConsumerTest.java | 14 +++-- 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 8b8a78840c..b101d72de2 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -234,26 +234,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { index++; acknowledge(context, ack, node); if (ack.getLastMessageId().equals(messageId)) { - // contract prefetch if dispatch required a pull - if (getPrefetchSize() == 0) { - // Protect extension update against parallel updates. - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - index); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } - } else if (usePrefetchExtension && context.isInTransaction()) { - // extend prefetch window only if not a pulling consumer - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(currentExtension, index); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; - } - } - } destination = (Destination) node.getRegionDestination(); callDispatchMatched = true; break; @@ -283,14 +263,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription { registerRemoveSync(context, node); } - // Protect extension update against parallel updates. - while (true) { - int currentExtension = prefetchExtension.get(); - int newExtension = Math.max(0, currentExtension - 1); - if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { - break; + if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) { + // allow transaction batch to exceed prefetch + while (true) { + int currentExtension = prefetchExtension.get(); + int newExtension = Math.max(currentExtension, currentExtension + 1); + if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { + break; + } } } + acknowledge(context, ack, node); destination = (Destination) node.getRegionDestination(); callDispatchMatched = true; @@ -313,7 +296,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription { nodeDest.getDestinationStatistics().getInflight().decrement(); } if (ack.getLastMessageId().equals(node.getMessageId())) { - if (usePrefetchExtension) { + if (usePrefetchExtension && getPrefetchSize() != 0) { + // allow batch to exceed prefetch while (true) { int currentExtension = prefetchExtension.get(); int newExtension = Math.max(currentExtension, index + 1); @@ -425,6 +409,19 @@ public abstract class PrefetchSubscription extends AbstractSubscription { context.getTransaction().addSynchronization( new Synchronization() { + @Override + public void beforeEnd() { + if (usePrefetchExtension && getPrefetchSize() != 0) { + while (true) { + int currentExtension = prefetchExtension.get(); + int newExtension = Math.max(0, currentExtension - 1); + if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { + break; + } + } + } + } + @Override public void afterCommit() throws Exception { @@ -516,7 +513,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { */ @Override public boolean isFull() { - return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); + return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); } /** @@ -537,7 +534,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { @Override public int countBeforeFull() { - return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); + return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); } @Override @@ -696,13 +693,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription { okForAckAsDispatchDone.countDown(); - // No reentrant lock - Patch needed to IndirectMessageReference on method lock MessageDispatch md = createMessageDispatch(node, message); - // NULL messages don't count... they don't get Acked. if (node != QueueMessageReference.NULL_MESSAGE) { dispatchCounter++; dispatched.add(node); - } else { + } + if (getPrefetchSize() == 0) { while (true) { int currentExtension = prefetchExtension.get(); int newExtension = Math.max(0, currentExtension - 1); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index 3354ab3669..1fbf604ecb 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -173,7 +173,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private long consumerFailoverRedeliveryWaitPeriod = 0; private boolean checkForDuplicates = true; private ClientInternalExceptionListener clientInternalExceptionListener; - private boolean messagePrioritySupported = true; + private boolean messagePrioritySupported = false; private boolean transactedIndividualAck = false; private boolean nonBlockingRedelivery = false; private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java index acf9c03285..d4cecaba06 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java @@ -174,7 +174,7 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { } private void doTestManyMessageConsumer(boolean transacted) throws Exception { - Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Msg1")); @@ -221,12 +221,11 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { session.commit(); } // Now using other consumer - // this call should return the next message (Msg5) still left on the queue + // this call should return the next message still left on the queue answer = (TextMessage)consumer.receive(5000); assertEquals("Should have received a message!", answer.getText(), "Msg6"); // read one more message without commit - // Now using other consumer - // this call should return the next message (Msg5) still left on the queue + // this call should return the next message still left on the queue answer = (TextMessage)consumer.receive(5000); assertEquals("Should have received a message!", answer.getText(), "Msg7"); if (transacted) { @@ -247,12 +246,17 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { doTestManyMessageConsumerWithSend(true); } + public void testManyMessageConsumerWithTxSendPrioritySupport() throws Exception { + ((ActiveMQConnection)connection).setMessagePrioritySupported(true); + doTestManyMessageConsumerWithSend(true); + } + public void testManyMessageConsumerWithSendNoTransaction() throws Exception { doTestManyMessageConsumerWithSend(false); } private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception { - Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED :Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Msg1"));