git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1438123 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-24 18:39:09 +00:00
parent 61cbe46fc6
commit c5184983cf
2 changed files with 123 additions and 2 deletions

View File

@ -25,7 +25,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -140,7 +139,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private TransportConnection duplexInitiatingConnection;
private BrokerService brokerService = null;
private ObjectName mbeanObjectName;
private ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration = configuration;
@ -156,6 +155,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
serviceRemoteCommand(remoteBrokerInfo);
}
@Override
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
@ -178,11 +178,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
});
remoteBroker.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceRemoteCommand(command);
}
@Override
public void onException(IOException error) {
serviceRemoteException(error);
}
@ -206,6 +208,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void triggerLocalStartBridge() throws IOException {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
@ -222,6 +225,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void triggerRemoteStartBridge() throws IOException {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
@ -344,6 +348,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
@Override
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
if (disposed.compareAndSet(false, true)) {
@ -357,6 +362,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
final CountDownLatch sendShutdown = new CountDownLatch(1);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
try {
serialExecutor.shutdown();
@ -400,6 +406,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
@Override
public void serviceRemoteException(Throwable error) {
if (!disposed.get()) {
if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
@ -409,6 +416,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
LOG.debug("The remote Exception was: " + error, error);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
ServiceSupport.dispose(getControllingService());
}
@ -631,6 +639,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (destInfo.isRemoveOperation()) {
// serialise with removeSub operations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
try {
localBroker.oneway(destInfo);
@ -648,11 +657,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
@Override
public void serviceLocalException(Throwable error) {
if (!disposed.get()) {
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
LOG.debug("The local Exception was:" + error, error);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
ServiceSupport.dispose(getControllingService());
}
@ -683,6 +694,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// serialise with removeDestination operations so that removeSubs are serialised with removeDestinations
// such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {
@Override
public void run() {
sub.waitForCompletion();
try {
@ -760,6 +772,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// broker when we get confirmation that the remote
// broker has received the message.
ResponseCallback callback = new ResponseCallback() {
@Override
public void onCompletion(FutureResponse future) {
try {
Response response = future.getResult();
@ -1184,6 +1197,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ConsumerInfo info = new ConsumerInfo();
info.setDestination(destination);
// Indicate that this subscription is being made on behalf of the remote broker.
info.setBrokerPath(new BrokerId[] { remoteBrokerId });
// the remote info held by the DemandSubscription holds the original consumerId,
// the local info get's overwritten
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
@ -1307,6 +1323,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return remoteBrokerPath;
}
@Override
public void setNetworkBridgeListener(NetworkBridgeListener listener) {
this.networkBridgeListener = listener;
}
@ -1318,26 +1335,32 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
@Override
public String getRemoteAddress() {
return remoteBroker.getRemoteAddress();
}
@Override
public String getLocalAddress() {
return localBroker.getRemoteAddress();
}
@Override
public String getRemoteBrokerName() {
return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
}
@Override
public String getLocalBrokerName() {
return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
}
@Override
public long getDequeueCounter() {
return dequeueCounter.get();
}
@Override
public long getEnqueueCounter() {
return enqueueCounter.get();
}
@ -1350,16 +1373,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return subscriptionMapByRemoteId;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
localBrokerPath[0] = localBrokerId;
}
@Override
public void setMbeanObjectName(ObjectName objectName) {
this.mbeanObjectName = objectName;
}
@Override
public ObjectName getMbeanObjectName() {
return mbeanObjectName;
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.DemandForwardingBridgeSupport;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.junit.Assert;
/**
* This test demonstrates a bug in {@link DemandForwardingBridgeSupport} whereby
* a static subscription from broker1 to broker2 is forwarded to broker3 even
* though the network TTL is 1. This results in duplicate subscriptions on
* broker3.
*/
public class AMQ4148Test extends JmsMultipleBrokersTestSupport {
public void test() throws Exception {
// Create a hub-and-spoke network where each hub-spoke pair share
// messages on a test queue.
BrokerService hub = createBroker(new URI("broker:(vm://hub)/hub?persistent=false"));
final BrokerService[] spokes = new BrokerService[4];
for (int i = 0; i < spokes.length; i++) {
spokes[i] = createBroker(new URI("broker:(vm://spoke" + i + ")/spoke" + i + "?persistent=false"));
}
startAllBrokers();
ActiveMQDestination testQueue = createDestination(AMQ4148Test.class.getSimpleName() + ".queue", false);
NetworkConnector[] ncs = new NetworkConnector[spokes.length];
for (int i = 0; i < spokes.length; i++) {
NetworkConnector nc = bridgeBrokers("hub", "spoke" + i);
nc.setNetworkTTL(1);
nc.setDuplex(true);
nc.setConduitSubscriptions(false);
nc.setStaticallyIncludedDestinations(Arrays.asList(testQueue));
nc.start();
ncs[i] = nc;
}
waitForBridgeFormation();
// Pause to allow subscriptions to be created.
TimeUnit.SECONDS.sleep(5);
// Verify that the hub has a subscription from each spoke, but that each
// spoke has a single subscription from the hub (since the network TTL is 1).
final Destination hubTestQueue = hub.getDestination(testQueue);
assertTrue("Expecting {" + spokes.length + "} consumer but was {" + hubTestQueue.getConsumers().size() + "}",
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return spokes.length == hubTestQueue.getConsumers().size();
}
})
);
// Now check each spoke has exactly one consumer on the Queue.
for (int i = 0; i < 4; i++) {
Destination spokeTestQueue = spokes[i].getDestination(testQueue);
Assert.assertEquals(1, spokeTestQueue.getConsumers().size());
}
for (NetworkConnector nc : ncs) {
nc.stop();
}
}
}