From 7450a32ae7a7a34d2ddd68b7668afc82564eb61c Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Fri, 3 May 2013 14:50:56 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4495 - always get a max batch of messages from the store git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1478823 13f79535-47bb-0310-9956-ffa450edef68 --- .../region/cursors/AbstractStoreCursor.java | 8 +- .../region/cursors/QueueStorePrefetch.java | 14 +- .../region/cursors/StoreQueueCursor.java | 2 +- .../activemq/store/jdbc/JDBCMessageStore.java | 3 - .../activemq/store/kahadb/KahaDBStore.java | 3 +- .../activemq/leveldb/LevelDBStore.scala | 2 +- .../StoreQueueCursorNoDuplicateTest.java | 2 +- .../activemq/usecases/MemoryLimitTest.java | 201 ++++++++++++++++++ .../apache/activemq/util/ConsumerThread.java | 2 +- 9 files changed, 219 insertions(+), 18 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 21876e11cd..7f05bfff04 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -38,7 +38,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i private boolean storeHasMessages = false; protected int size; private MessageId lastCachedId; - private boolean hadSpace = false; + protected boolean hadSpace = false; protected AbstractStoreCursor(Destination destination) { super((destination != null ? destination.isPrioritizedMessages():false)); @@ -253,12 +253,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i setCacheEnabled(false); } - @Override - public boolean hasSpace() { - hadSpace = super.hasSpace(); - return hadSpace; - } - protected final synchronized void fillBatch() { if (LOG.isTraceEnabled()) { LOG.trace(this + " - fillBatch"); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index 1bee0920e6..b7fd289c23 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -17,10 +17,15 @@ package org.apache.activemq.broker.region.cursors; import java.io.IOException; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.memory.MemoryMessageStore; +import org.apache.activemq.store.memory.MemoryTransactionStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,14 +38,16 @@ import org.slf4j.LoggerFactory; class QueueStorePrefetch extends AbstractStoreCursor { private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class); private final MessageStore store; + private final Broker broker; /** * Construct it * @param queue */ - public QueueStorePrefetch(Queue queue) { + public QueueStorePrefetch(Queue queue, Broker broker) { super(queue); this.store = queue.getMessageStore(); + this.broker = broker; } @@ -94,7 +101,10 @@ class QueueStorePrefetch extends AbstractStoreCursor { @Override protected void doFillBatch() throws Exception { - this.store.recoverNextMessages(this.maxBatchSize, this); + hadSpace = this.hasSpace(); + if (!broker.getBrokerService().isPersistent() || hadSpace) { + this.store.recoverNextMessages(this.maxBatchSize, this); + } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index b8a2a0011a..24f31c7095 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -47,7 +47,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { super((queue != null ? queue.isPrioritizedMessages():false)); this.broker=broker; this.queue = queue; - this.persistent = new QueueStorePrefetch(queue); + this.persistent = new QueueStorePrefetch(queue, broker); currentCursor = persistent; } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index bbd93ce7b0..753b13cdc5 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -279,15 +279,12 @@ public class JDBCMessageStore extends AbstractMessageStore { maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { - if (listener.hasSpace()) { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); listener.recoverMessage(msg); lastRecoveredSequenceId.set(sequenceId); lastRecoveredPriority.set(msg.getPriority()); return true; - } - return false; } public boolean recoverMessageReference(String reference) throws Exception { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index abb5071ecb..5d8bea0e47 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -548,8 +548,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { StoredDestination sd = getStoredDestination(dest, tx); Entry entry = null; int counter = 0; - for (Iterator> iterator = sd.orderIndex.iterator(tx); - listener.hasSpace() && iterator.hasNext(); ) { + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { entry = iterator.next(); if (ackedAndPrepared.contains(entry.getValue().messageId)) { continue; diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 19d567cab3..e412b25e45 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -665,7 +665,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener { private var recovered: Int = 0 - def hasSpace = recovered < max && listener.hasSpace + def hasSpace = recovered < max def recoverMessage(message: Message) = { recovered += 1; listener.recoverMessage(message) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java index f012c7a4b8..706906e78e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java @@ -76,7 +76,7 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase { queueMessageStore.start(); - QueueStorePrefetch underTest = new QueueStorePrefetch(queue); + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); SystemUsage systemUsage = new SystemUsage(); // ensure memory limit is reached systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2)); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java new file mode 100644 index 0000000000..5bd7f79153 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.ConsumerThread; +import org.apache.activemq.util.ProducerThread; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +@RunWith(value = Parameterized.class) +public class MemoryLimitTest extends TestSupport { + private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitTest.class); + final String payload = new String(new byte[10 * 1024]); //10KB + protected BrokerService broker; + + private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice; + + @Parameterized.Parameters + public static Collection getTestParameters() { + TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB}; + TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB}; + TestSupport.PersistenceAdapterChoice[] jdbc = {TestSupport.PersistenceAdapterChoice.JDBC}; + List choices = new ArrayList(); + choices.add(kahaDb); + choices.add(levelDb); + choices.add(jdbc); + return choices; + } + + public MemoryLimitTest(TestSupport.PersistenceAdapterChoice choice) { + this.persistenceAdapterChoice = choice; + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); //1MB + broker.deleteAllMessages(); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setProducerFlowControl(false); + policyMap.put(new ActiveMQQueue(">"), policyEntry); + broker.setDestinationPolicy(policyMap); + + LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString()); + setPersistenceAdapter(broker, persistenceAdapterChoice); + broker.getPersistenceAdapter().deleteAllMessages(); + + return broker; + } + + @Before + public void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test(timeout = 120000) + public void testCursorBatch() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10"); + factory.setOptimizeAcknowledge(true); + Connection conn = factory.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = sess.createQueue("STORE"); + final ProducerThread producer = new ProducerThread(sess, queue) { + @Override + protected Message createMessage(int i) throws Exception { + return sess.createTextMessage(payload + "::" + i); + } + }; + producer.setMessageCount(2000); + producer.start(); + producer.join(); + + Thread.sleep(1000); + + // assert we didn't break high watermark (70%) usage + Destination dest = broker.getDestination((ActiveMQQueue) queue); + LOG.info("Destination usage: " + dest.getMemoryUsage()); + assertTrue(dest.getMemoryUsage().getPercentUsage() <= 71); + LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); + assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71); + + // consume one message + MessageConsumer consumer = sess.createConsumer(queue); + Message msg = consumer.receive(); + msg.acknowledge(); + + Thread.sleep(1000); + // this should free some space and allow us to get new batch of messages in the memory + // exceeding the limit + LOG.info("Destination usage: " + dest.getMemoryUsage()); + assertTrue(dest.getMemoryUsage().getPercentUsage() >= 478); + LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); + assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 478); + + // let's make sure we can consume all messages + for (int i = 1; i < 2000; i++) { + msg = consumer.receive(1000); + assertNotNull("Didn't receive message " + i, msg); + msg.acknowledge(); + } + + } + + /** + * + * Handy test for manually checking what's going on + * + */ + @Ignore + @Test(timeout = 120000) + public void testLimit() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10"); + factory.setOptimizeAcknowledge(true); + Connection conn = factory.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) { + @Override + protected Message createMessage(int i) throws Exception { + return sess.createTextMessage(payload + "::" + i); + } + }; + producer.setMessageCount(1000); + + final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) { + @Override + protected Message createMessage(int i) throws Exception { + return sess.createTextMessage(payload + "::" + i); + } + }; + producer2.setMessageCount(1000); + + + ConsumerThread consumer = new ConsumerThread(sess, sess.createQueue("STORE.1")); + consumer.setBreakOnNull(false); + consumer.setMessageCount(1000); + + producer.start(); + producer.join(); + + producer2.start(); + + Thread.sleep(300); + + consumer.start(); + + consumer.join(); + producer2.join(); + + assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived()); + + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java index bfa1d96dea..6b4bad2a8b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/util/ConsumerThread.java @@ -45,7 +45,7 @@ public class ConsumerThread extends Thread { while (received < messageCount) { Message msg = consumer.receive(3000); if (msg != null) { - LOG.info("Received " + ((TextMessage)msg).getText()); + LOG.info("Received " + received + ": " + ((TextMessage)msg).getText()); received++; } else { if (breakOnNull) {