From 687badb4fdd1b738103ab409f4342a82fb0e42fd Mon Sep 17 00:00:00 2001 From: Vasco Veloso Date: Wed, 25 Jan 2017 10:08:37 +0000 Subject: [PATCH] AMQ-6577: honour usePrefetchExtension in TopicSubscription. AMQ-6577: move usePrefetchExtension flag to AbstractSubscription to promote reuse. --- .../broker/region/AbstractSubscription.java | 9 + .../broker/region/PrefetchSubscription.java | 17 +- .../broker/region/TopicSubscription.java | 7 +- .../broker/region/policy/PolicyEntry.java | 1 + .../java/org/apache/activemq/TestSupport.java | 13 +- .../usecases/ExpiredMessagesTest.java | 235 +++++++++++------- 6 files changed, 183 insertions(+), 99 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 3cb2f1fc07..9a51a837d7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -52,6 +52,7 @@ public abstract class AbstractSubscription implements Subscription { protected final CopyOnWriteArrayList destinations = new CopyOnWriteArrayList(); protected final AtomicInteger prefetchExtension = new AtomicInteger(0); + private boolean usePrefetchExtension = true; private BooleanExpression selectorExpression; private ObjectName objectName; private int cursorMemoryHighWaterMark = 70; @@ -185,6 +186,14 @@ public abstract class AbstractSubscription implements Subscription { return info.getPrefetchSize(); } + public boolean isUsePrefetchExtension() { + return usePrefetchExtension; + } + + public void setUsePrefetchExtension(boolean usePrefetchExtension) { + this.usePrefetchExtension = usePrefetchExtension; + } + public void setPrefetchSize(int newSize) { info.setPrefetchSize(newSize); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index db133c1a43..314285fdfa 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -56,7 +56,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { protected PendingMessageCursor pending; protected final List dispatched = new ArrayList(); - protected boolean usePrefetchExtension = true; private int maxProducersToAudit=32; private int maxAuditDepth=2048; protected final SystemUsage usageManager; @@ -263,7 +262,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { registerRemoveSync(context, node); } - if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) { + if (isUsePrefetchExtension() && getPrefetchSize() != 0 && ack.isInTransaction()) { // allow transaction batch to exceed prefetch while (true) { int currentExtension = prefetchExtension.get(); @@ -288,7 +287,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { final MessageReference node = iter.next(); Destination nodeDest = (Destination) node.getRegionDestination(); if (ack.getLastMessageId().equals(node.getMessageId())) { - if (usePrefetchExtension && getPrefetchSize() != 0) { + if (isUsePrefetchExtension() && getPrefetchSize() != 0) { // allow batch to exceed prefetch while (true) { int currentExtension = prefetchExtension.get(); @@ -328,7 +327,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { nodeDest.getDestinationStatistics().getInflight().decrement(); if (ack.getLastMessageId().equals(messageId)) { - if (usePrefetchExtension && getPrefetchSize() != 0) { + if (isUsePrefetchExtension() && getPrefetchSize() != 0) { // allow batch to exceed prefetch while (true) { int currentExtension = prefetchExtension.get(); @@ -444,7 +443,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { @Override public void beforeEnd() { - if (usePrefetchExtension && getPrefetchSize() != 0) { + if (isUsePrefetchExtension() && getPrefetchSize() != 0) { while (true) { int currentExtension = prefetchExtension.get(); int newExtension = Math.max(0, currentExtension - 1); @@ -892,14 +891,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } } - public boolean isUsePrefetchExtension() { - return usePrefetchExtension; - } - - public void setUsePrefetchExtension(boolean usePrefetchExtension) { - this.usePrefetchExtension = usePrefetchExtension; - } - @Override public void setPrefetchSize(int prefetchSize) { this.info.setPrefetchSize(prefetchSize); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 095735a6fb..65c2ba954b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -422,6 +422,9 @@ public class TopicSubscription extends AbstractSubscription { } private void incrementPrefetchExtension(int amount) { + if (!isUsePrefetchExtension()) { + return; + } while (true) { int currentExtension = prefetchExtension.get(); int newExtension = Math.max(0, currentExtension + amount); @@ -748,7 +751,8 @@ public class TopicSubscription extends AbstractSubscription { @Override public String toString() { return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" - + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get(); + + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get() + + ", usePrefetchExtension=" + isUsePrefetchExtension(); } @Override @@ -781,4 +785,5 @@ public class TopicSubscription extends AbstractSubscription { LOG.trace("Caught exception on dispatch after prefetch size change."); } } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index b8401a434b..5b7ff0e184 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -313,6 +313,7 @@ public class PolicyEntry extends DestinationMapEntry { public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { configurePrefetch(subscription); + subscription.setUsePrefetchExtension(isUsePrefetchExtension()); subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); if (pendingMessageLimitStrategy != null) { int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java index 877b22a30a..fa07b8dfe9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java @@ -18,7 +18,7 @@ package org.apache.activemq; import java.io.File; import java.io.IOException; -import java.net.ServerSocket; +import java.util.List; import java.util.Map; import javax.jms.Connection; @@ -28,13 +28,13 @@ import javax.jms.Message; import javax.jms.TextMessage; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import javax.net.ServerSocketFactory; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; @@ -157,6 +157,15 @@ public abstract class TestSupport extends CombinationTestSupport { return result; } + public static List getDestinationConsumers(BrokerService broker, ActiveMQDestination destination) { + List result = null; + org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination); + if (dest != null) { + result = dest.getConsumers(); + } + return result; + } + public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) { org.apache.activemq.broker.region.Destination result = null; for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java index 0187aadc80..9b5354225d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java @@ -22,11 +22,14 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.util.Wait; import org.slf4j.Logger; @@ -34,9 +37,11 @@ import org.slf4j.LoggerFactory; import javax.jms.*; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import static org.apache.activemq.TestSupport.getDestination; +import static org.apache.activemq.TestSupport.getDestinationConsumers; import static org.apache.activemq.TestSupport.getDestinationStatistics; public class ExpiredMessagesTest extends CombinationTestSupport { @@ -48,11 +53,12 @@ public class ExpiredMessagesTest extends CombinationTestSupport { Session session; MessageProducer producer; MessageConsumer consumer; - public ActiveMQDestination destination = new ActiveMQQueue("test"); - public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ"); - public boolean useTextMessage = true; - public boolean useVMCursor = true; - protected String brokerUri; + private ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ"); + private boolean useTextMessage = true; + private boolean useVMCursor = true; + private boolean deleteAllMessages = true; + private boolean usePrefetchExtension = true; + private String brokerUri; public static Test suite() { return suite(ExpiredMessagesTest.class); @@ -64,20 +70,153 @@ public class ExpiredMessagesTest extends CombinationTestSupport { @Override protected void setUp() throws Exception { - final boolean deleteAllMessages = true; - broker = createBroker(deleteAllMessages, 100); - brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } @Override protected void tearDown() throws Exception { + if (null != producer) { + producer.close(); + } + if (null != consumer) { + consumer.close(); + } + session.close(); connection.stop(); broker.stop(); broker.waitUntilStopped(); } public void testExpiredMessages() throws Exception { + final ActiveMQDestination destination = new ActiveMQQueue("test"); + final int numMessagesToSend = 10000; + buildBroker(destination); + + final DestinationStatistics view = verifyMessageExpirationOnDestination(destination, numMessagesToSend); + + verifyDestinationDlq(destination, numMessagesToSend, view); + } + + public void testExpiredMessages_onTopic_withPrefetchExtension() throws Exception { + final ActiveMQDestination destination = new ActiveMQTopic("test"); + final int numMessagesToSend = 10000; + + usePrefetchExtension = true; + + buildBroker(destination); + + verifyMessageExpirationOnDestination(destination, numMessagesToSend); + // We don't check the DLQ because non-persistent messages on topics are discarded instead. + + final List subscriptions = getDestinationConsumers(broker, destination); + + assertTrue("prefetch extension was not incremented", + subscriptions.stream(). + filter(s -> s instanceof TopicSubscription). + mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()). + allMatch(e -> e > 0)); + } + + public void testExpiredMessages_onTopic_withoutPrefetchExtension() throws Exception { + final ActiveMQDestination destination = new ActiveMQTopic("test"); + final int numMessagesToSend = 10000; + + usePrefetchExtension = false; + + buildBroker(destination); + + verifyMessageExpirationOnDestination(destination, numMessagesToSend); + // We don't check the DLQ because non-persistent messages on topics are discarded instead. + + final List subscriptions = getDestinationConsumers(broker, destination); + + assertTrue("prefetch extension was incremented", + subscriptions.stream(). + filter(s -> s instanceof TopicSubscription). + mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()). + allMatch(e -> e == 0)); + } + + private void buildBroker(ActiveMQDestination destination) throws Exception { + broker = createBroker(deleteAllMessages, usePrefetchExtension, 100, destination); + brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + public void testRecoverExpiredMessages() throws Exception { + final ActiveMQDestination destination = new ActiveMQQueue("test"); + + buildBroker(destination); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + "failover://"+brokerUri); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + producer.setTimeToLive(2000); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + Thread producingThread = new Thread("Producing Thread") { + @Override + public void run() { + try { + int i = 0; + while (i++ < 1000) { + Message message = useTextMessage ? session + .createTextMessage("test") : session + .createObjectMessage("test"); + producer.send(message); + } + } catch (Throwable ex) { + ex.printStackTrace(); + } + } + }; + + producingThread.start(); + producingThread.join(); + + DestinationStatistics view = getDestinationStatistics(broker, destination); + LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + + view.getEnqueues().getCount() + ", dequeues: " + + view.getDequeues().getCount() + ", dispatched: " + + view.getDispatched().getCount() + ", inflight: " + + view.getInflight().getCount() + ", expiries: " + + view.getExpired().getCount()); + + LOG.info("stopping broker"); + broker.stop(); + broker.waitUntilStopped(); + + Thread.sleep(5000); + + LOG.info("recovering broker"); + final boolean deleteAllMessages = false; + final boolean usePrefetchExtension = true; + broker = createBroker(deleteAllMessages, usePrefetchExtension, 5000, destination); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + DestinationStatistics view = getDestinationStatistics(broker, destination); + LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " + + view.getEnqueues().getCount() + ", dequeues: " + + view.getDequeues().getCount() + ", dispatched: " + + view.getDispatched().getCount() + ", inflight: " + + view.getInflight().getCount() + ", expiries: " + + view.getExpired().getCount()); + + return view.getMessages().getCount() == 0; + } + }); + + view = getDestinationStatistics(broker, destination); + assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount()); + assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount()); + } + + private DestinationStatistics verifyMessageExpirationOnDestination(ActiveMQDestination destination, final int numMessagesToSend) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -100,7 +239,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport { Thread.sleep(100); end = System.currentTimeMillis(); } - consumer.close(); } catch (Throwable ex) { ex.printStackTrace(); } @@ -109,7 +247,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport { consumerThread.start(); - final int numMessagesToSend = 10000; Thread producingThread = new Thread("Producing Thread") { @Override public void run() { @@ -118,7 +255,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport { while (i++ < numMessagesToSend) { producer.send(session.createTextMessage("test")); } - producer.close(); } catch (Throwable ex) { ex.printStackTrace(); } @@ -129,7 +265,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport { consumerThread.join(); producingThread.join(); - session.close(); final DestinationStatistics view = getDestinationStatistics(broker, destination); @@ -171,7 +306,10 @@ public class ExpiredMessagesTest extends CombinationTestSupport { return view.getMessages().getCount() == 0; } })); + return view; + } + private void verifyDestinationDlq(ActiveMQDestination destination, int numMessagesToSend, DestinationStatistics view) throws Exception { final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount(); final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue; @@ -225,77 +363,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE}); } - public void testRecoverExpiredMessages() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( - "failover://"+brokerUri); - connection = factory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(destination); - producer.setTimeToLive(2000); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - Thread producingThread = new Thread("Producing Thread") { - @Override - public void run() { - try { - int i = 0; - while (i++ < 1000) { - Message message = useTextMessage ? session - .createTextMessage("test") : session - .createObjectMessage("test"); - producer.send(message); - } - producer.close(); - } catch (Throwable ex) { - ex.printStackTrace(); - } - } - }; - - producingThread.start(); - producingThread.join(); - - DestinationStatistics view = getDestinationStatistics(broker, destination); - LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " - + view.getEnqueues().getCount() + ", dequeues: " - + view.getDequeues().getCount() + ", dispatched: " - + view.getDispatched().getCount() + ", inflight: " - + view.getInflight().getCount() + ", expiries: " - + view.getExpired().getCount()); - - LOG.info("stopping broker"); - broker.stop(); - broker.waitUntilStopped(); - - Thread.sleep(5000); - - LOG.info("recovering broker"); - final boolean deleteAllMessages = false; - broker = createBroker(deleteAllMessages, 5000); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - DestinationStatistics view = getDestinationStatistics(broker, destination); - LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: " - + view.getEnqueues().getCount() + ", dequeues: " - + view.getDequeues().getCount() + ", dispatched: " - + view.getDispatched().getCount() + ", inflight: " - + view.getInflight().getCount() + ", expiries: " - + view.getExpired().getCount()); - - return view.getMessages().getCount() == 0; - } - }); - - view = getDestinationStatistics(broker, destination); - assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount()); - assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount()); - } - - private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception { + private BrokerService createBroker(boolean deleteAllMessages, boolean usePrefetchExtension, long expireMessagesPeriod, ActiveMQDestination destination) throws Exception { BrokerService broker = new BrokerService(); broker.setBrokerName("localhost"); broker.setDestinations(new ActiveMQDestination[]{destination}); @@ -307,6 +375,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport { } defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod); defaultPolicy.setMaxExpirePageSize(1200); + defaultPolicy.setUsePrefetchExtension(usePrefetchExtension); PolicyMap policyMap = new PolicyMap(); policyMap.setDefaultEntry(defaultPolicy); broker.setDestinationPolicy(policyMap);