diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index d16ef123cb..179d9d9a1d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -601,11 +601,11 @@ public abstract class BaseDestination implements Destination { } protected final void waitForSpace(ConnectionContext context, Usage usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { - if (systemUsage.isSendFailIfNoSpace()) { + if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning); throw new ResourceAllocationException(warning); } - if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { + if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning); throw new ResourceAllocationException(warning); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b44c3695c2..3e9a12baef 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -555,7 +555,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - if (systemUsage.isSendFailIfNoSpace()) { + if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." @@ -613,7 +613,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } }); - if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { + if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage .getSendFailIfNoSpaceAfterTimeout())); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 0704030088..30cbc819f7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -298,7 +298,7 @@ public class Topic extends BaseDestination implements Task { + " See http://activemq.apache.org/producer-flow-control.html for more info"); } - if (systemUsage.isSendFailIfNoSpace()) { + if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException("Usage Manager memory limit (" + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." @@ -427,7 +427,7 @@ public class Topic extends BaseDestination implements Task { + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"; - if (systemUsage.isSendFailIfNoSpace()) { + if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { throw new javax.jms.ResourceAllocationException(logMessage); } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java index 312e0ff357..e950b7dc85 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkBridgeProducerFlowControlTest.java @@ -20,6 +20,7 @@ package org.apache.activemq.usecases; import java.net.URI; import java.util.Vector; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.jms.MessageConsumer; import junit.framework.Test; @@ -27,7 +28,9 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.MessageIdList; import org.apache.commons.logging.Log; @@ -99,15 +102,6 @@ public class NetworkBridgeProducerFlowControlTest extends private static final Log LOG = LogFactory .getLog(NetworkBridgeProducerFlowControlTest.class); - // Consumer prefetch is disabled for broker1's consumers. - private static final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue( - NetworkBridgeProducerFlowControlTest.class.getSimpleName() - + ".slow.shared?consumer.prefetchSize=1"); - - private static final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue( - NetworkBridgeProducerFlowControlTest.class.getSimpleName() - + ".fast.shared?consumer.prefetchSize=1"); - // Combo flag set to true/false by the test framework. public boolean persistentTestMessages; public boolean networkIsAlwaysSendSync; @@ -146,6 +140,15 @@ public class NetworkBridgeProducerFlowControlTest extends final long TEST_MESSAGE_SIZE = 1024; final long SLOW_CONSUMER_DELAY_MILLIS = 100; + // Consumer prefetch is disabled for broker1's consumers. + final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".slow.shared?consumer.prefetchSize=1"); + + final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".fast.shared?consumer.prefetchSize=1"); + // Start a local and a remote broker. createBroker(new URI("broker:(tcp://localhost:0" + ")?brokerName=broker0&persistent=false&useJmx=true")); @@ -246,4 +249,139 @@ public class NetworkBridgeProducerFlowControlTest extends fastConsumerTime.get() < slowConsumerTime.get() / 10); } } + + public void testSendFailIfNoSpaceDoesNotBlockQueueNetwork() throws Exception { + // Consumer prefetch is disabled for broker1's consumers. + final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".slow.shared?consumer.prefetchSize=1"); + + final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".fast.shared?consumer.prefetchSize=1"); + + doTestSendFailIfNoSpaceDoesNotBlockNetwork( + SLOW_SHARED_QUEUE, + FAST_SHARED_QUEUE); + } + + public void testSendFailIfNoSpaceDoesNotBlockTopicNetwork() throws Exception { + // Consumer prefetch is disabled for broker1's consumers. + final ActiveMQTopic SLOW_SHARED_TOPIC = new ActiveMQTopic( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".slow.shared?consumer.prefetchSize=1"); + + final ActiveMQTopic FAST_SHARED_TOPIC = new ActiveMQTopic( + NetworkBridgeProducerFlowControlTest.class.getSimpleName() + + ".fast.shared?consumer.prefetchSize=1"); + + doTestSendFailIfNoSpaceDoesNotBlockNetwork( + SLOW_SHARED_TOPIC, + FAST_SHARED_TOPIC); + } + + public void doTestSendFailIfNoSpaceDoesNotBlockNetwork( + ActiveMQDestination slowDestination, ActiveMQDestination fastDestination) throws Exception { + + final int NUM_MESSAGES = 100; + final long TEST_MESSAGE_SIZE = 1024; + final long SLOW_CONSUMER_DELAY_MILLIS = 100; + + // Start a local and a remote broker. + createBroker(new URI("broker:(tcp://localhost:0" + + ")?brokerName=broker0&persistent=false&useJmx=true")); + BrokerService remoteBroker = createBroker(new URI( + "broker:(tcp://localhost:0" + + ")?brokerName=broker1&persistent=false&useJmx=true")); + remoteBroker.getSystemUsage().setSendFailIfNoSpace(true); + + // Set a policy on the remote broker that limits the maximum size of the + // slow shared queue. + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE); + PolicyMap policyMap = new PolicyMap(); + policyMap.put(slowDestination, policyEntry); + remoteBroker.setDestinationPolicy(policyMap); + + // Create an outbound bridge from the local broker to the remote broker. + // The bridge is configured with the remoteDispatchType enhancement. + NetworkConnector nc = bridgeBrokers("broker0", "broker1"); + nc.setAlwaysSyncSend(true); + nc.setPrefetchSize(1); + + startAllBrokers(); + waitForBridgeFormation(); + + // Start two asynchronous consumers on the remote broker, one for each + // of the two shared queues, and keep track of how long it takes for + // each of the consumers to receive all the messages. + final CountDownLatch fastConsumerLatch = new CountDownLatch( + NUM_MESSAGES); + final CountDownLatch slowConsumerLatch = new CountDownLatch( + NUM_MESSAGES); + + final long startTimeMillis = System.currentTimeMillis(); + final AtomicLong fastConsumerTime = new AtomicLong(); + final AtomicLong slowConsumerTime = new AtomicLong(); + + Thread fastWaitThread = new Thread() { + @Override + public void run() { + try { + fastConsumerLatch.await(); + fastConsumerTime.set(System.currentTimeMillis() + - startTimeMillis); + } catch (InterruptedException ex) { + exceptions.add(ex); + Assert.fail(ex.getMessage()); + } + } + }; + + Thread slowWaitThread = new Thread() { + @Override + public void run() { + try { + slowConsumerLatch.await(); + slowConsumerTime.set(System.currentTimeMillis() + - startTimeMillis); + } catch (InterruptedException ex) { + exceptions.add(ex); + Assert.fail(ex.getMessage()); + } + } + }; + + fastWaitThread.start(); + slowWaitThread.start(); + + createConsumer("broker1", fastDestination, fastConsumerLatch); + MessageConsumer slowConsumer = createConsumer("broker1", + slowDestination, slowConsumerLatch); + MessageIdList messageIdList = brokers.get("broker1").consumers + .get(slowConsumer); + messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS); + + // Send the test messages to the local broker's shared queues. The + // messages are either persistent or non-persistent to demonstrate the + // difference between synchronous and asynchronous dispatch. + persistentDelivery = false; + sendMessages("broker0", fastDestination, NUM_MESSAGES); + sendMessages("broker0", slowDestination, NUM_MESSAGES); + + fastWaitThread.join(TimeUnit.SECONDS.toMillis(60)); + slowWaitThread.join(TimeUnit.SECONDS.toMillis(60)); + + assertTrue("no exceptions on the wait threads:" + exceptions, + exceptions.isEmpty()); + + LOG.info("Fast consumer duration (ms): " + fastConsumerTime.get()); + LOG.info("Slow consumer duration (ms): " + slowConsumerTime.get()); + + assertTrue("fast time set", fastConsumerTime.get() > 0); + assertTrue("slow time set", slowConsumerTime.get() > 0); + + // Verify the behaviour as described in the description of this class. + Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10); + } } \ No newline at end of file