From 633428c6e39f60be6c45ec43d09a50f8b742f242 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Fri, 30 Sep 2011 19:35:28 +0000 Subject: [PATCH] fix for: https://issues.apache.org/jira/browse/AMQ-3014 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1177797 13f79535-47bb-0310-9956-ffa450edef68 --- .../DemandForwardingBridgeSupport.java | 35 ++-- .../org/apache/activemq/bugs/AMQ3014Test.java | 195 ++++++++++++++++++ 2 files changed, 213 insertions(+), 17 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 386ef59657..59df383771 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -81,8 +81,8 @@ import org.slf4j.LoggerFactory; /** * A useful base class for implementing demand forwarding bridges. - * - * + * + * */ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class); @@ -231,7 +231,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } catch (IOException e) { LOG.warn("Caught exception from remote start", e); } - } else { + } else { LOG.warn ("Bridge was disposed before the start() method was fully executed."); throw new TransportDisposedIOException(); } @@ -339,6 +339,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br IntrospectionSupport.getProperties(configuration, props, null); String str = MarshallingSupport.propertiesToString(props); brokerInfo.setNetworkProperties(str); + localBrokerIdKnownLatch.await(); brokerInfo.setBrokerId(this.localBrokerId); remoteBroker.oneway(brokerInfo); } @@ -487,7 +488,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (isDuplex()) { if (command.isMessage()) { ActiveMQMessage message = (ActiveMQMessage) command; - if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) + if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) { serviceRemoteConsumerAdvisory(message.getDataStructure()); } else { @@ -708,7 +709,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br final MessageDispatch md = (MessageDispatch) command; final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { - + if (suppressMessageDispatch(md, sub)) { if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage()); @@ -721,14 +722,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } return; } - + Message message = configureMessage(md); if (LOG.isDebugEnabled()) { LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + message.getMessageId() + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); } - + if (!message.isResponseRequired()) { - + // If the message was originally sent using async // send, we will preserve that QOS // by bridging it using an async send (small chance @@ -740,9 +741,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } finally { sub.decrementOutstandingResponses(); } - + } else { - + // The message was not sent using async send, so we // should only ack the local // broker when we get confirmation that the remote @@ -757,7 +758,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else { localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); dequeueCounter.incrementAndGet(); - } + } } catch (IOException e) { serviceLocalException(e); } finally { @@ -765,9 +766,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } }; - + remoteBroker.asyncRequest(message, callback); - + } } else { if (LOG.isDebugEnabled()) { @@ -1042,7 +1043,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } List candidateConsumers = consumerInfo.getNetworkConsumerIds(); - Collection currentSubs = + Collection currentSubs = getRegionSubscriptions(consumerInfo.getDestination().isTopic()); for (Subscription sub : currentSubs) { List networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); @@ -1070,7 +1071,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) { if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName - + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " + + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: " + existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds()); } suppress = true; @@ -1082,7 +1083,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (LOG.isDebugEnabled()) { LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() + " with sub from " + remoteBrokerName - + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " + + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: " + candidateInfo.getNetworkConsumerIds()); } } catch (IOException e) { @@ -1113,7 +1114,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private final Collection getRegionSubscriptions(boolean isTopic) { RegionBroker region = (RegionBroker) brokerService.getRegionBroker(); - AbstractRegion abstractRegion = (AbstractRegion) + AbstractRegion abstractRegion = (AbstractRegion) (isTopic ? region.getTopicRegion() : region.getQueueRegion()); return abstractRegion.getSubscriptions().values(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java new file mode 100644 index 0000000000..92835ce4d0 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java @@ -0,0 +1,195 @@ +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.Connection; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.thread.Task; +import org.apache.activemq.thread.TaskRunner; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * This test involves the creation of a local and remote broker, both of which + * communicate over VM and TCP. The local broker establishes a bridge to the + * remote broker for the purposes of verifying that broker info is only + * transfered once the local broker's ID is known to the bridge support. + */ +public class AMQ3014Test { + // Change this URL to be an unused port. + private static final String REMOTE_BROKER_URL = "tcp://localhost:50000"; + + private List remoteBrokerInfos = Collections + .synchronizedList(new ArrayList()); + + private BrokerService localBroker = new BrokerService(); + + // Override the "remote" broker so that it records all (remote) BrokerInfos + // that it receives. + private BrokerService remoteBroker = new BrokerService() { + @Override + protected TransportConnector createTransportConnector(URI brokerURI) + throws Exception { + TransportServer transport = TransportFactory.bind(this, brokerURI); + return new TransportConnector(transport) { + @Override + protected Connection createConnection(Transport transport) + throws IOException { + Connection connection = super.createConnection(transport); + final TransportListener proxiedListener = transport + .getTransportListener(); + transport.setTransportListener(new TransportListener() { + + @Override + public void onCommand(Object command) { + if (command instanceof BrokerInfo) { + remoteBrokerInfos.add((BrokerInfo) command); + } + proxiedListener.onCommand(command); + } + + @Override + public void onException(IOException error) { + proxiedListener.onException(error); + } + + @Override + public void transportInterupted() { + proxiedListener.transportInterupted(); + } + + @Override + public void transportResumed() { + proxiedListener.transportResumed(); + } + }); + return connection; + } + + }; + } + }; + + @Before + public void init() throws Exception { + localBroker.setBrokerName("localBroker"); + localBroker.setPersistent(false); + localBroker.setUseJmx(false); + localBroker.setSchedulerSupport(false); + + remoteBroker.setBrokerName("remoteBroker"); + remoteBroker.setPersistent(false); + remoteBroker.setUseJmx(false); + remoteBroker.addConnector(REMOTE_BROKER_URL); + remoteBroker.setSchedulerSupport(false); + } + + @After + public void cleanup() throws Exception { + try { + localBroker.stop(); + } finally { + remoteBroker.stop(); + } + } + + /** + * This test verifies that the local broker's ID is typically known by the + * bridge support before the local broker's BrokerInfo is sent to the remote + * broker. + */ + @Test + public void NormalCaseTest() throws Exception { + runTest(0, 3000); + } + + /** + * This test verifies that timing can arise under which the local broker's + * ID is not known by the bridge support before the local broker's + * BrokerInfo is sent to the remote broker. + */ + @Test + public void DelayedCaseTest() throws Exception { + runTest(500, 3000); + } + + private void runTest(final long taskRunnerDelay, long timeout) + throws Exception { + // Add a network connector to the local broker that will create a bridge + // to the remote broker. + DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector(); + SimpleDiscoveryAgent da = new SimpleDiscoveryAgent(); + da.setServices(REMOTE_BROKER_URL); + dnc.setDiscoveryAgent(da); + localBroker.addNetworkConnector(dnc); + + // Before starting the local broker, intercept the task runner factory + // so that the + // local VMTransport dispatcher is artificially delayed. + final TaskRunnerFactory realTaskRunnerFactory = localBroker + .getTaskRunnerFactory(); + localBroker.setTaskRunnerFactory(new TaskRunnerFactory() { + public TaskRunner createTaskRunner(Task task, String name) { + final TaskRunner realTaskRunner = realTaskRunnerFactory + .createTaskRunner(task, name); + if (name.startsWith("ActiveMQ Connection Dispatcher: ")) { + return new TaskRunner() { + @Override + public void shutdown() throws InterruptedException { + realTaskRunner.shutdown(); + } + + @Override + public void shutdown(long timeout) + throws InterruptedException { + realTaskRunner.shutdown(timeout); + } + + @Override + public void wakeup() throws InterruptedException { + Thread.sleep(taskRunnerDelay); + realTaskRunner.wakeup(); + } + }; + } else { + return realTaskRunnerFactory.createTaskRunner(task, name); + } + } + }); + + // Start the brokers and wait for the bridge to be created; the remote + // broker is started first to ensure it is available for the local + // broker to connect to. + remoteBroker.start(); + localBroker.start(); + + // Wait for the remote broker to receive the local broker's BrokerInfo + // and then verify the local broker's ID is known. + long startTimeMillis = System.currentTimeMillis(); + while (remoteBrokerInfos.isEmpty() + && (System.currentTimeMillis() - startTimeMillis) < timeout) { + Thread.sleep(100); + } + + Assert.assertFalse("Timed out waiting for bridge to form.", + remoteBrokerInfos.isEmpty()); + ; + Assert.assertNotNull("Local broker ID is null.", remoteBrokerInfos.get( + 0).getBrokerId()); + } +}