diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 1efb9c7755..71d3cc5e1a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -140,7 +139,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private TransportConnection duplexInitiatingConnection; private BrokerService brokerService = null; private ObjectName mbeanObjectName; - private ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { this.configuration = configuration; @@ -156,6 +155,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br serviceRemoteCommand(remoteBrokerInfo); } + @Override public void start() throws Exception { if (started.compareAndSet(false, true)) { @@ -178,11 +178,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br }); remoteBroker.setTransportListener(new DefaultTransportListener() { + @Override public void onCommand(Object o) { Command command = (Command) o; serviceRemoteCommand(command); } + @Override public void onException(IOException error) { serviceRemoteException(error); } @@ -206,6 +208,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void triggerLocalStartBridge() throws IOException { brokerService.getTaskRunnerFactory().execute(new Runnable() { + @Override public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker); @@ -222,6 +225,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected void triggerRemoteStartBridge() throws IOException { brokerService.getTaskRunnerFactory().execute(new Runnable() { + @Override public void run() { final String originalName = Thread.currentThread().getName(); Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker); @@ -344,6 +348,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + @Override public void stop() throws Exception { if (started.compareAndSet(true, false)) { if (disposed.compareAndSet(false, true)) { @@ -357,6 +362,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br final CountDownLatch sendShutdown = new CountDownLatch(1); brokerService.getTaskRunnerFactory().execute(new Runnable() { + @Override public void run() { try { serialExecutor.shutdown(); @@ -400,6 +406,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + @Override public void serviceRemoteException(Throwable error) { if (!disposed.get()) { if (error instanceof SecurityException || error instanceof GeneralSecurityException) { @@ -409,6 +416,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } LOG.debug("The remote Exception was: " + error, error); brokerService.getTaskRunnerFactory().execute(new Runnable() { + @Override public void run() { ServiceSupport.dispose(getControllingService()); } @@ -631,6 +639,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (destInfo.isRemoveOperation()) { // serialise with removeSub operations such that all removeSub advisories are generated serialExecutor.execute(new Runnable() { + @Override public void run() { try { localBroker.oneway(destInfo); @@ -648,11 +657,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + @Override public void serviceLocalException(Throwable error) { if (!disposed.get()) { LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error); LOG.debug("The local Exception was:" + error, error); brokerService.getTaskRunnerFactory().execute(new Runnable() { + @Override public void run() { ServiceSupport.dispose(getControllingService()); } @@ -683,6 +694,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // serialise with removeDestination operations so that removeSubs are serialised with removeDestinations // such that all removeSub advisories are generated serialExecutor.execute(new Runnable() { + @Override public void run() { sub.waitForCompletion(); try { @@ -760,6 +772,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // broker when we get confirmation that the remote // broker has received the message. ResponseCallback callback = new ResponseCallback() { + @Override public void onCompletion(FutureResponse future) { try { Response response = future.getResult(); @@ -1184,6 +1197,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br ConsumerInfo info = new ConsumerInfo(); info.setDestination(destination); + // Indicate that this subscription is being made on behalf of the remote broker. + info.setBrokerPath(new BrokerId[] { remoteBrokerId }); + // the remote info held by the DemandSubscription holds the original consumerId, // the local info get's overwritten info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); @@ -1307,6 +1323,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return remoteBrokerPath; } + @Override public void setNetworkBridgeListener(NetworkBridgeListener listener) { this.networkBridgeListener = listener; } @@ -1318,26 +1335,32 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + @Override public String getRemoteAddress() { return remoteBroker.getRemoteAddress(); } + @Override public String getLocalAddress() { return localBroker.getRemoteAddress(); } + @Override public String getRemoteBrokerName() { return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName(); } + @Override public String getLocalBrokerName() { return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName(); } + @Override public long getDequeueCounter() { return dequeueCounter.get(); } + @Override public long getEnqueueCounter() { return enqueueCounter.get(); } @@ -1350,16 +1373,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return subscriptionMapByRemoteId; } + @Override public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; this.localBrokerId = brokerService.getRegionBroker().getBrokerId(); localBrokerPath[0] = localBrokerId; } + @Override public void setMbeanObjectName(ObjectName objectName) { this.mbeanObjectName = objectName; } + @Override public ObjectName getMbeanObjectName() { return mbeanObjectName; } diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java new file mode 100644 index 0000000000..906131ef18 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.network.DemandForwardingBridgeSupport; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.Wait; +import org.junit.Assert; + +/** + * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} whereby + * a static subscription from broker1 to broker2 is forwarded to broker3 even + * though the network TTL is 1. This results in duplicate subscriptions on + * broker3. + */ +public class AMQ4148Test extends JmsMultipleBrokersTestSupport { + + public void test() throws Exception { + // Create a hub-and-spoke network where each hub-spoke pair share + // messages on a test queue. + BrokerService hub = createBroker(new URI("broker:(vm://hub)/hub?persistent=false")); + + final BrokerService[] spokes = new BrokerService[4]; + for (int i = 0; i < spokes.length; i++) { + spokes[i] = createBroker(new URI("broker:(vm://spoke" + i + ")/spoke" + i + "?persistent=false")); + + } + startAllBrokers(); + + ActiveMQDestination testQueue = createDestination(AMQ4148Test.class.getSimpleName() + ".queue", false); + + NetworkConnector[] ncs = new NetworkConnector[spokes.length]; + for (int i = 0; i < spokes.length; i++) { + NetworkConnector nc = bridgeBrokers("hub", "spoke" + i); + nc.setNetworkTTL(1); + nc.setDuplex(true); + nc.setConduitSubscriptions(false); + nc.setStaticallyIncludedDestinations(Arrays.asList(testQueue)); + nc.start(); + + ncs[i] = nc; + } + + waitForBridgeFormation(); + + // Pause to allow subscriptions to be created. + TimeUnit.SECONDS.sleep(5); + + // Verify that the hub has a subscription from each spoke, but that each + // spoke has a single subscription from the hub (since the network TTL is 1). + final Destination hubTestQueue = hub.getDestination(testQueue); + assertTrue("Expecting {" + spokes.length + "} consumer but was {" + hubTestQueue.getConsumers().size() + "}", + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return spokes.length == hubTestQueue.getConsumers().size(); + } + }) + ); + + // Now check each spoke has exactly one consumer on the Queue. + for (int i = 0; i < 4; i++) { + Destination spokeTestQueue = spokes[i].getDestination(testQueue); + Assert.assertEquals(1, spokeTestQueue.getConsumers().size()); + } + + for (NetworkConnector nc : ncs) { + nc.stop(); + } + } +}