https://issues.apache.org/jira/browse/AMQ-5830 - ensure duplex inbound connection sets network=true flag, fix and test

This commit is contained in:
gtully 2015-06-09 11:29:20 +01:00
parent 062b0c2c08
commit 3100909041
5 changed files with 232 additions and 31 deletions

View File

@ -404,11 +404,7 @@ public class BrokerService implements Service {
*/
public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
connector.setBrokerService(this);
URI uri = getVmConnectorURI();
Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
connector.setLocalUri(uri);
connector.setLocalUri(getVmConnectorURI());
// Set a connection filter so that the connector does not establish loop
// back connections.
connector.setConnectionFilter(new ConnectionFilter() {
@ -2499,7 +2495,6 @@ public class BrokerService implements Service {
this.slave = false;
URI uri = getVmConnectorURI();
Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));

View File

@ -475,6 +475,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (configuration.isDuplex()) {
// separate in-bound channel for forwards so we don't
// contend with out-bound dispatch on same connection
remoteBrokerInfo.setNetworkConnection(true);
duplexInboundLocalBroker.oneway(remoteBrokerInfo);
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"

View File

@ -32,19 +32,6 @@ public final class NetworkBridgeFactory {
private NetworkBridgeFactory() {
}
/**
* Create a network bridge
*
* @param config
* @param localTransport
* @param remoteTransport
* @return the NetworkBridge
*/
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration config,
Transport localTransport, Transport remoteTransport) {
return createBridge(config, localTransport, remoteTransport, null);
}
/**
* create a network bridge
@ -74,7 +61,6 @@ public final class NetworkBridgeFactory {
public static Transport createLocalTransport(Broker broker) throws Exception {
URI uri = broker.getVmConnectorURI();
HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
map.put("network", "true");
map.put("async", "true");
map.put("create", "false"); // we don't want a vm connect during shutdown to trigger a broker create
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));

View File

@ -49,7 +49,6 @@ public class VMTransport implements Transport, Task {
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean marshal;
protected boolean network;
protected boolean async = true;
protected int asyncQueueDepth = 2000;
protected final URI location;
@ -358,14 +357,6 @@ public class VMTransport implements Transport, Task {
this.marshal = marshal;
}
public boolean isNetwork() {
return network;
}
public void setNetwork(boolean network) {
this.network = network;
}
@Override
public String toString() {
return location + "#" + id;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.usecases;
import java.io.IOException;
import java.net.URI;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
@ -31,7 +32,10 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -242,7 +246,7 @@ public class NetworkBridgeProducerFlowControlTest extends
// Verify the behaviour as described in the description of this class.
if (networkIsAlwaysSendSync) {
Assert
.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 20);
} else {
Assert.assertEquals(persistentTestMessages,
@ -384,4 +388,226 @@ public class NetworkBridgeProducerFlowControlTest extends
// Verify the behaviour as described in the description of this class.
Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
}
}
public void testSendFailIfNoSpaceReverseDoesNotBlockQueueNetwork() throws Exception {
final int NUM_MESSAGES = 100;
final long TEST_MESSAGE_SIZE = 1024;
final long SLOW_CONSUMER_DELAY_MILLIS = 100;
final ActiveMQQueue slowDestination = new ActiveMQQueue(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".slow.shared?consumer.prefetchSize=1");
final ActiveMQQueue fastDestination = new ActiveMQQueue(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".fast.shared?consumer.prefetchSize=1");
// Start a local and a remote broker.
BrokerService localBroker = createBroker(new URI("broker:(tcp://localhost:0"
+ ")?brokerName=broker0&persistent=false&useJmx=true"));
createBroker(new URI(
"broker:(tcp://localhost:0"
+ ")?brokerName=broker1&persistent=false&useJmx=true"));
localBroker.getSystemUsage().setSendFailIfNoSpace(true);
// Set a policy on the local broker that limits the maximum size of the
// slow shared queue.
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE);
PolicyMap policyMap = new PolicyMap();
policyMap.put(slowDestination, policyEntry);
localBroker.setDestinationPolicy(policyMap);
// Create an outbound bridge from the local broker to the remote broker.
// The bridge is configured with the remoteDispatchType enhancement.
NetworkConnector nc = bridgeBrokers("broker0", "broker1");
nc.setAlwaysSyncSend(true);
nc.setPrefetchSize(1);
nc.setDuplex(true);
startAllBrokers();
waitForBridgeFormation();
// Start two asynchronous consumers on the local broker, one for each
// of the two shared queues, and keep track of how long it takes for
// each of the consumers to receive all the messages.
final CountDownLatch fastConsumerLatch = new CountDownLatch(
NUM_MESSAGES);
final CountDownLatch slowConsumerLatch = new CountDownLatch(
NUM_MESSAGES);
final long startTimeMillis = System.currentTimeMillis();
final AtomicLong fastConsumerTime = new AtomicLong();
final AtomicLong slowConsumerTime = new AtomicLong();
Thread fastWaitThread = new Thread() {
@Override
public void run() {
try {
fastConsumerLatch.await();
fastConsumerTime.set(System.currentTimeMillis()
- startTimeMillis);
} catch (InterruptedException ex) {
exceptions.add(ex);
Assert.fail(ex.getMessage());
}
}
};
Thread slowWaitThread = new Thread() {
@Override
public void run() {
try {
slowConsumerLatch.await();
slowConsumerTime.set(System.currentTimeMillis()
- startTimeMillis);
} catch (InterruptedException ex) {
exceptions.add(ex);
Assert.fail(ex.getMessage());
}
}
};
fastWaitThread.start();
slowWaitThread.start();
createConsumer("broker0", fastDestination, fastConsumerLatch);
MessageConsumer slowConsumer = createConsumer("broker0",
slowDestination, slowConsumerLatch);
MessageIdList messageIdList = brokers.get("broker0").consumers
.get(slowConsumer);
messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS);
// Send the test messages to the local broker's shared queues. The
// messages are either persistent or non-persistent to demonstrate the
// difference between synchronous and asynchronous dispatch.
persistentDelivery = false;
sendMessages("broker1", fastDestination, NUM_MESSAGES);
sendMessages("broker1", slowDestination, NUM_MESSAGES);
fastWaitThread.join(TimeUnit.SECONDS.toMillis(60));
slowWaitThread.join(TimeUnit.SECONDS.toMillis(60));
assertTrue("no exceptions on the wait threads:" + exceptions,
exceptions.isEmpty());
LOG.info("Fast consumer duration (ms): " + fastConsumerTime.get());
LOG.info("Slow consumer duration (ms): " + slowConsumerTime.get());
assertTrue("fast time set", fastConsumerTime.get() > 0);
assertTrue("slow time set", slowConsumerTime.get() > 0);
// Verify the behaviour as described in the description of this class.
Assert.assertTrue(fastConsumerTime.get() < slowConsumerTime.get() / 10);
}
/**
* create a duplex network bridge from broker0 to broker1
* add a topic consumer on broker0
* set the setSendFailIfNoSpace() on the local broker.
* create a SimpleDiscoveryAgent impl that tracks a network reconnect
*
* producer connects to broker1 and messages should be sent across the network to broker0
*
* Ensure broker0 will not send the javax.jms.ResourceAllocationException (when broker0 runs out of space).
* If the javax.jms.ResourceAllocationException is sent across the wire it will force the network connector
* to shutdown
*
*
* @throws Exception
*/
public void testDuplexSendFailIfNoSpaceDoesNotBlockNetwork() throws Exception {
// Consumer prefetch is disabled for broker1's consumers.
final ActiveMQTopic destination = new ActiveMQTopic(
NetworkBridgeProducerFlowControlTest.class.getSimpleName()
+ ".duplexTest?consumer.prefetchSize=1");
final int NUM_MESSAGES = 100;
final long TEST_MESSAGE_SIZE = 1024;
final long SLOW_CONSUMER_DELAY_MILLIS = 100;
// Start a local and a remote broker.
BrokerService localBroker = createBroker(new URI("broker:(tcp://localhost:0"
+ ")?brokerName=broker0&persistent=false&useJmx=true"));
BrokerService remoteBroker = createBroker(new URI(
"broker:(tcp://localhost:0"
+ ")?brokerName=broker1&persistent=false&useJmx=true"));
localBroker.getSystemUsage().setSendFailIfNoSpace(true);
// Set a policy on the remote broker that limits the maximum size of the
// slow shared queue.
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setMemoryLimit(5 * TEST_MESSAGE_SIZE);
PolicyMap policyMap = new PolicyMap();
policyMap.put(destination, policyEntry);
localBroker.setDestinationPolicy(policyMap);
// Create a duplex network bridge from the local broker to the remote broker
// create a SimpleDiscoveryAgent impl that tracks a reconnect
DiscoveryNetworkConnector discoveryNetworkConnector = (DiscoveryNetworkConnector)bridgeBrokers("broker0", "broker1");
URI originURI = discoveryNetworkConnector.getUri();
discoveryNetworkConnector.setAlwaysSyncSend(true);
discoveryNetworkConnector.setPrefetchSize(1);
discoveryNetworkConnector.setDuplex(true);
DummySimpleDiscoveryAgent dummySimpleDiscoveryAgent = new DummySimpleDiscoveryAgent();
dummySimpleDiscoveryAgent.setServices(originURI.toString().substring(8,originURI.toString().lastIndexOf(')')));
discoveryNetworkConnector.setDiscoveryAgent(dummySimpleDiscoveryAgent);
startAllBrokers();
waitForBridgeFormation();
final CountDownLatch consumerLatch = new CountDownLatch(
NUM_MESSAGES);
//createConsumer("broker0", fastDestination, fastConsumerLatch);
MessageConsumer consumer = createConsumer("broker0",
destination, consumerLatch);
MessageIdList messageIdList = brokers.get("broker0").consumers
.get(consumer);
messageIdList.setProcessingDelay(SLOW_CONSUMER_DELAY_MILLIS);
// Send the test messages to the local broker's shared queues. The
// messages are either persistent or non-persistent to demonstrate the
// difference between synchronous and asynchronous dispatch.
persistentDelivery = false;
sendMessages("broker1", destination, NUM_MESSAGES);
//wait for 5 seconds for the consumer to complete
consumerLatch.await(5, TimeUnit.SECONDS);
assertFalse("dummySimpleDiscoveryAgent.serviceFail has been invoked - should not have been",
dummySimpleDiscoveryAgent.isServiceFailed);
}
/**
* When the network connector fails it records the failure and delegates to real SimpleDiscoveryAgent
*/
class DummySimpleDiscoveryAgent extends SimpleDiscoveryAgent {
boolean isServiceFailed = false;
public void serviceFailed(DiscoveryEvent devent) throws IOException {
//should never get in here
LOG.info("!!!!! DummySimpleDiscoveryAgent.serviceFailed() invoked with event:"+devent+"!!!!!!");
isServiceFailed = true;
super.serviceFailed(devent);
}
}
}