From 6a73c40c187f80a21ddc61ccbf6d257003ba6fe3 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Tue, 20 Apr 2010 15:13:18 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2651 - modified patch applied with thanks, did not change the default as there are a bunch of tests and user applications that depend on the current default git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@935954 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 15 +- .../broker/region/policy/PolicyEntry.java | 25 +- .../OnePrefetchAsyncConsumerTest.java | 214 ++++++++++++++++++ 3 files changed, 245 insertions(+), 9 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 241057497e..7ca55a447d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -60,6 +60,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { protected PendingMessageCursor pending; protected final List dispatched = new CopyOnWriteArrayList(); protected int prefetchExtension; + protected boolean usePrefetchExtension = true; protected long enqueueCounter; protected long dispatchCounter; protected long dequeueCounter; @@ -257,7 +258,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { // contract prefetch if dispatch required a pull if (getPrefetchSize() == 0) { prefetchExtension = Math.max(0, prefetchExtension - index); - } else if (context.isInTransaction()) { + } else if (usePrefetchExtension && context.isInTransaction()) { // extend prefetch window only if not a pulling consumer prefetchExtension = Math.max(prefetchExtension, index); } @@ -307,7 +308,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); } if (ack.getLastMessageId().equals(node.getMessageId())) { - prefetchExtension = Math.max(prefetchExtension, index + 1); + if (usePrefetchExtension) { + prefetchExtension = Math.max(prefetchExtension, index + 1); + } destination = node.getRegionDestination(); callDispatchMatched = true; break; @@ -746,4 +749,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription { public void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; } + + public boolean isUsePrefetchExtension() { + return usePrefetchExtension; + } + + public void setUsePrefetchExtension(boolean usePrefetchExtension) { + this.usePrefetchExtension = usePrefetchExtension; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index b88fabf2ff..68a38de880 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -83,7 +83,8 @@ public class PolicyEntry extends DestinationMapEntry { private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH; private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; - private int cursorMemoryHighWaterMark=70; + private boolean usePrefetchExtension = true; + private int cursorMemoryHighWaterMark = 70; private int storeUsageHighWaterMark = 100; @@ -195,7 +196,7 @@ public class PolicyEntry extends DestinationMapEntry { } sub.setMaxAuditDepth(getMaxAuditDepth()); sub.setMaxProducersToAudit(getMaxProducersToAudit()); - + sub.setUsePrefetchExtension(isUsePrefetchExtension()); } public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { @@ -207,6 +208,7 @@ public class PolicyEntry extends DestinationMapEntry { sub.setPrefetchSize(getQueueBrowserPrefetch()); } sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); + sub.setUsePrefetchExtension(isUsePrefetchExtension()); } public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { @@ -218,6 +220,7 @@ public class PolicyEntry extends DestinationMapEntry { sub.setPrefetchSize(getQueuePrefetch()); } sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); + sub.setUsePrefetchExtension(isUsePrefetchExtension()); } // Properties @@ -692,12 +695,20 @@ public class PolicyEntry extends DestinationMapEntry { this.durableTopicPrefetch = durableTopicPrefetch; } - public int getCursorMemoryHighWaterMark() { - return this.cursorMemoryHighWaterMark; - } + public boolean isUsePrefetchExtension() { + return this.usePrefetchExtension; + } - public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { - this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; + public void setUsePrefetchExtension(boolean usePrefetchExtension) { + this.usePrefetchExtension = usePrefetchExtension; + } + + public int getCursorMemoryHighWaterMark() { + return this.cursorMemoryHighWaterMark; + } + + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; } public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { diff --git a/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java new file mode 100644 index 0000000000..042ca51768 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.ServerSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +// see: https://issues.apache.org/activemq/browse/AMQ-2651 +public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { + private static final Log LOG = LogFactory.getLog(OnePrefetchAsyncConsumerTest.class); + + private TestMutex testMutex; + protected Connection connection; + protected ConnectionConsumer connectionConsumer; + protected Queue queue; + protected CountDownLatch messageTwoDelay = new CountDownLatch(1); + + public void testPrefetchExtension() throws Exception { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + + // when Msg1 is acked, the PrefetchSubscription will (incorrectly?) increment its prefetchExtension + producer.send(session.createTextMessage("Msg1")); + + // Msg2 will exhaust the ServerSessionPool (since it only has 1 ServerSession) + producer.send(session.createTextMessage("Msg2")); + + // Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from + // an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription + producer.send(session.createTextMessage("Msg3")); + + session.commit(); + + // wait for test to complete and the test result to get set + // this happens asynchronously since the messages are delivered asynchronously + synchronized (testMutex) { + while (!testMutex.testCompleted) { + testMutex.wait(); + } + } + + //test completed, result is ready + assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful); + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:61616"; + super.setUp(); + + testMutex = new TestMutex(); + connection = createConnection(); + queue = createQueue(); + // note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription + connectionConsumer = connection.createConnectionConsumer( + queue, null, new TestServerSessionPool(connection), 1); + connection.start(); + } + + protected void tearDown() throws Exception { + connectionConsumer.close(); + connection.close(); + super.tearDown(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = super.createBroker(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + // ensure prefetch is exact. only delivery next when current is acked + defaultEntry.setUsePrefetchExtension(false); + policyMap.setDefaultEntry(defaultEntry); + answer.setDestinationPolicy(policyMap); + return answer; + } + + protected Queue createQueue() { + return new ActiveMQQueue(getDestinationString()); + } + + // simulates a ServerSessionPool with only 1 ServerSession + private class TestServerSessionPool implements ServerSessionPool { + Connection connection; + TestServerSession serverSession; + boolean serverSessionInUse = false; + + public TestServerSessionPool(Connection connection) throws JMSException { + this.connection = connection; + serverSession = new TestServerSession(this); + } + + public ServerSession getServerSession() throws JMSException { + synchronized (this) { + if (serverSessionInUse) { + LOG.info("asked for session while in use, not serialised delivery"); + synchronized (testMutex) { + testMutex.testSuccessful = false; + testMutex.testCompleted = true; + } + } + serverSessionInUse = true; + return serverSession; + } + } + } + + private class TestServerSession implements ServerSession { + TestServerSessionPool pool; + Session session; + + public TestServerSession(TestServerSessionPool pool) throws JMSException { + this.pool = pool; + session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + session.setMessageListener(new TestMessageListener()); + } + + public Session getSession() throws JMSException { + return session; + } + + public void start() throws JMSException { + // use a separate thread to process the message asynchronously + new Thread() { + public void run() { + // let the session deliver the message + session.run(); + + // commit the tx + try { + session.commit(); + } + catch (JMSException e) { + } + + // return ServerSession to pool + synchronized (pool) { + pool.serverSessionInUse = false; + } + + // let the test check if the test was completed + synchronized (testMutex) { + testMutex.notify(); + } + } + }.start(); + } + } + + private class TestMessageListener implements MessageListener { + public void onMessage(Message message) { + try { + String text = ((TextMessage)message).getText(); + LOG.info("got message: " + text); + if (text.equals("Msg3")) { + // if we get here, Exception in getServerSession() was not thrown, test is successful + // this obviously doesn't happen now, + // need to fix prefetchExtension computation logic in PrefetchSubscription to get here + synchronized (testMutex) { + if (!testMutex.testCompleted) { + testMutex.testSuccessful = true; + testMutex.testCompleted = true; + } + } + } + else if (text.equals("Msg2")) { + // simulate long message processing so that Msg3 comes when Msg2 is still being processed + // and thus the single ServerSession is in use + TimeUnit.SECONDS.sleep(4); + } + } + catch (JMSException e) { + } + catch (InterruptedException e) { + } + } + } + + private class TestMutex { + boolean testCompleted = false; + boolean testSuccessful = true; + } +} \ No newline at end of file