From 13ec9949397848c57653845b35e8003f8c490ebd Mon Sep 17 00:00:00 2001 From: gtully Date: Mon, 7 Mar 2016 16:26:25 +0000 Subject: [PATCH] Revert "https://issues.apache.org/jira/browse/AMQ-4495 - revisit. Reinstate check for space on pagein, so that highWaterMark is respected and full state is not reached, hense pfc is not triggered in error" This reverts commit d8cf54b0a9eee4b86db1ffef2cb3dd1171067307. --- .../apache/activemq/broker/region/Queue.java | 4 +- .../region/cursors/AbstractStoreCursor.java | 4 +- .../region/cursors/QueueStorePrefetch.java | 11 +- .../region/cursors/StoreQueueCursor.java | 2 +- .../region/cursors/TopicStorePrefetch.java | 15 +- .../jdbc/JDBCMessageRecoveryListener.java | 1 - .../activemq/store/jdbc/JDBCMessageStore.java | 37 ++- .../store/jdbc/JDBCTopicMessageStore.java | 8 - .../jdbc/adapter/DefaultJDBCAdapter.java | 13 +- .../activemq/store/kahadb/KahaDBStore.java | 2 +- .../apache/activemq/leveldb/DBManager.scala | 2 +- .../StoreQueueCursorNoDuplicateTest.java | 14 +- .../cursors/StoreQueueCursorOrderTest.java | 10 +- .../org/apache/activemq/bugs/AMQ4930Test.java | 2 +- .../activemq/usecases/MemoryLimitPfcTest.java | 213 ------------------ .../activemq/usecases/MemoryLimitTest.java | 13 +- .../activemq/usecases/QueueBrowsingTest.java | 4 +- 17 files changed, 75 insertions(+), 280 deletions(-) delete mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index eb1e812a11..3b1f85f8b0 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -636,8 +636,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (isProducerFlowControl() && context.isProducerFlowControl()) { if (warnOnProducerFlowControl) { warnOnProducerFlowControl = false; - LOG.info("Usage Manager Memory Limit ({}) reached (%{}) on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", - memoryUsage.getLimit(), memoryUsage.getPercentUsage(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); + LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", + memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); } if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index d84379d507..06bae97572 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -48,6 +48,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i private static int SYNC_ADD = 0; private static int ASYNC_ADD = 1; final MessageId[] lastCachedIds = new MessageId[2]; + protected boolean hadSpace = false; + protected AbstractStoreCursor(Destination destination) { @@ -399,7 +401,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i resetBatch(); this.batchResetNeeded = false; } - if (this.batchList.isEmpty() && this.size >0 && hasSpace()) { + if (this.batchList.isEmpty() && this.size >0) { try { doFillBatch(); } catch (Exception e) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index dacae78565..b10b2e2af0 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -38,14 +38,16 @@ import org.slf4j.LoggerFactory; class QueueStorePrefetch extends AbstractStoreCursor { private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class); private final MessageStore store; + private final Broker broker; /** * Construct it * @param queue */ - public QueueStorePrefetch(Queue queue) { + public QueueStorePrefetch(Queue queue, Broker broker) { super(queue); this.store = queue.getMessageStore(); + this.broker = broker; } @@ -113,8 +115,11 @@ class QueueStorePrefetch extends AbstractStoreCursor { @Override protected void doFillBatch() throws Exception { - this.store.recoverNextMessages(this.maxBatchSize, this); - dealWithDuplicates(); // without the index lock + hadSpace = this.hasSpace(); + if (!broker.getBrokerService().isPersistent() || hadSpace) { + this.store.recoverNextMessages(this.maxBatchSize, this); + dealWithDuplicates(); // without the index lock + } } @Override diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index e6de82e45d..7f26b43cc1 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -47,7 +47,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { super((queue != null ? queue.isPrioritizedMessages():false)); this.broker=broker; this.queue = queue; - this.persistent = new QueueStorePrefetch(queue); + this.persistent = new QueueStorePrefetch(queue, broker); currentCursor = persistent; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 1a6a851ebf..35ec3edbb4 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -40,6 +40,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { private final String subscriberName; private final Subscription subscription; private byte lastRecoveredPriority = 9; + private boolean storeHasMessages = false; /** * @param topic @@ -55,6 +56,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { this.maxProducersToAudit=32; this.maxAuditDepth=10000; resetSize(); + this.storeHasMessages=this.size > 0; } @Override @@ -71,6 +73,11 @@ class TopicStorePrefetch extends AbstractStoreCursor { //this.messageSize.addSize(node.getMessage().getSize()); } + @Override + public final synchronized boolean addMessageLast(MessageReference node) throws Exception { + this.storeHasMessages = super.addMessageLast(node); + return this.storeHasMessages; + } @Override public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { @@ -83,6 +90,7 @@ class TopicStorePrefetch extends AbstractStoreCursor { if (recovered && !cached) { lastRecoveredPriority = message.getPriority(); } + storeHasMessages = true; } return recovered; } @@ -126,8 +134,13 @@ class TopicStorePrefetch extends AbstractStoreCursor { @Override protected void doFillBatch() throws Exception { + // avoid repeated trips to the store if there is nothing of interest + this.storeHasMessages = false; this.store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this); + if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) { + this.storeHasMessages = true; + } } public byte getLastRecoveredPriority() { @@ -145,6 +158,6 @@ class TopicStorePrefetch extends AbstractStoreCursor { @Override public String toString() { - return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString(); + return "TopicStorePrefetch(" + clientId + "," + subscriberName + ",storeHasMessages=" + this.storeHasMessages +") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString(); } } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java index 5ade77358a..07f48167ab 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java @@ -24,5 +24,4 @@ package org.apache.activemq.store.jdbc; public interface JDBCMessageRecoveryListener { boolean recoverMessage(long sequenceId, byte[] message) throws Exception; boolean recoverMessageReference(String reference) throws Exception; - boolean hasSpace(); } diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 27313f4da9..175002aec3 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -279,10 +279,6 @@ public class JDBCMessageStore extends AbstractMessageStore { public boolean recoverMessageReference(String reference) throws Exception { return listener.recoverMessageReference(new MessageId(reference)); } - - public boolean hasSpace() { - return listener.hasSpace(); - } }); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); @@ -341,25 +337,24 @@ public class JDBCMessageStore extends AbstractMessageStore { adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(), maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { - public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { - Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); - msg.getMessageId().setBrokerSequenceId(sequenceId); - msg.getMessageId().setFutureOrSequenceLong(sequenceId); - listener.recoverMessage(msg); - trackLastRecovered(sequenceId, msg.getPriority()); - return true; - } + public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { + Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); + msg.getMessageId().setBrokerSequenceId(sequenceId); + msg.getMessageId().setFutureOrSequenceLong(sequenceId); + listener.recoverMessage(msg); + trackLastRecovered(sequenceId, msg.getPriority()); + return true; + } - public boolean recoverMessageReference(String reference) throws Exception { - listener.recoverMessageReference(new MessageId(reference)); - return true; - } + public boolean recoverMessageReference(String reference) throws Exception { + if (listener.hasSpace()) { + listener.recoverMessageReference(new MessageId(reference)); + return true; + } + return false; + } - public boolean hasSpace() { - return listener.hasSpace(); - } - - }); + }); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); } finally { diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index 7203f928dc..3bff9b254a 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -129,10 +129,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess return listener.recoverMessageReference(new MessageId(reference)); } - public boolean hasSpace() { - return listener.hasSpace(); - } - }); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); @@ -242,10 +238,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess return false; } - public boolean hasSpace() { - return delegate.hasSpace(); - } - @Override public boolean recoverMessageReference(String reference) throws Exception { return delegate.recoverMessageReference(new MessageId(reference)); diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index 6fe83c833d..facf969b34 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -37,6 +37,7 @@ import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; +import org.apache.activemq.store.jdbc.JDBCMessageStore; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore; import org.apache.activemq.store.jdbc.Statements; @@ -632,13 +633,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter { rs = s.executeQuery(); int count = 0; if (this.statements.isUseExternalMessageReferences()) { - while (rs.next() && count < maxReturned && listener.hasSpace()) { + while (rs.next() && count < maxReturned) { if (listener.recoverMessageReference(rs.getString(1))) { count++; } } } else { - while (rs.next() && count < maxReturned && listener.hasSpace()) { + while (rs.next() && count < maxReturned) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { count++; } @@ -669,13 +670,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter { rs = s.executeQuery(); int count = 0; if (this.statements.isUseExternalMessageReferences()) { - while (rs.next() && count < maxReturned && listener.hasSpace() ) { + while (rs.next() && count < maxReturned) { if (listener.recoverMessageReference(rs.getString(1))) { count++; } } } else { - while (rs.next() && count < maxReturned && listener.hasSpace()) { + while (rs.next() && count < maxReturned) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { count++; } @@ -1143,7 +1144,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { rs = s.executeQuery(); int count = 0; if (this.statements.isUseExternalMessageReferences()) { - while (rs.next() && count < maxReturned && listener.hasSpace()) { + while (rs.next() && count < maxReturned) { if (listener.recoverMessageReference(rs.getString(1))) { count++; } else { @@ -1152,7 +1153,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter { } } } else { - while (rs.next() && count < maxReturned && listener.hasSpace()) { + while (rs.next() && count < maxReturned) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { count++; } else { diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 69319a0e92..e1c1df4b8b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -585,7 +585,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); listener.recoverMessage(msg); counter++; - if (counter >= maxReturned || listener.hasSpace() == false) { + if (counter >= maxReturned) { break; } } diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 09fd3501c7..b0051ccc57 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) { lastmsgid = msg.getMessageId count += 1 } - count < max && listener.hasSpace + count < max } if( lastmsgid==null ) { startPos diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java index 7680ca9e70..2406e88b94 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java @@ -27,7 +27,6 @@ import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -83,14 +82,10 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase { queueMessageStore.start(); queueMessageStore.registerIndexListener(null); - QueueStorePrefetch underTest = new QueueStorePrefetch(queue); + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); SystemUsage systemUsage = new SystemUsage(); - - ActiveMQTextMessage sampleMessage = getMessage(0); - int unitSize = sampleMessage.getSize(); - // ensure memory limit is reached - systemUsage.getMemoryUsage().setLimit(unitSize * count); + systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2)); underTest.setSystemUsage(systemUsage); underTest.setEnableAudit(false); underTest.start(); @@ -115,11 +110,8 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase { ref.decrementReferenceCount(); underTest.remove(); LOG.info("Received message: {} with body: {}", - ref.getMessageId(), ((ActiveMQTextMessage) ref.getMessage()).getText()); + ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText()); assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); - - // memory store keeps a message ref that needs releasing to free usage - queueMessageStore.removeMessage(contextNotInTx, new MessageAck(ref.getMessage(), MessageAck.STANDARD_ACK_TYPE, 1)); } underTest.release(); assertEquals(count, dequeueCount); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java index 92c646bae2..90b8428640 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java @@ -89,7 +89,7 @@ public class StoreQueueCursorOrderTest { queueMessageStore.start(); queueMessageStore.registerIndexListener(null); - QueueStorePrefetch underTest = new QueueStorePrefetch(queue); + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); SystemUsage systemUsage = new SystemUsage(); // ensure memory limit is reached systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); @@ -154,7 +154,7 @@ public class StoreQueueCursorOrderTest { queueMessageStore.start(); queueMessageStore.registerIndexListener(null); - QueueStorePrefetch underTest = new QueueStorePrefetch(queue); + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); SystemUsage systemUsage = new SystemUsage(); // ensure memory limit is reached systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); @@ -222,7 +222,7 @@ public class StoreQueueCursorOrderTest { queueMessageStore.start(); queueMessageStore.registerIndexListener(null); - QueueStorePrefetch underTest = new QueueStorePrefetch(queue); + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); SystemUsage systemUsage = new SystemUsage(); // ensure memory limit is reached systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); @@ -299,7 +299,7 @@ public class StoreQueueCursorOrderTest { queueMessageStore.start(); queueMessageStore.registerIndexListener(null); - QueueStorePrefetch underTest = new QueueStorePrefetch(queue); + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); SystemUsage systemUsage = new SystemUsage(); // ensure memory limit is reached systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6)); @@ -392,7 +392,7 @@ public class StoreQueueCursorOrderTest { queueMessageStore.start(); queueMessageStore.registerIndexListener(null); - QueueStorePrefetch underTest = new QueueStorePrefetch(queue); + QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker()); SystemUsage systemUsage = new SystemUsage(); // ensure memory limit is reached systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java index 8f6fbb2445..e65ad91f14 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java @@ -46,7 +46,7 @@ public class AMQ4930Test extends TestCase { protected void configureBroker() throws Exception { broker.setDeleteAllMessagesOnStartup(true); broker.setAdvisorySupport(false); - broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024); + broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024); PolicyMap pMap = new PolicyMap(); PolicyEntry policy = new PolicyEntry(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java deleted file mode 100644 index 5b2dc23c00..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import java.util.Arrays; -import java.util.concurrent.TimeUnit; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.util.ProducerThread; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@RunWith(value = Parameterized.class) -public class MemoryLimitPfcTest extends TestSupport { - private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitPfcTest.class); - final String payload = new String(new byte[100 * 1024]); - protected BrokerService broker; - - @Parameterized.Parameter - public PersistenceAdapterChoice persistenceAdapterChoice; - - @Parameterized.Parameters(name="store={0}") - public static Iterable getTestParameters() { - return Arrays.asList(new Object[][]{{PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}}); - } - - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); //1MB - broker.setDeleteAllMessagesOnStartup(true); - - PolicyMap policyMap = new PolicyMap(); - PolicyEntry policyEntry = new PolicyEntry(); - policyEntry.setExpireMessagesPeriod(0); // when this fires it will consume 2*pageSize mem which will throw the test - policyMap.put(new ActiveMQQueue(">"), policyEntry); - broker.setDestinationPolicy(policyMap); - - LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString()); - setPersistenceAdapter(broker, persistenceAdapterChoice); - - return broker; - } - - @Override - @Before - public void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); - } - broker.start(); - broker.waitUntilStarted(); - } - - @Override - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - - @Test(timeout = 120000) - public void testStopCachingDispatchNoPfc() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10"); - factory.setOptimizeAcknowledge(true); - Connection conn = factory.createConnection(); - conn.start(); - Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = sess.createQueue("STORE"); - final ProducerThread producer = new ProducerThread(sess, queue) { - @Override - protected Message createMessage(int i) throws Exception { - BytesMessage bytesMessage = session.createBytesMessage(); - bytesMessage.writeBytes(payload.getBytes()); - return bytesMessage; - } - }; - producer.setMessageCount(200); - producer.start(); - producer.join(); - - Thread.sleep(1000); - - // assert we didn't break high watermark (70%) usage - final Destination dest = broker.getDestination((ActiveMQQueue) queue); - LOG.info("Destination usage: " + dest.getMemoryUsage()); - int percentUsage = dest.getMemoryUsage().getPercentUsage(); - assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 80); - LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); - assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 80); - - assertFalse("cache disabled", ((org.apache.activemq.broker.region.Queue) dest).getMessages().isCacheEnabled()); - - // consume one message - MessageConsumer consumer = sess.createConsumer(queue); - Message msg = consumer.receive(5000); - msg.acknowledge(); - - LOG.info("Destination usage after consume one: " + dest.getMemoryUsage()); - - // ensure we can send more messages - final ProducerThread secondProducer = new ProducerThread(sess, queue) { - @Override - protected Message createMessage(int i) throws Exception { - BytesMessage bytesMessage = session.createBytesMessage(); - bytesMessage.writeBytes(payload.getBytes()); - return bytesMessage; - } - }; - secondProducer.setMessageCount(100); - secondProducer.start(); - secondProducer.join(); - - LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); - assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 100); - - // let's make sure we can consume all messages - for (int i = 1; i < 300; i++) { - msg = consumer.receive(5000); - if (msg == null) { - dumpAllThreads("NoMessage"); - } - assertNotNull("Didn't receive message " + i, msg); - msg.acknowledge(); - } - } - - @Test(timeout = 120000) - public void testConsumeFromTwoAfterPageInToOne() throws Exception { - - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10"); - factory.setOptimizeAcknowledge(true); - Connection conn = factory.createConnection(); - conn.start(); - Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); - final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) { - @Override - protected Message createMessage(int i) throws Exception { - return session.createTextMessage(payload + "::" + i); - } - }; - producer.setMessageCount(20); - - final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) { - @Override - protected Message createMessage(int i) throws Exception { - return session.createTextMessage(payload + "::" + i); - } - }; - producer2.setMessageCount(20); - - producer.start(); - producer2.start(); - - producer.join(); - producer2.join(); - - LOG.info("before consumer1, broker % mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); - - MessageConsumer consumer = sess.createConsumer(sess.createQueue("STORE.1")); - Message msg = null; - for (int i=0; i<10; i++) { - msg = consumer.receive(5000); - LOG.info("% mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); - msg.acknowledge(); - } - - TimeUnit.SECONDS.sleep(2); - LOG.info("Before consumer2, Broker % mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); - - MessageConsumer consumer2 = sess.createConsumer(sess.createQueue("STORE.2")); - for (int i=0; i<10; i++) { - msg = consumer2.receive(5000); - LOG.info("% mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); - msg.acknowledge(); - } - - } - -} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java index d3af604a14..760876cd60 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java @@ -133,9 +133,18 @@ public class MemoryLimitTest extends TestSupport { Message msg = consumer.receive(5000); msg.acknowledge(); - assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71); + // this should free some space and allow us to get new batch of messages in the memory + // exceeding the limit + assertTrue("Limit is exceeded", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("Destination usage: " + dest.getMemoryUsage()); + return dest.getMemoryUsage().getPercentUsage() >= 200; + } + })); + LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); - assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71); + assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 200); // let's make sure we can consume all messages for (int i = 1; i < 2000; i++) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java index 05540a5936..29b6e7297c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java @@ -182,7 +182,7 @@ public class QueueBrowsingTest { @Test public void testMemoryLimit() throws Exception { - broker.getSystemUsage().getMemoryUsage().setLimit((maxPageSize + 10) * 4 * 1024); + broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024); int messageToSend = 370; @@ -211,6 +211,6 @@ public class QueueBrowsingTest { } browser.close(); - assertTrue("got at least maxPageSize, received: " + received, received >= maxPageSize); + assertTrue("got at least maxPageSize", received >= maxPageSize); } }