From a0d2282a418470c4256193f2085a0c3c46d2b67e Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 15 Apr 2010 10:04:55 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2668 - implement destination PolicyEntry storeUsageHighWaterMark to allow rough store usage split across destinations. an individual dest can use a 70% high watermark leaving the default 100% for a DLQ, so that it is not blocked on a store limit git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@934352 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/BaseDestination.java | 39 ++++ .../apache/activemq/broker/region/Queue.java | 29 +-- .../apache/activemq/broker/region/Topic.java | 44 ++-- .../broker/region/policy/PolicyEntry.java | 8 + .../java/org/apache/activemq/usage/Usage.java | 22 +- .../PerDestinationStoreLimitTest.java | 198 ++++++++++++++++++ 6 files changed, 283 insertions(+), 57 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 34eb754013..a7f169c2f6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -18,6 +18,8 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import javax.jms.ResourceAllocationException; + import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; @@ -34,6 +36,7 @@ import org.apache.activemq.store.MessageStore; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; +import org.apache.commons.logging.Log; /** * @version $Revision: 1.12 $ @@ -77,6 +80,7 @@ public abstract class BaseDestination implements Destination { protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; protected int cursorMemoryHighWaterMark = 70; + protected int storeUsageHighWaterMark = 100; /** * @param broker @@ -533,6 +537,41 @@ public abstract class BaseDestination implements Destination { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { } + + public final int getStoreUsageHighWaterMark() { + return this.storeUsageHighWaterMark; + } + + public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { + this.storeUsageHighWaterMark = storeUsageHighWaterMark; + } + + protected final void waitForSpace(ConnectionContext context, Usage usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { + waitForSpace(context, usage, 100, warning); + } + protected final void waitForSpace(ConnectionContext context, Usage usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { + if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { + if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { + throw new ResourceAllocationException(warning); + } + } else { + long start = System.currentTimeMillis(); + long nextWarn = start; + while (!usage.waitForSpace(1000, highWaterMark)) { + if (context.getStopping().get()) { + throw new IOException("Connection closed, send aborted."); + } + + long now = System.currentTimeMillis(); + if (now >= nextWarn) { + getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); + nextWarn = now + blockedProducerWarningInterval; + } + } + } + } + + protected abstract Log getLog(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b1b2f3206d..67173d3e07 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -556,16 +556,16 @@ public class Queue extends BaseDestination implements Task, UsageListener { final ConnectionContext context = producerExchange.getConnectionContext(); synchronized (sendLock) { if (store != null && message.isPersistent()) { - if (systemUsage.getStoreUsage().isFull()) { + if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { - String logMessage = "Usage Manager Store is Full. Producer (" + message.getProducerId() + ") stopped to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"; if (systemUsage.isSendFailIfNoSpace()) { throw new ResourceAllocationException(logMessage); } - waitForSpace(context, systemUsage.getStoreUsage(), logMessage); + waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); } message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); store.addMessage(context, message); @@ -1718,25 +1718,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } - private final void waitForSpace(ConnectionContext context, Usage usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { - if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { - if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) { - throw new ResourceAllocationException(warning); - } - } else { - long start = System.currentTimeMillis(); - long nextWarn = start + blockedProducerWarningInterval; - while (!usage.waitForSpace(1000)) { - if (context.getStopping().get()) { - throw new IOException("Connection closed, send aborted."); - } - - long now = System.currentTimeMillis(); - if (now >= nextWarn) { - LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); - nextWarn = now + blockedProducerWarningInterval; - } - } - } + @Override + protected Log getLog() { + return LOG; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 8152d5f941..38be904d91 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -16,6 +16,15 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -41,18 +50,9 @@ import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.Valve; import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.usage.Usage; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; /** * The Topic is a destination that sends a copy of a message to every active @@ -404,14 +404,14 @@ public class Topic extends BaseDestination implements Task { message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { - if (systemUsage.getStoreUsage().isFull()) { - final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) { + final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"; if (systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException(logMessage); } - waitForSpace(context, systemUsage.getStoreUsage(), logMessage); + waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); } topicStore.addMessage(context, message); } @@ -610,21 +610,11 @@ public class Topic extends BaseDestination implements Task { LOG.error("Failed to remove expired Message from the store ", e); } } - - private final void waitForSpace(ConnectionContext context, Usage usage, String warning) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - long nextWarn = start + blockedProducerWarningInterval; - while (!usage.waitForSpace(1000)) { - if (context.getStopping().get()) { - throw new IOException("Connection closed, send aborted."); - } - - long now = System.currentTimeMillis(); - if (now >= nextWarn) { - LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)"); - nextWarn = now + blockedProducerWarningInterval; - } - } + + @Override + protected Log getLog() { + return LOG; } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 8ec393c8d0..b88fabf2ff 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -84,6 +84,7 @@ public class PolicyEntry extends DestinationMapEntry { private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH; private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH; private int cursorMemoryHighWaterMark=70; + private int storeUsageHighWaterMark = 100; public void configure(Broker broker,Queue queue) { @@ -144,6 +145,7 @@ public class PolicyEntry extends DestinationMapEntry { destination.setExpireMessagesPeriod(getExpireMessagesPeriod()); destination.setMaxExpirePageSize(getMaxExpirePageSize()); destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); + destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -698,6 +700,12 @@ public class PolicyEntry extends DestinationMapEntry { this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; } + public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { + this.storeUsageHighWaterMark = storeUsageHighWaterMark; + } + public int getStoreUsageHighWaterMark() { + return storeUsageHighWaterMark; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java index 65c7717c17..7e2aa336d5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java @@ -78,25 +78,29 @@ public abstract class Usage implements Service { waitForSpace(0); } + public boolean waitForSpace(long timeout) throws InterruptedException { + return waitForSpace(timeout, 100); + } + /** * @param timeout * @throws InterruptedException * @return true if space */ - public boolean waitForSpace(long timeout) throws InterruptedException { + public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException { if (parent != null) { - if (!parent.waitForSpace(timeout)) { + if (!parent.waitForSpace(timeout, highWaterMark)) { return false; } } synchronized (usageMutex) { percentUsage=caclPercentUsage(); - if (percentUsage >= 100) { + if (percentUsage >= highWaterMark) { long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE; long timeleft = deadline; while (timeleft > 0) { percentUsage=caclPercentUsage(); - if (percentUsage >= 100) { + if (percentUsage >= highWaterMark) { usageMutex.wait(pollingTime); timeleft = deadline - System.currentTimeMillis(); } else { @@ -104,17 +108,21 @@ public abstract class Usage implements Service { } } } - return percentUsage < 100; + return percentUsage < highWaterMark; } } public boolean isFull() { - if (parent != null && parent.isFull()) { + return isFull(100); + } + + public boolean isFull(int highWaterMark) { + if (parent != null && parent.isFull(highWaterMark)) { return true; } synchronized (usageMutex) { percentUsage=caclPercentUsage(); - return percentUsage >= 100; + return percentUsage >= highWaterMark; } } diff --git a/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java b/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java new file mode 100644 index 0000000000..a0f296dcad --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +// see: https://issues.apache.org/activemq/browse/AMQ-2668 +public class PerDestinationStoreLimitTest extends JmsTestSupport { + static final Log LOG = LogFactory.getLog(PerDestinationStoreLimitTest.class); + final String oneKb = new String(new byte[1024]); + + ActiveMQDestination queueDest = new ActiveMQQueue("PerDestinationStoreLimitTest.Queue"); + ActiveMQDestination topicDest = new ActiveMQTopic("PerDestinationStoreLimitTest.Topic"); + + protected TransportConnector connector; + protected ActiveMQConnection connection; + + public void testDLQAfterBlockTopic() throws Exception { + doTestDLQAfterBlock(topicDest); + } + + public void testDLQAfterBlockQueue() throws Exception { + doTestDLQAfterBlock(queueDest); + } + + public void doTestDLQAfterBlock(ActiveMQDestination destination) throws Exception { + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + // Immediately sent to the DLQ on rollback, no redelivery + redeliveryPolicy.setMaximumRedeliveries(0); + factory.setRedeliveryPolicy(redeliveryPolicy); + + // Separate connection for consumer so it will not be blocked by filler thread + // sending when it blocks + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.setClientID("someId"); + connection.start(); + + final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = destination.isQueue() ? + consumerSession.createConsumer(destination) : + consumerSession.createDurableSubscriber((Topic) destination, "Durable"); + + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + final MessageProducer producer = session.createProducer(destination); + + final AtomicBoolean done = new AtomicBoolean(true); + final AtomicBoolean keepGoing = new AtomicBoolean(true); + final CountDownLatch fillerStarted = new CountDownLatch(1); + + final AtomicLong sent = new AtomicLong(0); + Thread thread = new Thread("Filler") { + int i; + @Override + public void run() { + while (keepGoing.get()) { + done.set(false); + fillerStarted.countDown(); + try { + producer.send(session.createTextMessage(oneKb + ++i)); + if (i%10 == 0) { + session.commit(); + sent.getAndAdd(10); + LOG.info("committed/sent: " + sent.get()); + } + LOG.info("sent: " + i); + } catch (JMSException e) { + } + } + } + }; + thread.start(); + + assertTrue("filler started..", fillerStarted.await(20, TimeUnit.SECONDS)); + waitForBlocked(done); + + // consume and rollback some so message gets to DLQ + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + TextMessage msg; + int received = 0; + for (;received < sent.get(); ++received) { + msg = (TextMessage) consumer.receive(4000); + if (msg == null) { + LOG.info("received null on count: " + received); + break; + } + LOG.info("received: " + received + ", msg: " + msg.getJMSMessageID()); + if (received%5==0) { + if (received%3==0) { + // force the use of the DLQ which will use some more store + LOG.info("rollback on : " + received); + consumerSession.rollback(); + } else { + LOG.info("commit on : " + received); + consumerSession.commit(); + } + } + } + LOG.info("Done:: sent: " + sent.get() + ", received: " + received); + keepGoing.set(false); + assertTrue("some were sent:", sent.get() > 0); + assertEquals("received what was committed", sent.get(), received); + } + + protected void waitForBlocked(final AtomicBoolean done) + throws InterruptedException { + while (true) { + Thread.sleep(1000); + // the producer is blocked once the done flag stays true + if (done.get()) { + break; + } + done.set(true); + } + } + + protected BrokerService createBroker() throws Exception { + BrokerService service = new BrokerService(); + service.setDeleteAllMessagesOnStartup(true); + + service.setUseJmx(false); + + service.getSystemUsage().getStoreUsage().setLimit(200*1024); + + // allow destination to use 50% of store, leaving 50% for DLQ. + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setStoreUsageHighWaterMark(50); + policyMap.put(queueDest, policy); + policyMap.put(topicDest, policy); + service.setDestinationPolicy(policyMap); + + connector = service.addConnector("tcp://localhost:0"); + return service; + } + + public void setUp() throws Exception { + setAutoFail(true); + super.setUp(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class); + t.getTransportListener().onException(new IOException("Disposed.")); + connection.getTransport().stop(); + super.tearDown(); + } + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connector.getConnectUri()); + } +}