https://issues.apache.org/jira/browse/AMQ-3551 - exclude networkConnectors from sendFailIfNoSpace on producer flow control, with test for topic and queue networks

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1186952 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-10-20 17:35:04 +00:00
parent fa40250863
commit 8ce077800f
4 changed files with 153 additions and 15 deletions

View File

@ -601,11 +601,11 @@ public abstract class BaseDestination implements Destination {
}
protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
if (systemUsage.isSendFailIfNoSpace()) {
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
}
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);

View File

@ -555,7 +555,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
if (systemUsage.isSendFailIfNoSpace()) {
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
@ -613,7 +613,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
});
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
.getSendFailIfNoSpaceAfterTimeout()));
}

View File

@ -298,7 +298,7 @@ public class Topic extends BaseDestination implements Task {
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
if (systemUsage.isSendFailIfNoSpace()) {
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
+ memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
@ -427,7 +427,7 @@ public class Topic extends BaseDestination implements Task {
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.MessageConsumer;
import junit.framework.Test;
@ -27,7 +28,9 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
@ -99,15 +102,6 @@ public class NetworkBridgeProducerFlowControlTest extends
private static final Log LOG = LogFactory
.getLog(NetworkBridgeProducerFlowControlTest.class);
// Consumer prefetch is disabled for broker1's consumers.
private static final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".slow.shared?consumer.prefetchSize=1");
private static final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".fast.shared?consumer.prefetchSize=1");
// Combo flag set to true/false by the test framework.
public boolean persistentTestMessages;
public boolean networkIsAlwaysSendSync;
@ -146,6 +140,15 @@ public class NetworkBridgeProducerFlowControlTest extends
final long TEST_MESSAGE_SIZE = 1024;
final long SLOW_CONSUMER_DELAY_MILLIS = 100;
// Consumer prefetch is disabled for broker1's consumers.
final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".slow.shared?consumer.prefetchSize=1");
final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".fast.shared?consumer.prefetchSize=1");
// Start a local and a remote broker.
createBroker(new URI("broker:(tcp://localhost:0"
+ ")?brokerName=broker0&persistent=false&useJmx=true"));
@ -246,4 +249,139 @@ public class NetworkBridgeProducerFlowControlTest extends
fastConsumerTime.get() < slowConsumerTime.get() / 10);
}
}
public void testSendFailIfNoSpaceDoesNotBlockQueueNetwork() throws Exception {
// Consumer prefetch is disabled for broker1's consumers.
final ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".slow.shared?consumer.prefetchSize=1");
final ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".fast.shared?consumer.prefetchSize=1");
doTestSendFailIfNoSpaceDoesNotBlockNetwork(
SLOW_SHARED_QUEUE,
FAST_SHARED_QUEUE);
}
public void testSendFailIfNoSpaceDoesNotBlockTopicNetwork() throws Exception {
// Consumer prefetch is disabled for broker1's consumers.
final ActiveMQTopic SLOW_SHARED_TOPIC = new ActiveMQTopic(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".slow.shared?consumer.prefetchSize=1");
final ActiveMQTopic FAST_SHARED_TOPIC = new ActiveMQTopic(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".fast.shared?consumer.prefetchSize=1");
doTestSendFailIfNoSpaceDoesNotBlockNetwork(
SLOW_SHARED_TOPIC,
FAST_SHARED_TOPIC);
}
public void doTestSendFailIfNoSpaceDoesNotBlockNetwork(
ActiveMQDestination slowDestination, ActiveMQDestination fastDestination) throws Exception {
final int NUM_MESSAGES = 100;
final long TEST_MESSAGE_SIZE = 1024;
final long SLOW_CONSUMER_DELAY_MILLIS = 100;
// Start a local and a remote broker.
createBroker(new URI("broker:(tcp://localhost:0"
+ ")?brokerName=broker0&persistent=false&useJmx=true"));
BrokerService remoteBroker = createBroker(new URI(
"broker:(tcp://localhost:0"
+ ")?brokerName=broker1&persistent=false&useJmx=true"));
remoteBroker.getSystemUsage().setSendFailIfNoSpace(true);
// Set a policy on the remote broker that limits the maximum size of the
// slow shared queue.
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE);
PolicyMap policyMap = new PolicyMap();
policyMap.put(slowDestination, policyEntry);
remoteBroker.setDestinationPolicy(policyMap);
// Create an outbound bridge from the local broker to the remote broker.
// The bridge is configured with the remoteDispatchType enhancement.
NetworkConnector nc = bridgeBrokers("broker0", "broker1");
nc.setAlwaysSyncSend(true);
nc.setPrefetchSize(1);
startAllBrokers();
waitForBridgeFormation();
// Start two asynchronous consumers on the remote broker, one for each
// of the two shared queues, and keep track of how long it takes for
// each of the consumers to receive all the messages.
final CountDownLatch fastConsumerLatch = new CountDownLatch(
NUM_MESSAGES);
final CountDownLatch slowConsumerLatch = new CountDownLatch(
NUM_MESSAGES);
final long startTimeMillis = System.currentTimeMillis();
final AtomicLong fastConsumerTime = new AtomicLong();
final AtomicLong slowConsumerTime = new AtomicLong();
Thread fastWaitThread = new Thread() {
@Override
public void run() {
try {
fastConsumerLatch.await();
fastConsumerTime.set(System.currentTimeMillis()
- startTimeMillis);
} catch (InterruptedException ex) {
exceptions.add(ex);
Assert.fail(ex.getMessage());
}
}
};
Thread slowWaitThread = new Thread() {
@Override
public void run() {
try {
slowConsumerLatch.await();
slowConsumerTime.set(System.currentTimeMillis()
- startTimeMillis);
} catch (InterruptedException ex) {
exceptions.add(ex);
Assert.fail(ex.getMessage());
}
}
};
fastWaitThread.start();
slowWaitThread.start();
createConsumer("broker1", fastDestination, fastConsumerLatch);
MessageConsumer slowConsumer = createConsumer("broker1",
slowDestination, slowConsumerLatch);
MessageIdList messageIdList = brokers.get("broker1").consumers
.get(slowConsumer);
messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS);
// Send the test messages to the local broker's shared queues. The
// messages are either persistent or non-persistent to demonstrate the
// difference between synchronous and asynchronous dispatch.
persistentDelivery = false;
sendMessages("broker0", fastDestination, NUM_MESSAGES);
sendMessages("broker0", slowDestination, NUM_MESSAGES);
fastWaitThread.join(TimeUnit.SECONDS.toMillis(60));
slowWaitThread.join(TimeUnit.SECONDS.toMillis(60));
assertTrue("no exceptions on the wait threads:" + exceptions,
exceptions.isEmpty());
LOG.info("Fast consumer duration (ms): " + fastConsumerTime.get());
LOG.info("Slow consumer duration (ms): " + slowConsumerTime.get());
assertTrue("fast time set", fastConsumerTime.get() > 0);
assertTrue("slow time set", slowConsumerTime.get() > 0);
// Verify the behaviour as described in the description of this class.
Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
}
}