Revert "https://issues.apache.org/jira/browse/AMQ-4495 - revisit. Reinstate check for space on pagein, so that highWaterMark is respected and full state is not reached, hense pfc is not triggered in error"

This reverts commit d8cf54b0a9.
This commit is contained in:
gtully 2016-03-07 16:26:25 +00:00
parent 903dec615c
commit 13ec994939
17 changed files with 75 additions and 280 deletions

View File

@ -636,8 +636,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (isProducerFlowControl() && context.isProducerFlowControl()) { if (isProducerFlowControl() && context.isProducerFlowControl()) {
if (warnOnProducerFlowControl) { if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false; warnOnProducerFlowControl = false;
LOG.info("Usage Manager Memory Limit ({}) reached (%{}) on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", LOG.info("Usage Manager Memory Limit ({}) reached on {}, size {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.",
memoryUsage.getLimit(), memoryUsage.getPercentUsage(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount()); memoryUsage.getLimit(), getActiveMQDestination().getQualifiedName(), destinationStatistics.getMessages().getCount());
} }
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {

View File

@ -48,6 +48,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private static int SYNC_ADD = 0; private static int SYNC_ADD = 0;
private static int ASYNC_ADD = 1; private static int ASYNC_ADD = 1;
final MessageId[] lastCachedIds = new MessageId[2]; final MessageId[] lastCachedIds = new MessageId[2];
protected boolean hadSpace = false;
protected AbstractStoreCursor(Destination destination) { protected AbstractStoreCursor(Destination destination) {
@ -399,7 +401,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
resetBatch(); resetBatch();
this.batchResetNeeded = false; this.batchResetNeeded = false;
} }
if (this.batchList.isEmpty() && this.size >0 && hasSpace()) { if (this.batchList.isEmpty() && this.size >0) {
try { try {
doFillBatch(); doFillBatch();
} catch (Exception e) { } catch (Exception e) {

View File

@ -38,14 +38,16 @@ import org.slf4j.LoggerFactory;
class QueueStorePrefetch extends AbstractStoreCursor { class QueueStorePrefetch extends AbstractStoreCursor {
private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class); private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
private final MessageStore store; private final MessageStore store;
private final Broker broker;
/** /**
* Construct it * Construct it
* @param queue * @param queue
*/ */
public QueueStorePrefetch(Queue queue) { public QueueStorePrefetch(Queue queue, Broker broker) {
super(queue); super(queue);
this.store = queue.getMessageStore(); this.store = queue.getMessageStore();
this.broker = broker;
} }
@ -113,9 +115,12 @@ class QueueStorePrefetch extends AbstractStoreCursor {
@Override @Override
protected void doFillBatch() throws Exception { protected void doFillBatch() throws Exception {
hadSpace = this.hasSpace();
if (!broker.getBrokerService().isPersistent() || hadSpace) {
this.store.recoverNextMessages(this.maxBatchSize, this); this.store.recoverNextMessages(this.maxBatchSize, this);
dealWithDuplicates(); // without the index lock dealWithDuplicates(); // without the index lock
} }
}
@Override @Override
public String toString(){ public String toString(){

View File

@ -47,7 +47,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
super((queue != null ? queue.isPrioritizedMessages():false)); super((queue != null ? queue.isPrioritizedMessages():false));
this.broker=broker; this.broker=broker;
this.queue = queue; this.queue = queue;
this.persistent = new QueueStorePrefetch(queue); this.persistent = new QueueStorePrefetch(queue, broker);
currentCursor = persistent; currentCursor = persistent;
} }

View File

@ -40,6 +40,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
private final String subscriberName; private final String subscriberName;
private final Subscription subscription; private final Subscription subscription;
private byte lastRecoveredPriority = 9; private byte lastRecoveredPriority = 9;
private boolean storeHasMessages = false;
/** /**
* @param topic * @param topic
@ -55,6 +56,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.maxProducersToAudit=32; this.maxProducersToAudit=32;
this.maxAuditDepth=10000; this.maxAuditDepth=10000;
resetSize(); resetSize();
this.storeHasMessages=this.size > 0;
} }
@Override @Override
@ -71,6 +73,11 @@ class TopicStorePrefetch extends AbstractStoreCursor {
//this.messageSize.addSize(node.getMessage().getSize()); //this.messageSize.addSize(node.getMessage().getSize());
} }
@Override
public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
this.storeHasMessages = super.addMessageLast(node);
return this.storeHasMessages;
}
@Override @Override
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception { public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
@ -83,6 +90,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
if (recovered && !cached) { if (recovered && !cached) {
lastRecoveredPriority = message.getPriority(); lastRecoveredPriority = message.getPriority();
} }
storeHasMessages = true;
} }
return recovered; return recovered;
} }
@ -126,8 +134,13 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override @Override
protected void doFillBatch() throws Exception { protected void doFillBatch() throws Exception {
// avoid repeated trips to the store if there is nothing of interest
this.storeHasMessages = false;
this.store.recoverNextMessages(clientId, subscriberName, this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this); maxBatchSize, this);
if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
this.storeHasMessages = true;
}
} }
public byte getLastRecoveredPriority() { public byte getLastRecoveredPriority() {
@ -145,6 +158,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override @Override
public String toString() { public String toString() {
return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString(); return "TopicStorePrefetch(" + clientId + "," + subscriberName + ",storeHasMessages=" + this.storeHasMessages +") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
} }
} }

View File

@ -24,5 +24,4 @@ package org.apache.activemq.store.jdbc;
public interface JDBCMessageRecoveryListener { public interface JDBCMessageRecoveryListener {
boolean recoverMessage(long sequenceId, byte[] message) throws Exception; boolean recoverMessage(long sequenceId, byte[] message) throws Exception;
boolean recoverMessageReference(String reference) throws Exception; boolean recoverMessageReference(String reference) throws Exception;
boolean hasSpace();
} }

View File

@ -279,10 +279,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
public boolean recoverMessageReference(String reference) throws Exception { public boolean recoverMessageReference(String reference) throws Exception {
return listener.recoverMessageReference(new MessageId(reference)); return listener.recoverMessageReference(new MessageId(reference));
} }
public boolean hasSpace() {
return listener.hasSpace();
}
}); });
} catch (SQLException e) { } catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e); JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@ -351,12 +347,11 @@ public class JDBCMessageStore extends AbstractMessageStore {
} }
public boolean recoverMessageReference(String reference) throws Exception { public boolean recoverMessageReference(String reference) throws Exception {
if (listener.hasSpace()) {
listener.recoverMessageReference(new MessageId(reference)); listener.recoverMessageReference(new MessageId(reference));
return true; return true;
} }
return false;
public boolean hasSpace() {
return listener.hasSpace();
} }
}); });

View File

@ -129,10 +129,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return listener.recoverMessageReference(new MessageId(reference)); return listener.recoverMessageReference(new MessageId(reference));
} }
public boolean hasSpace() {
return listener.hasSpace();
}
}); });
} catch (SQLException e) { } catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e); JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@ -242,10 +238,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
return false; return false;
} }
public boolean hasSpace() {
return delegate.hasSpace();
}
@Override @Override
public boolean recoverMessageReference(String reference) throws Exception { public boolean recoverMessageReference(String reference) throws Exception {
return delegate.recoverMessageReference(new MessageId(reference)); return delegate.recoverMessageReference(new MessageId(reference));

View File

@ -37,6 +37,7 @@ import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
import org.apache.activemq.store.jdbc.JDBCMessageStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore; import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
import org.apache.activemq.store.jdbc.Statements; import org.apache.activemq.store.jdbc.Statements;
@ -632,13 +633,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rs = s.executeQuery(); rs = s.executeQuery();
int count = 0; int count = 0;
if (this.statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned && listener.hasSpace()) { while (rs.next() && count < maxReturned) {
if (listener.recoverMessageReference(rs.getString(1))) { if (listener.recoverMessageReference(rs.getString(1))) {
count++; count++;
} }
} }
} else { } else {
while (rs.next() && count < maxReturned && listener.hasSpace()) { while (rs.next() && count < maxReturned) {
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++; count++;
} }
@ -669,13 +670,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rs = s.executeQuery(); rs = s.executeQuery();
int count = 0; int count = 0;
if (this.statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned && listener.hasSpace() ) { while (rs.next() && count < maxReturned) {
if (listener.recoverMessageReference(rs.getString(1))) { if (listener.recoverMessageReference(rs.getString(1))) {
count++; count++;
} }
} }
} else { } else {
while (rs.next() && count < maxReturned && listener.hasSpace()) { while (rs.next() && count < maxReturned) {
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++; count++;
} }
@ -1143,7 +1144,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rs = s.executeQuery(); rs = s.executeQuery();
int count = 0; int count = 0;
if (this.statements.isUseExternalMessageReferences()) { if (this.statements.isUseExternalMessageReferences()) {
while (rs.next() && count < maxReturned && listener.hasSpace()) { while (rs.next() && count < maxReturned) {
if (listener.recoverMessageReference(rs.getString(1))) { if (listener.recoverMessageReference(rs.getString(1))) {
count++; count++;
} else { } else {
@ -1152,7 +1153,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} }
} }
} else { } else {
while (rs.next() && count < maxReturned && listener.hasSpace()) { while (rs.next() && count < maxReturned) {
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++; count++;
} else { } else {

View File

@ -585,7 +585,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
listener.recoverMessage(msg); listener.recoverMessage(msg);
counter++; counter++;
if (counter >= maxReturned || listener.hasSpace() == false) { if (counter >= maxReturned) {
break; break;
} }
} }

View File

@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
lastmsgid = msg.getMessageId lastmsgid = msg.getMessageId
count += 1 count += 1
} }
count < max && listener.hasSpace count < max
} }
if( lastmsgid==null ) { if( lastmsgid==null ) {
startPos startPos

View File

@ -27,7 +27,6 @@ import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -83,14 +82,10 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase {
queueMessageStore.start(); queueMessageStore.start();
queueMessageStore.registerIndexListener(null); queueMessageStore.registerIndexListener(null);
QueueStorePrefetch underTest = new QueueStorePrefetch(queue); QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
SystemUsage systemUsage = new SystemUsage(); SystemUsage systemUsage = new SystemUsage();
ActiveMQTextMessage sampleMessage = getMessage(0);
int unitSize = sampleMessage.getSize();
// ensure memory limit is reached // ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(unitSize * count); systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
underTest.setSystemUsage(systemUsage); underTest.setSystemUsage(systemUsage);
underTest.setEnableAudit(false); underTest.setEnableAudit(false);
underTest.start(); underTest.start();
@ -115,11 +110,8 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase {
ref.decrementReferenceCount(); ref.decrementReferenceCount();
underTest.remove(); underTest.remove();
LOG.info("Received message: {} with body: {}", LOG.info("Received message: {} with body: {}",
ref.getMessageId(), ((ActiveMQTextMessage) ref.getMessage()).getText()); ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId()); assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
// memory store keeps a message ref that needs releasing to free usage
queueMessageStore.removeMessage(contextNotInTx, new MessageAck(ref.getMessage(), MessageAck.STANDARD_ACK_TYPE, 1));
} }
underTest.release(); underTest.release();
assertEquals(count, dequeueCount); assertEquals(count, dequeueCount);

View File

@ -89,7 +89,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start(); queueMessageStore.start();
queueMessageStore.registerIndexListener(null); queueMessageStore.registerIndexListener(null);
QueueStorePrefetch underTest = new QueueStorePrefetch(queue); QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
SystemUsage systemUsage = new SystemUsage(); SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached // ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@ -154,7 +154,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start(); queueMessageStore.start();
queueMessageStore.registerIndexListener(null); queueMessageStore.registerIndexListener(null);
QueueStorePrefetch underTest = new QueueStorePrefetch(queue); QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
SystemUsage systemUsage = new SystemUsage(); SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached // ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@ -222,7 +222,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start(); queueMessageStore.start();
queueMessageStore.registerIndexListener(null); queueMessageStore.registerIndexListener(null);
QueueStorePrefetch underTest = new QueueStorePrefetch(queue); QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
SystemUsage systemUsage = new SystemUsage(); SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached // ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1); systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@ -299,7 +299,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start(); queueMessageStore.start();
queueMessageStore.registerIndexListener(null); queueMessageStore.registerIndexListener(null);
QueueStorePrefetch underTest = new QueueStorePrefetch(queue); QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
SystemUsage systemUsage = new SystemUsage(); SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached // ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6)); systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6));
@ -392,7 +392,7 @@ public class StoreQueueCursorOrderTest {
queueMessageStore.start(); queueMessageStore.start();
queueMessageStore.registerIndexListener(null); queueMessageStore.registerIndexListener(null);
QueueStorePrefetch underTest = new QueueStorePrefetch(queue); QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
SystemUsage systemUsage = new SystemUsage(); SystemUsage systemUsage = new SystemUsage();
// ensure memory limit is reached // ensure memory limit is reached
systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5); systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5);

View File

@ -46,7 +46,7 @@ public class AMQ4930Test extends TestCase {
protected void configureBroker() throws Exception { protected void configureBroker() throws Exception {
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(true);
broker.setAdvisorySupport(false); broker.setAdvisorySupport(false);
broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024); broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024);
PolicyMap pMap = new PolicyMap(); PolicyMap pMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();

View File

@ -1,213 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.ProducerThread;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(value = Parameterized.class)
public class MemoryLimitPfcTest extends TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitPfcTest.class);
final String payload = new String(new byte[100 * 1024]);
protected BrokerService broker;
@Parameterized.Parameter
public PersistenceAdapterChoice persistenceAdapterChoice;
@Parameterized.Parameters(name="store={0}")
public static Iterable<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{{PersistenceAdapterChoice.KahaDB}, {PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}});
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); //1MB
broker.setDeleteAllMessagesOnStartup(true);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0); // when this fires it will consume 2*pageSize mem which will throw the test
policyMap.put(new ActiveMQQueue(">"), policyEntry);
broker.setDestinationPolicy(policyMap);
LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
setPersistenceAdapter(broker, persistenceAdapterChoice);
return broker;
}
@Override
@Before
public void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
}
broker.start();
broker.waitUntilStarted();
}
@Override
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
@Test(timeout = 120000)
public void testStopCachingDispatchNoPfc() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
factory.setOptimizeAcknowledge(true);
Connection conn = factory.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = sess.createQueue("STORE");
final ProducerThread producer = new ProducerThread(sess, queue) {
@Override
protected Message createMessage(int i) throws Exception {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(payload.getBytes());
return bytesMessage;
}
};
producer.setMessageCount(200);
producer.start();
producer.join();
Thread.sleep(1000);
// assert we didn't break high watermark (70%) usage
final Destination dest = broker.getDestination((ActiveMQQueue) queue);
LOG.info("Destination usage: " + dest.getMemoryUsage());
int percentUsage = dest.getMemoryUsage().getPercentUsage();
assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 80);
LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 80);
assertFalse("cache disabled", ((org.apache.activemq.broker.region.Queue) dest).getMessages().isCacheEnabled());
// consume one message
MessageConsumer consumer = sess.createConsumer(queue);
Message msg = consumer.receive(5000);
msg.acknowledge();
LOG.info("Destination usage after consume one: " + dest.getMemoryUsage());
// ensure we can send more messages
final ProducerThread secondProducer = new ProducerThread(sess, queue) {
@Override
protected Message createMessage(int i) throws Exception {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(payload.getBytes());
return bytesMessage;
}
};
secondProducer.setMessageCount(100);
secondProducer.start();
secondProducer.join();
LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 100);
// let's make sure we can consume all messages
for (int i = 1; i < 300; i++) {
msg = consumer.receive(5000);
if (msg == null) {
dumpAllThreads("NoMessage");
}
assertNotNull("Didn't receive message " + i, msg);
msg.acknowledge();
}
}
@Test(timeout = 120000)
public void testConsumeFromTwoAfterPageInToOne() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
factory.setOptimizeAcknowledge(true);
Connection conn = factory.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
@Override
protected Message createMessage(int i) throws Exception {
return session.createTextMessage(payload + "::" + i);
}
};
producer.setMessageCount(20);
final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
@Override
protected Message createMessage(int i) throws Exception {
return session.createTextMessage(payload + "::" + i);
}
};
producer2.setMessageCount(20);
producer.start();
producer2.start();
producer.join();
producer2.join();
LOG.info("before consumer1, broker % mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
MessageConsumer consumer = sess.createConsumer(sess.createQueue("STORE.1"));
Message msg = null;
for (int i=0; i<10; i++) {
msg = consumer.receive(5000);
LOG.info("% mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
msg.acknowledge();
}
TimeUnit.SECONDS.sleep(2);
LOG.info("Before consumer2, Broker % mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
MessageConsumer consumer2 = sess.createConsumer(sess.createQueue("STORE.2"));
for (int i=0; i<10; i++) {
msg = consumer2.receive(5000);
LOG.info("% mem usage: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
msg.acknowledge();
}
}
}

View File

@ -133,9 +133,18 @@ public class MemoryLimitTest extends TestSupport {
Message msg = consumer.receive(5000); Message msg = consumer.receive(5000);
msg.acknowledge(); msg.acknowledge();
assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71); // this should free some space and allow us to get new batch of messages in the memory
// exceeding the limit
assertTrue("Limit is exceeded", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Destination usage: " + dest.getMemoryUsage());
return dest.getMemoryUsage().getPercentUsage() >= 200;
}
}));
LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71); assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 200);
// let's make sure we can consume all messages // let's make sure we can consume all messages
for (int i = 1; i < 2000; i++) { for (int i = 1; i < 2000; i++) {

View File

@ -182,7 +182,7 @@ public class QueueBrowsingTest {
@Test @Test
public void testMemoryLimit() throws Exception { public void testMemoryLimit() throws Exception {
broker.getSystemUsage().getMemoryUsage().setLimit((maxPageSize + 10) * 4 * 1024); broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024);
int messageToSend = 370; int messageToSend = 370;
@ -211,6 +211,6 @@ public class QueueBrowsingTest {
} }
browser.close(); browser.close();
assertTrue("got at least maxPageSize, received: " + received, received >= maxPageSize); assertTrue("got at least maxPageSize", received >= maxPageSize);
} }
} }