mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4495 - always get a max batch of messages from the store
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1478823 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
943011e92e
commit
7450a32ae7
|
@ -38,7 +38,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
private boolean storeHasMessages = false;
|
private boolean storeHasMessages = false;
|
||||||
protected int size;
|
protected int size;
|
||||||
private MessageId lastCachedId;
|
private MessageId lastCachedId;
|
||||||
private boolean hadSpace = false;
|
protected boolean hadSpace = false;
|
||||||
|
|
||||||
protected AbstractStoreCursor(Destination destination) {
|
protected AbstractStoreCursor(Destination destination) {
|
||||||
super((destination != null ? destination.isPrioritizedMessages():false));
|
super((destination != null ? destination.isPrioritizedMessages():false));
|
||||||
|
@ -253,12 +253,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
setCacheEnabled(false);
|
setCacheEnabled(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasSpace() {
|
|
||||||
hadSpace = super.hasSpace();
|
|
||||||
return hadSpace;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final synchronized void fillBatch() {
|
protected final synchronized void fillBatch() {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(this + " - fillBatch");
|
LOG.trace(this + " - fillBatch");
|
||||||
|
|
|
@ -17,10 +17,15 @@
|
||||||
package org.apache.activemq.broker.region.cursors;
|
package org.apache.activemq.broker.region.cursors;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.Broker;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.Queue;
|
import org.apache.activemq.broker.region.Queue;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.store.MessageStore;
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.memory.MemoryMessageStore;
|
||||||
|
import org.apache.activemq.store.memory.MemoryTransactionStore;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -33,14 +38,16 @@ import org.slf4j.LoggerFactory;
|
||||||
class QueueStorePrefetch extends AbstractStoreCursor {
|
class QueueStorePrefetch extends AbstractStoreCursor {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
|
private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
|
||||||
private final MessageStore store;
|
private final MessageStore store;
|
||||||
|
private final Broker broker;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct it
|
* Construct it
|
||||||
* @param queue
|
* @param queue
|
||||||
*/
|
*/
|
||||||
public QueueStorePrefetch(Queue queue) {
|
public QueueStorePrefetch(Queue queue, Broker broker) {
|
||||||
super(queue);
|
super(queue);
|
||||||
this.store = queue.getMessageStore();
|
this.store = queue.getMessageStore();
|
||||||
|
this.broker = broker;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,7 +101,10 @@ class QueueStorePrefetch extends AbstractStoreCursor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doFillBatch() throws Exception {
|
protected void doFillBatch() throws Exception {
|
||||||
this.store.recoverNextMessages(this.maxBatchSize, this);
|
hadSpace = this.hasSpace();
|
||||||
|
if (!broker.getBrokerService().isPersistent() || hadSpace) {
|
||||||
|
this.store.recoverNextMessages(this.maxBatchSize, this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
||||||
super((queue != null ? queue.isPrioritizedMessages():false));
|
super((queue != null ? queue.isPrioritizedMessages():false));
|
||||||
this.broker=broker;
|
this.broker=broker;
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
this.persistent = new QueueStorePrefetch(queue);
|
this.persistent = new QueueStorePrefetch(queue, broker);
|
||||||
currentCursor = persistent;
|
currentCursor = persistent;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -279,15 +279,12 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
||||||
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
|
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
|
||||||
|
|
||||||
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
|
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
|
||||||
if (listener.hasSpace()) {
|
|
||||||
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
|
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
|
||||||
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
||||||
listener.recoverMessage(msg);
|
listener.recoverMessage(msg);
|
||||||
lastRecoveredSequenceId.set(sequenceId);
|
lastRecoveredSequenceId.set(sequenceId);
|
||||||
lastRecoveredPriority.set(msg.getPriority());
|
lastRecoveredPriority.set(msg.getPriority());
|
||||||
return true;
|
return true;
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean recoverMessageReference(String reference) throws Exception {
|
public boolean recoverMessageReference(String reference) throws Exception {
|
||||||
|
|
|
@ -548,8 +548,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
||||||
StoredDestination sd = getStoredDestination(dest, tx);
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
Entry<Long, MessageKeys> entry = null;
|
Entry<Long, MessageKeys> entry = null;
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
|
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
|
||||||
listener.hasSpace() && iterator.hasNext(); ) {
|
|
||||||
entry = iterator.next();
|
entry = iterator.next();
|
||||||
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
|
if (ackedAndPrepared.contains(entry.getValue().messageId)) {
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -665,7 +665,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
|
|
||||||
case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
|
case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
|
||||||
private var recovered: Int = 0
|
private var recovered: Int = 0
|
||||||
def hasSpace = recovered < max && listener.hasSpace
|
def hasSpace = recovered < max
|
||||||
def recoverMessage(message: Message) = {
|
def recoverMessage(message: Message) = {
|
||||||
recovered += 1;
|
recovered += 1;
|
||||||
listener.recoverMessage(message)
|
listener.recoverMessage(message)
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class StoreQueueCursorNoDuplicateTest extends TestCase {
|
||||||
|
|
||||||
queueMessageStore.start();
|
queueMessageStore.start();
|
||||||
|
|
||||||
QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
|
QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
|
||||||
SystemUsage systemUsage = new SystemUsage();
|
SystemUsage systemUsage = new SystemUsage();
|
||||||
// ensure memory limit is reached
|
// ensure memory limit is reached
|
||||||
systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
|
systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
|
||||||
|
|
|
@ -0,0 +1,201 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.TestSupport;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.Destination;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.util.ConsumerThread;
|
||||||
|
import org.apache.activemq.util.ProducerThread;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.jms.*;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@RunWith(value = Parameterized.class)
|
||||||
|
public class MemoryLimitTest extends TestSupport {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(MemoryLimitTest.class);
|
||||||
|
final String payload = new String(new byte[10 * 1024]); //10KB
|
||||||
|
protected BrokerService broker;
|
||||||
|
|
||||||
|
private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
|
||||||
|
|
||||||
|
@Parameterized.Parameters
|
||||||
|
public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
|
||||||
|
TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
|
||||||
|
TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
|
||||||
|
TestSupport.PersistenceAdapterChoice[] jdbc = {TestSupport.PersistenceAdapterChoice.JDBC};
|
||||||
|
List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
|
||||||
|
choices.add(kahaDb);
|
||||||
|
choices.add(levelDb);
|
||||||
|
choices.add(jdbc);
|
||||||
|
return choices;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MemoryLimitTest(TestSupport.PersistenceAdapterChoice choice) {
|
||||||
|
this.persistenceAdapterChoice = choice;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
BrokerService broker = new BrokerService();
|
||||||
|
broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); //1MB
|
||||||
|
broker.deleteAllMessages();
|
||||||
|
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry policyEntry = new PolicyEntry();
|
||||||
|
policyEntry.setProducerFlowControl(false);
|
||||||
|
policyMap.put(new ActiveMQQueue(">"), policyEntry);
|
||||||
|
broker.setDestinationPolicy(policyMap);
|
||||||
|
|
||||||
|
LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
|
||||||
|
setPersistenceAdapter(broker, persistenceAdapterChoice);
|
||||||
|
broker.getPersistenceAdapter().deleteAllMessages();
|
||||||
|
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
if (broker == null) {
|
||||||
|
broker = createBroker();
|
||||||
|
}
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (broker != null) {
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testCursorBatch() throws Exception {
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
|
||||||
|
factory.setOptimizeAcknowledge(true);
|
||||||
|
Connection conn = factory.createConnection();
|
||||||
|
conn.start();
|
||||||
|
Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
Queue queue = sess.createQueue("STORE");
|
||||||
|
final ProducerThread producer = new ProducerThread(sess, queue) {
|
||||||
|
@Override
|
||||||
|
protected Message createMessage(int i) throws Exception {
|
||||||
|
return sess.createTextMessage(payload + "::" + i);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
producer.setMessageCount(2000);
|
||||||
|
producer.start();
|
||||||
|
producer.join();
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
// assert we didn't break high watermark (70%) usage
|
||||||
|
Destination dest = broker.getDestination((ActiveMQQueue) queue);
|
||||||
|
LOG.info("Destination usage: " + dest.getMemoryUsage());
|
||||||
|
assertTrue(dest.getMemoryUsage().getPercentUsage() <= 71);
|
||||||
|
LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
|
||||||
|
assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
|
||||||
|
|
||||||
|
// consume one message
|
||||||
|
MessageConsumer consumer = sess.createConsumer(queue);
|
||||||
|
Message msg = consumer.receive();
|
||||||
|
msg.acknowledge();
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
// this should free some space and allow us to get new batch of messages in the memory
|
||||||
|
// exceeding the limit
|
||||||
|
LOG.info("Destination usage: " + dest.getMemoryUsage());
|
||||||
|
assertTrue(dest.getMemoryUsage().getPercentUsage() >= 478);
|
||||||
|
LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
|
||||||
|
assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() >= 478);
|
||||||
|
|
||||||
|
// let's make sure we can consume all messages
|
||||||
|
for (int i = 1; i < 2000; i++) {
|
||||||
|
msg = consumer.receive(1000);
|
||||||
|
assertNotNull("Didn't receive message " + i, msg);
|
||||||
|
msg.acknowledge();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Handy test for manually checking what's going on
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Ignore
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testLimit() throws Exception {
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
|
||||||
|
factory.setOptimizeAcknowledge(true);
|
||||||
|
Connection conn = factory.createConnection();
|
||||||
|
conn.start();
|
||||||
|
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
|
||||||
|
@Override
|
||||||
|
protected Message createMessage(int i) throws Exception {
|
||||||
|
return sess.createTextMessage(payload + "::" + i);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
producer.setMessageCount(1000);
|
||||||
|
|
||||||
|
final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
|
||||||
|
@Override
|
||||||
|
protected Message createMessage(int i) throws Exception {
|
||||||
|
return sess.createTextMessage(payload + "::" + i);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
producer2.setMessageCount(1000);
|
||||||
|
|
||||||
|
|
||||||
|
ConsumerThread consumer = new ConsumerThread(sess, sess.createQueue("STORE.1"));
|
||||||
|
consumer.setBreakOnNull(false);
|
||||||
|
consumer.setMessageCount(1000);
|
||||||
|
|
||||||
|
producer.start();
|
||||||
|
producer.join();
|
||||||
|
|
||||||
|
producer2.start();
|
||||||
|
|
||||||
|
Thread.sleep(300);
|
||||||
|
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
consumer.join();
|
||||||
|
producer2.join();
|
||||||
|
|
||||||
|
assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -45,7 +45,7 @@ public class ConsumerThread extends Thread {
|
||||||
while (received < messageCount) {
|
while (received < messageCount) {
|
||||||
Message msg = consumer.receive(3000);
|
Message msg = consumer.receive(3000);
|
||||||
if (msg != null) {
|
if (msg != null) {
|
||||||
LOG.info("Received " + ((TextMessage)msg).getText());
|
LOG.info("Received " + received + ": " + ((TextMessage)msg).getText());
|
||||||
received++;
|
received++;
|
||||||
} else {
|
} else {
|
||||||
if (breakOnNull) {
|
if (breakOnNull) {
|
||||||
|
|
Loading…
Reference in New Issue