resolve https://issues.apache.org/activemq/browse/AMQ-2626 - ensure reference count is decremented on non evicted messages

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@915770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-02-24 12:53:37 +00:00
parent aa9b5666ce
commit a2af47bd1d
10 changed files with 251 additions and 34 deletions

View File

@ -284,13 +284,13 @@ public class Topic extends BaseDestination implements Task {
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
LOG.info("Usage Manager memory limit reached for " + getActiveMQDestination().getQualifiedName()
LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached for " + getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}

View File

@ -88,7 +88,7 @@ public class TopicSubscription extends AbstractSubscription {
dispatch(node);
slowConsumer=false;
} else {
//we are slow
//we are slow
if(!slowConsumer) {
slowConsumer=true;
for (Destination dest: destinations) {
@ -124,8 +124,11 @@ public class TopicSubscription extends AbstractSubscription {
LinkedList<MessageReference> list = null;
MessageReference[] oldMessages=null;
synchronized(matched){
list = matched.pageInList(pageInSize);
list = matched.pageInList(pageInSize);
oldMessages = messageEvictionStrategy.evictMessages(list);
for (MessageReference ref : list) {
ref.decrementReferenceCount();
}
}
int messagesToEvict = 0;
if (oldMessages != null){
@ -478,17 +481,5 @@ public class TopicSubscription extends AbstractSubscription {
public int getPrefetchSize() {
return (int)info.getPrefetchSize();
}
/**
* Get the list of inflight messages
* @return the list
*/
public synchronized List<MessageReference> getInFlightMessages(){
List<MessageReference> result = new ArrayList<MessageReference>();
synchronized(matched) {
result.addAll(matched.pageInList(1000));
}
return result;
}
}

View File

@ -211,7 +211,7 @@ public interface PendingMessageCursor extends Service {
void destroy() throws Exception;
/**
* Page in a restricted number of messages
* Page in a restricted number of messages and increment the reference count
*
* @param maxItems
* @return a list of paged in messages

View File

@ -147,6 +147,11 @@ public class PolicyEntry extends DestinationMapEntry {
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
//override prefetch size if not set by the Consumer
int prefetch=subscription.getConsumerInfo().getPrefetchSize();
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){
subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch());
}
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
@ -167,11 +172,6 @@ public class PolicyEntry extends DestinationMapEntry {
}
if (pendingSubscriberPolicy != null) {
String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
//override prefetch size if not set by the Consumer
int prefetch=subscription.getConsumerInfo().getPrefetchSize();
if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){
subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch());
}
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
}

View File

@ -176,8 +176,10 @@ public class PListStore extends ServiceSupport {
}
public long size() {
if (!initialized) {
return 0;
synchronized (this) {
if (!initialized) {
return 0;
}
}
try {
return journal.getDiskSize() + pageFile.getDiskSize();

View File

@ -240,7 +240,7 @@ public abstract class Usage<T extends Usage> implements Service {
private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
if (debug) {
LOG.debug("Memory usage change from: " + oldPercentUsage + "% of available memory, to: "
LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: "
+ newPercentUsage + "% of available memory");
}
if (started.get()) {

View File

@ -43,12 +43,14 @@ public class ThreadTracker {
* output the result of stack trace capture to the log
*/
public static void result() {
for (Entry<String, Tracker> t: trackers.entrySet()) {
LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points...");
for (Trace trace : t.getValue().values()) {
LOG.info("count: " + trace.count, trace);
synchronized(trackers) {
for (Entry<String, Tracker> t: trackers.entrySet()) {
LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points...");
for (Trace trace : t.getValue().values()) {
LOG.info("count: " + trace.count, trace);
}
LOG.info("Tracker: " + t.getKey() + ", done.");
}
LOG.info("Tracker: " + t.getKey() + ", done.");
}
}

View File

@ -0,0 +1,199 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import static junit.framework.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.PrefetchRatePendingMessageLimitStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ThreadTracker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class MessageEvictionTest {
static final Log LOG = LogFactory.getLog(MessageEvictionTest.class);
private BrokerService broker;
private ConnectionFactory connectionFactory;
Connection connection;
private Session session;
private Topic destination;
protected int numMessages = 4000;
protected String payload = new String(new byte[1024*2]);
@Before
public void setUp() throws Exception {
broker = createBroker();
broker.start();
connectionFactory = createConnectionFactory();
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = session.createTopic("verifyEvection");
}
@After
public void tearDown() throws Exception {
ThreadTracker.result();
connection.stop();
broker.stop();
}
@Test
public void testMessageEvictionMemoryUsage() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
final CountDownLatch doAck = new CountDownLatch(1);
final CountDownLatch consumerRegistered = new CountDownLatch(1);
executor.execute(new Runnable() {
public void run() {
try {
final MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
// very slow, only ack once
doAck.await(60, TimeUnit.SECONDS);
message.acknowledge();
} catch (Exception e) {
e.printStackTrace();
consumerRegistered.countDown();
fail(e.toString());
}
}
});
consumerRegistered.countDown();
doAck.await(60, TimeUnit.SECONDS);
consumer.close();
} catch (Exception e) {
e.printStackTrace();
fail(e.toString());
}
}
});
assertTrue("we have a consumer", consumerRegistered.await(10, TimeUnit.SECONDS));
final AtomicInteger sent = new AtomicInteger(0);
final CountDownLatch sendDone = new CountDownLatch(1);
executor.execute(new Runnable() {
public void run() {
MessageProducer producer;
try {
producer = session.createProducer(destination);
for (int i=0; i< numMessages; i++) {
producer.send(session.createTextMessage(payload));
sent.incrementAndGet();
TimeUnit.MILLISECONDS.sleep(10);
}
producer.close();
sendDone.countDown();
} catch (Exception e) {
sendDone.countDown();
e.printStackTrace();
fail(e.toString());
}
}
});
assertTrue("messages sending done", sendDone.await(90, TimeUnit.SECONDS));
assertEquals("all message were sent", numMessages, sent.get());
doAck.countDown();
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
assertEquals("usage goes to 0", 0,
TestSupport.getDestination(broker,
ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage());
}
BrokerService createBroker() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.addConnector("tcp://localhost:0");
brokerService.setUseJmx(false);
brokerService.setDeleteAllMessagesOnStartup(true);
// spooling to disk early so topic memory limit is not reached
brokerService.getSystemUsage().getMemoryUsage().setLimit(500*1024);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
final PolicyEntry entry = new PolicyEntry();
entry.setTopic(">");
// so consumer does not get over run while blocked limit the prefetch
entry.setTopicPrefetch(50);
// limit the number of outstanding messages, large enough to use the file store
ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
pendingMessageLimitStrategy.setLimit(500);
entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy);
// to keep the limit in check and up to date rather than just the first few, evict some
OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(100);
entry.setMessageEvictionStrategy(messageEvictionStrategy);
// let evicted messaged disappear
entry.setDeadLetterStrategy(null);
policyEntries.add(entry);
final PolicyMap policyMap = new PolicyMap();
policyMap.setPolicyEntries(policyEntries);
brokerService.setDestinationPolicy(policyMap);
brokerService.setAdvisorySupport(false);
return brokerService;
}
ConnectionFactory createConnectionFactory() throws Exception {
String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
factory.setWatchTopicAdvisories(false);
return factory;
}
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.broker;
import javax.jms.JMSException;
import org.apache.activemq.TestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ThreadTracker;

View File

@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
@ -157,7 +158,19 @@ public class FailoverConsumerOutstandingCommitTest {
}
});
produceMessage(producerSession, destination, prefetch * 2);
// may block if broker shutodwn happens quickly
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("producer started");
try {
produceMessage(producerSession, destination, prefetch * 2);
} catch (JMSException e) {
e.printStackTrace();
fail("unexpceted ex on producer: " + e);
}
LOG.info("producer done");
}
});
// will be stopped by the plugin
broker.waitUntilStopped();
@ -245,7 +258,19 @@ public class FailoverConsumerOutstandingCommitTest {
}
});
produceMessage(producerSession, destination, prefetch * 2);
// may block if broker shutdown happens quickly
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
LOG.info("producer started");
try {
produceMessage(producerSession, destination, prefetch * 2);
} catch (JMSException e) {
e.printStackTrace();
fail("unexpceted ex on producer: " + e);
}
LOG.info("producer done");
}
});
// will be stopped by the plugin
broker.waitUntilStopped();