AMQ-6858 - Fix several durable subscription bridge propagation issues

Durable network proxy subs will now be properly created across multiple
bridges when 3 or more brokers are used.  Demand will be properly synced
and removed.
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-11-06 08:42:03 -05:00
parent 7dad09a9cd
commit 96ce14b278
6 changed files with 384 additions and 22 deletions

View File

@ -56,12 +56,16 @@ public class ConduitBridge extends DemandForwardingBridge {
}
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) {
// search through existing subscriptions and see if we have a match
if (info.isNetworkSubscription()) {
//If a network subscription and a queue check if isConduitNetworkQueueSubscriptions is true
//If true then we want to try and conduit
//For topics we always want to conduit regardless of network subscription or not
if (info.isNetworkSubscription() && info.getDestination().isQueue() &&
!configuration.isConduitNetworkQueueSubscriptions()) {
return false;
}
boolean matched = false;
// search through existing subscriptions and see if we have a match
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
if (canConduit(ds) && filter.matches(info.getDestination())) {
@ -86,9 +90,13 @@ public class ConduitBridge extends DemandForwardingBridge {
}
// we want to conduit statically included consumers which are local networkSubs
// but we don't want to conduit remote network subs i.e. (proxy proxy) consumers
// but we don't want to conduit remote network queue subs i.e. (proxy proxy) consumers
// unless isConduitNetworkQueueSubscriptions is true
// We always want to conduit topic subscriptions
private boolean canConduit(DemandSubscription ds) {
return ds.isStaticallyIncluded() || !ds.getRemoteInfo().isNetworkSubscription();
return ds.isStaticallyIncluded() || ds.getRemoteInfo().getDestination().isTopic() ||
!ds.getRemoteInfo().isNetworkSubscription() ||
(ds.getRemoteInfo().getDestination().isQueue() && configuration.isConduitNetworkQueueSubscriptions());
}
@Override

View File

@ -661,6 +661,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
/**
* Checks whether or not this consumer is a direct bridge network subscription
* @param info
* @return
*/
protected boolean isBridgeNS(ConsumerInfo info) {
return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
(info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
}
protected void serviceRemoteCommand(Command command) {
if (!disposed.get()) {
try {
@ -694,7 +704,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
//re-add any process any non-NC consumers that match the
//dynamicallyIncludedDestinations list
if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
//Also re-add network consumers that are not part of this direct
//bridge (proxy of proxy bridges)
if((info.getSubscriptionName() == null || !isBridgeNS(info)) &&
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
@ -986,7 +998,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
localBroker.oneway(sending);
//remove subscriber from map
i.remove();
if (i != null) {
i.remove();
}
}
}
@ -1072,7 +1086,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses
// continue removal in separate thread to free up tshis thread for outstanding responses
// Serialize with removeDestination operations so that removeSubs are serialized with
// removeDestinations such that all removeSub advisories are generated
serialExecutor.execute(new Runnable() {
@ -1080,7 +1094,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public void run() {
sub.waitForCompletion();
try {
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
//If removing a network durable subscription that still has durable remote subs
//make sure we cleanup the durable subscription properly - necessary when using
//durable subscriptions and 3 or more brokers
if (configuration.isConduitSubscriptions() &&
sub.getLocalInfo().getSubscriptionName() != null &&
sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
sub.getDurableRemoteSubs().size() > 0) {
sub.getDurableRemoteSubs().clear();
cleanupDurableSub(sub, null);
} else {
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
}
} catch (IOException e) {
LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
}
@ -1315,13 +1340,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
for (ActiveMQDestination dest : dests) {
if (isPermissableDestination(dest)) {
DemandSubscription sub = createDemandSubscription(dest, null);
sub.setStaticallyIncluded(true);
try {
addSubscription(sub);
} catch (IOException e) {
LOG.error("Failed to add static destination {}", dest, e);
if (sub != null) {
sub.setStaticallyIncluded(true);
try {
addSubscription(sub);
} catch (IOException e) {
LOG.error("Failed to add static destination {}", dest, e);
}
LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
} else {
LOG.info("{}, static destination excluded: {}, demand already exists", configuration.getBrokerName(), dest);
}
LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
} else {
LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
}

View File

@ -75,12 +75,15 @@ public class DurableConduitBridge extends ConduitBridge {
String candidateSubName = getSubscriberName(dest);
for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
String subName = subscription.getConsumerInfo().getSubscriptionName();
if (subName != null && subName.equals(candidateSubName)) {
String clientId = subscription.getContext().getClientId();
if (subName != null && subName.equals(candidateSubName) && clientId.startsWith(configuration.getName())) {
DemandSubscription sub = createDemandSubscription(dest, subName);
sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
sub.setStaticallyIncluded(true);
addSubscription(sub);
break;
if (sub != null) {
sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
sub.setStaticallyIncluded(true);
addSubscription(sub);
break;
}
}
}
}
@ -139,8 +142,12 @@ public class DurableConduitBridge extends ConduitBridge {
info.setSubscriptionName(getSubscriberName(info.getDestination()));
// and override the consumerId with something unique so that it won't
// be removed if the durable subscriber (at the other end) goes away
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
//Only do this for direct bridge consumers - proxy network consumers we don't
//want to replace the consumerId or cleanup won't happen properly
if (info.getBrokerPath().length == 1 || (info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == remoteBrokerPath[0])) {
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator.getNextSequenceId()));
}
}
info.setSelector(null);
DemandSubscription demandSubscription = doCreateDemandSubscription(info);

View File

@ -29,6 +29,11 @@ import org.apache.activemq.command.ConsumerInfo;
public class NetworkBridgeConfiguration {
private boolean conduitSubscriptions = true;
/**
* Whether or not network subscriptions on queues are eligible to be conduit
* Default is false
*/
private boolean conduitNetworkQueueSubscriptions;
private boolean useVirtualDestSubs;
private boolean dynamicOnly;
private boolean syncDurableSubs;
@ -85,6 +90,14 @@ public class NetworkBridgeConfiguration {
this.conduitSubscriptions = conduitSubscriptions;
}
public boolean isConduitNetworkQueueSubscriptions() {
return conduitNetworkQueueSubscriptions;
}
public void setConduitNetworkQueueSubscriptions(boolean conduitNetworkQueueSubscriptions) {
this.conduitNetworkQueueSubscriptions = conduitNetworkQueueSubscriptions;
}
/**
* @return the dynamicOnly
*/

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import com.google.common.collect.Lists;
import junit.framework.Test;
/**
* Test to make sure durable subscriptions propagate properly throughout network bridges
* and that conduit subscriptions work properly
*/
public class DurableThreeBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
@Override
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName);
connector.setDynamicallyIncludedDestinations(
Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true")));
connector.setDuplex(true);
connector.setDecreaseNetworkConsumerPriority(false);
connector.setConduitSubscriptions(true);
connector.setSyncDurableSubs(true);
connector.setNetworkTTL(-1);
return connector;
}
/**
* BrokerA -> BrokerB -> BrokerC
*/
public void testDurablePropagation() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
startAllBrokers();
// Setup destination
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
sendMessages("BrokerC", dest, 1);
assertNotNull(clientA.receive(1000));
assertNotNull(clientB.receive(1000));
//bring online a consumer on the other side
Session ses2 = createSession("BrokerC");
MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
//there will be 2 network durables, 1 for each direction of the bridge
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
clientA.close();
clientB.close();
clientC.close();
ses.unsubscribe("subA");
ses.unsubscribe("subB");
ses2.unsubscribe("subC");
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
}
public void testForceDurablePropagation() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerB", "BrokerC");
startAllBrokers();
// Setup destination
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
MessageConsumer clientA = ses.createConsumer(dest);
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
sendMessages("BrokerC", dest, 1);
assertNotNull(clientA.receive(1000));
Session ses2 = createSession("BrokerC");
MessageConsumer clientC = ses2.createConsumer(dest);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
clientA.close();
clientC.close();
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
}
public void testDurablePropagationSync() throws Exception {
// Setup broker networks
NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
startAllBrokers();
nc1.stop();
nc2.stop();
// Setup destination
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Session ses = createSession("BrokerA");
Session ses2 = createSession("BrokerC");
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
nc1.start();
nc2.start();
//there will be 2 network durables, 1 for each direction of the bridge
assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
clientA.close();
clientB.close();
clientC.close();
}
protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest,
final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == getNCDurableSubs(brokerService, dest).size();
}
}, 10000, 500));
}
protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService,
final ActiveMQTopic dest) throws Exception {
List<DurableTopicSubscription> subs = new ArrayList<>();
Destination d = brokerService.getDestination(dest);
org.apache.activemq.broker.region.Topic destination = null;
if (d instanceof DestinationFilter) {
destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
} else {
destination = (org.apache.activemq.broker.region.Topic) d;
}
for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) {
if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) {
DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key);
if (sub != null) {
subs.add(sub);
}
}
}
return subs;
}
@Override
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
String options = new String("?persistent=false&useJmx=false");
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
}
@Override
protected void configureBroker(BrokerService broker) {
broker.setBrokerId(broker.getBrokerName());
}
protected Session createSession(String broker) throws Exception {
Connection con = createConnection(broker);
con.start();
return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public static Test suite() {
return suite(DurableThreeBrokerNetworkBridgeTest.class);
}
}

View File

@ -61,14 +61,19 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
unhandledExceptions.isEmpty());
}
public NetworkConnector bridge(String from, String to) throws Exception {
public NetworkConnector bridge(String from, String to, boolean conduitNetworkQueueSubscriptions) throws Exception {
NetworkConnector networkConnector = bridgeBrokers(from, to, true, NETWORK_TTL, CONDUIT);
networkConnector.setSuppressDuplicateQueueSubscriptions(true);
networkConnector.setDecreaseNetworkConsumerPriority(true);
networkConnector.setDuplex(DUPLEX);
networkConnector.setConduitNetworkQueueSubscriptions(conduitNetworkQueueSubscriptions);
return networkConnector;
}
public NetworkConnector bridge(String from, String to) throws Exception {
return bridge(from, to, false);
}
/*why conduit proxy proxy consumers gets us in a knot w.r.t removal
DC-7 for CA-9, add DB-15, remove CA-9, add CB-8
CB-8 add DC-7
@ -137,6 +142,63 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
}
public void testConsumerOnEachBrokerNetworkQueueConduitSubs() throws Exception {
bridge("Broker0", "Broker1", true);
if (!DUPLEX) bridge("Broker1", "Broker0", true);
bridge("Broker1", "Broker2", true);
if (!DUPLEX) bridge("Broker2", "Broker1", true);
startAllBrokers();
waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0);
waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1);
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
for (int i = 0; i < BROKER_COUNT; i++) {
consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
}
//Conduit network queue conduit subs is true so should only be 2 subs
assertExactConsumersConnect("Broker0", 2, 1, TIMEOUT);
assertExactConsumersConnect("Broker2", 2, 1, TIMEOUT);
// still should be 3 subs for the middle broker, 1 for each direction
assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT);
assertNoUnhandledExceptions();
LOG.info("Complete the mesh - 0->2");
// shorter route
NetworkConnector nc = bridge("Broker0", "Broker2");
nc.setBrokerName("Broker0");
nc.start();
if (!DUPLEX) {
LOG.info("... complete the mesh - 2->0");
nc = bridge("Broker2", "Broker0");
nc.setBrokerName("Broker2");
nc.start();
}
// reverse order close
consumerMap.get("Consumer:" + 2 + ":0").close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 1 + ":0").close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 0 + ":0").close();
LOG.info("Check for no consumers..");
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT);
}
}
public void testXConsumerOnEachBroker() throws Exception {
bridge("Broker0", "Broker1");
if (!DUPLEX) bridge("Broker1", "Broker0");
@ -232,13 +294,14 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
}, timeout));
}
@Override
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
unhandledExceptions.clear();
Thread.setDefaultUncaughtExceptionHandler(this);
// Setup n brokers
for (int i = 0; i < BROKER_COUNT; i++) {
createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i));
@ -256,6 +319,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest
brokerService.setDestinationPolicy(policyMap);
}
@Override
public void uncaughtException(Thread t, Throwable e) {
synchronized(unhandledExceptions) {
unhandledExceptions.put(t, e);