diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index c4a1b978f4..8152d5f941 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -284,13 +284,13 @@ public class Topic extends BaseDestination implements Task { if (warnOnProducerFlowControl) { warnOnProducerFlowControl = false; - LOG.info("Usage Manager memory limit reached for " + getActiveMQDestination().getQualifiedName() + LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached for " + getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } if (systemUsage.isSendFailIfNoSpace()) { - throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index a789ddca43..c2b38b7732 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -88,7 +88,7 @@ public class TopicSubscription extends AbstractSubscription { dispatch(node); slowConsumer=false; } else { - //we are slow + //we are slow if(!slowConsumer) { slowConsumer=true; for (Destination dest: destinations) { @@ -124,8 +124,11 @@ public class TopicSubscription extends AbstractSubscription { LinkedList list = null; MessageReference[] oldMessages=null; synchronized(matched){ - list = matched.pageInList(pageInSize); + list = matched.pageInList(pageInSize); oldMessages = messageEvictionStrategy.evictMessages(list); + for (MessageReference ref : list) { + ref.decrementReferenceCount(); + } } int messagesToEvict = 0; if (oldMessages != null){ @@ -478,17 +481,5 @@ public class TopicSubscription extends AbstractSubscription { public int getPrefetchSize() { return (int)info.getPrefetchSize(); } - - /** - * Get the list of inflight messages - * @return the list - */ - public synchronized List getInFlightMessages(){ - List result = new ArrayList(); - synchronized(matched) { - result.addAll(matched.pageInList(1000)); - } - return result; - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index 55b4b19446..b48254de97 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -211,7 +211,7 @@ public interface PendingMessageCursor extends Service { void destroy() throws Exception; /** - * Page in a restricted number of messages + * Page in a restricted number of messages and increment the reference count * * @param maxItems * @return a list of paged in messages diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 651954f0da..8ec393c8d0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -147,6 +147,11 @@ public class PolicyEntry extends DestinationMapEntry { } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { + //override prefetch size if not set by the Consumer + int prefetch=subscription.getConsumerInfo().getPrefetchSize(); + if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){ + subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch()); + } if (pendingMessageLimitStrategy != null) { int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription); int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit(); @@ -167,11 +172,6 @@ public class PolicyEntry extends DestinationMapEntry { } if (pendingSubscriberPolicy != null) { String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId(); - //override prefetch size if not set by the Consumer - int prefetch=subscription.getConsumerInfo().getPrefetchSize(); - if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH){ - subscription.getConsumerInfo().setPrefetchSize(getTopicPrefetch()); - } int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize(); subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize)); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java index 4faea2e34b..661ed5c0a0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java @@ -176,8 +176,10 @@ public class PListStore extends ServiceSupport { } public long size() { - if (!initialized) { - return 0; + synchronized (this) { + if (!initialized) { + return 0; + } } try { return journal.getDiskSize() + pageFile.getDiskSize(); diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java index 843325923b..a7afc8f035 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java @@ -240,7 +240,7 @@ public abstract class Usage implements Service { private void fireEvent(final int oldPercentUsage, final int newPercentUsage) { if (debug) { - LOG.debug("Memory usage change from: " + oldPercentUsage + "% of available memory, to: " + LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " + newPercentUsage + "% of available memory"); } if (started.get()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java b/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java index 2149e96714..cf727843c0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java @@ -43,12 +43,14 @@ public class ThreadTracker { * output the result of stack trace capture to the log */ public static void result() { - for (Entry t: trackers.entrySet()) { - LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points..."); - for (Trace trace : t.getValue().values()) { - LOG.info("count: " + trace.count, trace); + synchronized(trackers) { + for (Entry t: trackers.entrySet()) { + LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points..."); + for (Trace trace : t.getValue().values()) { + LOG.info("count: " + trace.count, trace); + } + LOG.info("Tracker: " + t.getKey() + ", done."); } - LOG.info("Tracker: " + t.getKey() + ", done."); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java b/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java new file mode 100644 index 0000000000..dde21361dc --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/MessageEvictionTest.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; +import org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.PrefetchRatePendingMessageLimitStrategy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.ThreadTracker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class MessageEvictionTest { + static final Log LOG = LogFactory.getLog(MessageEvictionTest.class); + private BrokerService broker; + private ConnectionFactory connectionFactory; + Connection connection; + private Session session; + private Topic destination; + protected int numMessages = 4000; + protected String payload = new String(new byte[1024*2]); + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + connectionFactory = createConnectionFactory(); + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createTopic("verifyEvection"); + } + + @After + public void tearDown() throws Exception { + ThreadTracker.result(); + connection.stop(); + broker.stop(); + } + + @Test + public void testMessageEvictionMemoryUsage() throws Exception { + ExecutorService executor = Executors.newCachedThreadPool(); + final CountDownLatch doAck = new CountDownLatch(1); + final CountDownLatch consumerRegistered = new CountDownLatch(1); + executor.execute(new Runnable() { + public void run() { + try { + final MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + try { + // very slow, only ack once + doAck.await(60, TimeUnit.SECONDS); + message.acknowledge(); + } catch (Exception e) { + e.printStackTrace(); + consumerRegistered.countDown(); + fail(e.toString()); + } + } + }); + consumerRegistered.countDown(); + doAck.await(60, TimeUnit.SECONDS); + consumer.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } + }); + + assertTrue("we have a consumer", consumerRegistered.await(10, TimeUnit.SECONDS)); + + final AtomicInteger sent = new AtomicInteger(0); + final CountDownLatch sendDone = new CountDownLatch(1); + executor.execute(new Runnable() { + public void run() { + MessageProducer producer; + try { + producer = session.createProducer(destination); + for (int i=0; i< numMessages; i++) { + producer.send(session.createTextMessage(payload)); + sent.incrementAndGet(); + TimeUnit.MILLISECONDS.sleep(10); + } + producer.close(); + sendDone.countDown(); + } catch (Exception e) { + sendDone.countDown(); + e.printStackTrace(); + fail(e.toString()); + } + } + }); + + assertTrue("messages sending done", sendDone.await(90, TimeUnit.SECONDS)); + assertEquals("all message were sent", numMessages, sent.get()); + + doAck.countDown(); + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + + assertEquals("usage goes to 0", 0, + TestSupport.getDestination(broker, + ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage()); + + } + + BrokerService createBroker() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.addConnector("tcp://localhost:0"); + brokerService.setUseJmx(false); + brokerService.setDeleteAllMessagesOnStartup(true); + + // spooling to disk early so topic memory limit is not reached + brokerService.getSystemUsage().getMemoryUsage().setLimit(500*1024); + + final List policyEntries = new ArrayList(); + final PolicyEntry entry = new PolicyEntry(); + entry.setTopic(">"); + + // so consumer does not get over run while blocked limit the prefetch + entry.setTopicPrefetch(50); + + // limit the number of outstanding messages, large enough to use the file store + ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy(); + pendingMessageLimitStrategy.setLimit(500); + entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy); + + // to keep the limit in check and up to date rather than just the first few, evict some + OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); + messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(100); + entry.setMessageEvictionStrategy(messageEvictionStrategy); + + // let evicted messaged disappear + entry.setDeadLetterStrategy(null); + policyEntries.add(entry); + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + brokerService.setDestinationPolicy(policyMap); + + brokerService.setAdvisorySupport(false); + + return brokerService; + } + + ConnectionFactory createConnectionFactory() throws Exception { + String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + factory.setWatchTopicAdvisories(false); + return factory; + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java index bdb5767dc4..5dc1871a2f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.broker; -import javax.jms.JMSException; - import org.apache.activemq.TestSupport; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.util.ThreadTracker; diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java index 66083652fc..79233fb6d4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; @@ -157,7 +158,19 @@ public class FailoverConsumerOutstandingCommitTest { } }); - produceMessage(producerSession, destination, prefetch * 2); + // may block if broker shutodwn happens quickly + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("producer started"); + try { + produceMessage(producerSession, destination, prefetch * 2); + } catch (JMSException e) { + e.printStackTrace(); + fail("unexpceted ex on producer: " + e); + } + LOG.info("producer done"); + } + }); // will be stopped by the plugin broker.waitUntilStopped(); @@ -245,7 +258,19 @@ public class FailoverConsumerOutstandingCommitTest { } }); - produceMessage(producerSession, destination, prefetch * 2); + // may block if broker shutdown happens quickly + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("producer started"); + try { + produceMessage(producerSession, destination, prefetch * 2); + } catch (JMSException e) { + e.printStackTrace(); + fail("unexpceted ex on producer: " + e); + } + LOG.info("producer done"); + } + }); // will be stopped by the plugin broker.waitUntilStopped();