From 06972183f9dc8ed575e070d553f189552b027601 Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 5 Apr 2016 13:22:23 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6215 - support 0 maxBrowsePageSize and maxExpirePageSize such that lazyDispatch ensures highest priority messages is available to a pull consumer (cherry picked from commit a3a8c1c5256aa8ea1067afe3e1586832e5aa1821) --- .../broker/region/BaseDestination.java | 2 +- .../apache/activemq/broker/region/Queue.java | 83 ++-- ...eZeroPrefetchLazyDispatchPriorityTest.java | 393 ++++++++++++++++++ 3 files changed, 439 insertions(+), 39 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 5ae2d286af..75f2ee0e56 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -316,7 +316,7 @@ public abstract class BaseDestination implements Destination { @Override public int getMaxBrowsePageSize() { - return this.maxBrowsePageSize > 0 ? this.maxBrowsePageSize : getMaxPageSize(); + return this.maxBrowsePageSize; } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index cf53b5178b..786640e8b7 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1144,17 +1144,17 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index final ConnectionContext connectionContext = createConnectionContext(); try { int maxPageInAttempts = 1; - messagesLock.readLock().lock(); - try { - maxPageInAttempts += (messages.size() / getMaxPageSize()); - } finally { - messagesLock.readLock().unlock(); + if (max > 0) { + messagesLock.readLock().lock(); + try { + maxPageInAttempts += (messages.size() / max); + } finally { + messagesLock.readLock().unlock(); + } + while (shouldPageInMoreForBrowse(max) && maxPageInAttempts-- > 0) { + pageInMessages(!memoryUsage.isFull(110), max); + } } - - while (shouldPageInMoreForBrowse(max) && maxPageInAttempts-- > 0) { - pageInMessages(!memoryUsage.isFull(110)); - }; - doBrowseList(browseList, max, dispatchPendingList, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch+pagedInPendingDispatch"); doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages"); @@ -1262,7 +1262,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index List list = null; long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { - doPageIn(true, false); // signal no expiry processing needed. + doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed. pagedInMessagesLock.readLock().lock(); try { list = new ArrayList(pagedInMessages.values()); @@ -1630,7 +1630,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { try { - pageInMessages(hasBrowsers); + pageInMessages(hasBrowsers && getMaxBrowsePageSize() > 0, getMaxPageSize()); } catch (Throwable e) { LOG.error("Failed to page in more queue messages ", e); } @@ -1895,11 +1895,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } private void doPageIn(boolean force) throws Exception { - doPageIn(force, true); + doPageIn(force, true, getMaxPageSize()); } - private void doPageIn(boolean force, boolean processExpired) throws Exception { - PendingList newlyPaged = doPageInForDispatch(force, processExpired); + private void doPageIn(boolean force, boolean processExpired, int maxPageSize) throws Exception { + PendingList newlyPaged = doPageInForDispatch(force, processExpired, maxPageSize); pagedInPendingDispatchLock.writeLock().lock(); try { if (dispatchPendingList.isEmpty()) { @@ -1917,11 +1917,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } - private PendingList doPageInForDispatch(boolean force, boolean processExpired) throws Exception { + private PendingList doPageInForDispatch(boolean force, boolean processExpired, int maxPageSize) throws Exception { List result = null; PendingList resultList = null; - int toPageIn = Math.min(getMaxPageSize(), messages.size()); + int toPageIn = Math.min(maxPageSize, messages.size()); int pagedInPendingSize = 0; pagedInPendingDispatchLock.readLock().lock(); try { @@ -1929,24 +1929,29 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } finally { pagedInPendingDispatchLock.readLock().unlock(); } - - LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}", - new Object[]{ - this, - toPageIn, - destinationStatistics.getInflight().getCount(), - pagedInMessages.size(), - pagedInPendingSize, - destinationStatistics.getEnqueues().getCount(), - destinationStatistics.getDequeues().getCount(), - getMemoryUsage().getUsage() - }); if (isLazyDispatch() && !force) { // Only page in the minimum number of messages which can be // dispatched immediately. toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); } - if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) { + + if (LOG.isDebugEnabled()) { + LOG.debug("{} toPageIn: {}, force:{}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}, maxPageSize:{}", + new Object[]{ + this, + toPageIn, + force, + destinationStatistics.getInflight().getCount(), + pagedInMessages.size(), + pagedInPendingSize, + destinationStatistics.getEnqueues().getCount(), + destinationStatistics.getDequeues().getCount(), + getMemoryUsage().getUsage(), + maxPageSize + }); + } + + if (toPageIn > 0 && (force || (haveRealConsumer() && pagedInPendingSize < maxPageSize))) { int count = 0; result = new ArrayList(toPageIn); messagesLock.writeLock().lock(); @@ -1954,7 +1959,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index try { messages.setMaxBatchSize(toPageIn); messages.reset(); - while (messages.hasNext() && count < toPageIn) { + while (count < toPageIn && messages.hasNext()) { MessageReference node = messages.next(); messages.remove(); @@ -2013,6 +2018,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index return resultList; } + private final boolean haveRealConsumer() { + return consumers.size() - browserDispatches.size() > 0; + } + private void doDispatch(PendingList list) throws Exception { boolean doWakeUp = false; @@ -2173,8 +2182,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index subs.getConsumerInfo().incrementAssignedGroupCount(destination); } - protected void pageInMessages(boolean force) throws Exception { - doDispatch(doPageInForDispatch(force, true)); + protected void pageInMessages(boolean force, int maxPageSize) throws Exception { + doDispatch(doPageInForDispatch(force, true, maxPageSize)); } private void addToConsumerList(Subscription sub) { @@ -2192,20 +2201,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index private int getConsumerMessageCountBeforeFull() throws Exception { int total = 0; - boolean zeroPrefetch = false; consumersLock.readLock().lock(); try { for (Subscription s : consumers) { - zeroPrefetch |= s.getPrefetchSize() == 0; + if (s.isBrowser()) { + continue; + } int countBeforeFull = s.countBeforeFull(); total += countBeforeFull; } } finally { consumersLock.readLock().unlock(); } - if (total == 0 && zeroPrefetch) { - total = 1; - } return total; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java new file mode 100644 index 0000000000..cff3beeee5 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class QueueZeroPrefetchLazyDispatchPriorityTest extends TestCase { + + static final Logger LOG = LoggerFactory.getLogger(QueueZeroPrefetchLazyDispatchPriorityTest.class); + private BrokerService broker; + public static final byte[] PAYLOAD = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + + + protected void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + protected void tearDown() throws Exception { + + if (broker != null) { + broker.stop(); + } + } + + + public void testPriorityMessages() throws Exception { + + + for (int i = 0; i < 5; i++) { + + + //send 4 message priority MEDIUM + produceMessages(4, 4, "TestQ"); + + + //send 1 message priority HIGH + produceMessages(1, 5, "TestQ"); + + + LOG.info("On iteration " + i); + + + Thread.sleep(500); + + + // consume messages + ArrayList consumeList = consumeMessages("TestQ"); + LOG.info("Consumed list " + consumeList.size()); + + + // compare lists + assertEquals("message 1 should be priority high", 5, consumeList.get(0).getJMSPriority()); + assertEquals("message 2 should be priority medium", 4, consumeList.get(1).getJMSPriority()); + assertEquals("message 3 should be priority medium", 4, consumeList.get(2).getJMSPriority()); + assertEquals("message 4 should be priority medium", 4, consumeList.get(3).getJMSPriority()); + assertEquals("message 5 should be priority medium", 4, consumeList.get(4).getJMSPriority()); + } + + } + + + public void testPriorityMessagesMoreThanPageSize() throws Exception { + + + final int numToSend = 450; + for (int i = 0; i < 5; i++) { + + produceMessages(numToSend - 1, 4, "TestQ"); + + // ensure we get expiry processing + Thread.sleep(700); + + + //send 1 message priority HIGH + produceMessages(1, 5, "TestQ"); + + Thread.sleep(500); + + LOG.info("On iteration " + i); + + // consume messages + ArrayList consumeList = consumeMessages("TestQ"); + LOG.info("Consumed list " + consumeList.size()); + + + // compare lists + assertEquals("message 1 should be priority high", 5, consumeList.get(0).getJMSPriority()); + for (int j = 1; j < (numToSend - 1); j++) { + assertEquals("message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority()); + } + } + + } + + + public void testLongLivedPriorityConsumer() throws Exception { + + final int numToSend = 150; + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + Connection connection = connectionFactory.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("TestQ")); + connection.start(); + + for (int i = 0; i < 5; i++) { + + produceMessages(numToSend - 1, 4, "TestQ"); + + //send 1 message priority HIGH + produceMessages(1, 5, "TestQ"); + + Message message = consumer.receive(4000); + + assertEquals("message should be priority high", 5, message.getJMSPriority()); + + } + } finally { + connection.close(); + } + + ArrayList consumeList = consumeMessages("TestQ"); + LOG.info("Consumed list " + consumeList.size()); + + for (Message message : consumeList) { + assertEquals("should be priority medium", 4, message.getJMSPriority()); + } + + } + + + public void testPriorityMessagesWithJmsBrowser() throws Exception { + + + final int numToSend = 250; + for (int i = 0; i < 5; i++) { + + produceMessages(numToSend - 1, 4, "TestQ"); + + ArrayList browsed = browseMessages("TestQ"); + + LOG.info("Browsed: " + browsed.size()); + + //send 1 message priority HIGH + produceMessages(1, 5, "TestQ"); + + Thread.sleep(500); + + LOG.info("On iteration " + i); + + Message message = consumeOneMessage("TestQ"); + assertNotNull(message); + assertEquals(5, message.getJMSPriority()); + + // consume messages + ArrayList consumeList = consumeMessages("TestQ"); + LOG.info("Consumed list " + consumeList.size()); + + + // compare lists + //assertEquals("Iteration: " + i +", message 1 should be priority high", 5, consumeList.get(0).getJMSPriority()); + for (int j = 1; j < (numToSend - 1); j++) { + assertEquals("Iteration: " + i + ", message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority()); + } + } + + } + + public void testJmsBrowserGetsPagedIn() throws Exception { + + + final int numToSend = 10; + for (int i = 0; i < 10; i++) { + + produceMessages(numToSend, 4, "TestQ"); + + ArrayList browsed = browseMessages("TestQ"); + + LOG.info("Browsed: " + browsed.size()); + + assertEquals(0, browsed.size()); + + Message message = consumeOneMessage("TestQ", Session.CLIENT_ACKNOWLEDGE); + assertNotNull(message); + + browsed = browseMessages("TestQ"); + + LOG.info("Browsed: " + browsed.size()); + + assertEquals("see only the paged in for pull", 1, browsed.size()); + + // consume messages + ArrayList consumeList = consumeMessages("TestQ"); + LOG.info("Consumed list " + consumeList.size()); + assertEquals(numToSend, consumeList.size()); + + } + + } + + + private void produceMessages(int numberOfMessages, int priority, String queueName) throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + connectionFactory.setConnectionIDPrefix("pri-" + priority); + Connection connection = connectionFactory.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(new ActiveMQQueue(queueName)); + connection.start(); + + + for (int i = 0; i < numberOfMessages; i++) { + BytesMessage m = session.createBytesMessage(); + m.writeBytes(PAYLOAD); + m.setJMSPriority(priority); + producer.send(m, Message.DEFAULT_DELIVERY_MODE, m.getJMSPriority(), Message.DEFAULT_TIME_TO_LIVE); + } + + } finally { + + if (connection != null) { + connection.close(); + } + } + } + + + private ArrayList consumeMessages(String queueName) throws Exception { + + ArrayList returnedMessages = new ArrayList(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + Connection connection = connectionFactory.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(queueName)); + connection.start(); + boolean finished = false; + + while (!finished) { + + Message message = consumer.receive(1000); + if (message == null) { + finished = true; + } + + if (message != null) { + returnedMessages.add(message); + } + + } + + consumer.close(); + return returnedMessages; + + } finally { + + if (connection != null) { + connection.close(); + } + } + + + } + + private Message consumeOneMessage(String queueName) throws Exception { + return consumeOneMessage(queueName, Session.AUTO_ACKNOWLEDGE); + } + + private Message consumeOneMessage(String queueName, int ackMode) throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + Connection connection = connectionFactory.createConnection(); + try { + Session session = connection.createSession(false, ackMode); + MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(queueName)); + connection.start(); + + return consumer.receive(1000); + + } finally { + + if (connection != null) { + connection.close(); + } + } + + } + + private ArrayList browseMessages(String queueName) throws Exception { + + ArrayList returnedMessages = new ArrayList(); + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + Connection connection = connectionFactory.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser consumer = session.createBrowser(new ActiveMQQueue(queueName)); + connection.start(); + + Enumeration enumeration = consumer.getEnumeration(); + while (enumeration.hasMoreElements()) { + + Message message = (Message) enumeration.nextElement(); + returnedMessages.add(message); + + } + + return returnedMessages; + + } finally { + + if (connection != null) { + connection.close(); + } + } + + + } + + private BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + + //add the policy entries + PolicyMap policyMap = new PolicyMap(); + List entries = new ArrayList(); + PolicyEntry pe = new PolicyEntry(); + + pe.setPrioritizedMessages(true); + + pe.setExpireMessagesPeriod(500); + + pe.setMaxPageSize(100); + pe.setMaxExpirePageSize(0); + pe.setMaxBrowsePageSize(0); + + pe.setQueuePrefetch(0); + pe.setLazyDispatch(true); + + pe.setOptimizedDispatch(true); + + pe.setUseCache(false); + + pe.setQueue(">"); + entries.add(pe); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + + + broker.addConnector("tcp://0.0.0.0:0"); + return broker; + } + + +} \ No newline at end of file