Fixing the duplex bridge case for restarting durable subscriptions when
dynamicOnly is false
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-07-19 11:30:32 -04:00
parent 4b018b4206
commit 39184e2fb0
2 changed files with 151 additions and 4 deletions

View File

@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -116,9 +117,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
// The broker and wireformat info that was exchanged.
protected BrokerInfo brokerInfo;
protected final List<Command> dispatchQueue = new LinkedList<Command>();
protected final List<Command> dispatchQueue = new LinkedList<>();
protected TaskRunner taskRunner;
protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
protected final AtomicReference<Throwable> transportException = new AtomicReference<>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
@ -140,8 +141,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final CountDownLatch stopped = new CountDownLatch(1);
private final AtomicBoolean asyncException = new AtomicBoolean(false);
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>();
private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>();
private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
private ConnectionContext context;
private boolean networkConnection;
@ -1419,6 +1420,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
listener.setCreatedByDuplex(true);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge.setBrokerService(brokerService);
Set<ActiveMQDestination> durableDestinations = broker.getDurableDestinations();
//Need to set durableDestinations to properly restart subs when dynamicOnly=false
if (durableDestinations != null) {
duplexBridge.setDurableDestinations(broker.getDurableDestinations().toArray(new ActiveMQDestination[0]));
}
// now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.File;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Show that both directions of a duplex bridge will properly restart the
* network durable consumers if dynamicOnly is false.
*/
public class AMQ6366Test extends JmsMultipleBrokersTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AMQ6366Test.class);
final ActiveMQTopic dest = new ActiveMQTopic("TEST.FOO");
/**
* This test works even before AMQ6366
* @throws Exception
*/
public void testDuplexDurableSubRestarted() throws Exception {
testNonDurableReceiveThrougRestart("BrokerA", "BrokerB");
}
/**
* This test failed before AMQ6366 because the NC durable consumer was
* never properly activated.
*
* @throws Exception
*/
public void testDuplexDurableSubRestartedReverse() throws Exception {
testNonDurableReceiveThrougRestart("BrokerB", "BrokerA");
}
protected void testNonDurableReceiveThrougRestart(String pubBroker, String conBroker) throws Exception {
NetworkConnector networkConnector = bridgeBrokerPair("BrokerA", "BrokerB");
startAllBrokers();
waitForBridgeFormation();
MessageConsumer client = createDurableSubscriber(conBroker, dest, "sub1");
client.close();
Thread.sleep(1000);
networkConnector.stop();
Thread.sleep(1000);
Set<ActiveMQDestination> durableDests = new HashSet<>();
durableDests.add(dest);
//Normally set on broker start from the persistence layer but
//simulate here since we just stopped and started the network connector
//without a restart
networkConnector.setDurableDestinations(durableDests);
networkConnector.start();
waitForBridgeFormation();
// Send messages
sendMessages(pubBroker, dest, 1);
Thread.sleep(1000);
Topic destination = (Topic) brokers.get(conBroker).broker.getDestination(dest);
DurableTopicSubscription sub = destination.getDurableTopicSubs().
values().toArray(new DurableTopicSubscription[0])[0];
//Assert that the message made it to the other broker
assertEquals(1, sub.getSubscriptionStatistics().getEnqueues().getCount());
}
@Override
protected void configureBroker(BrokerService broker) {
broker.getManagementContext().setCreateConnector(false);
broker.setAdvisorySupport(true);
}
protected NetworkConnector bridgeBrokerPair(String localBrokerName, String remoteBrokerName) throws Exception {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {
remoteURI = transportConnectors.get(0).getConnectUri();
String uri = "static:(" + remoteURI + ")";
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setDynamicOnly(false); // so matching durable subs are loaded on start
connector.setStaticBridge(false);
connector.setDuplex(true);
connector.addDynamicallyIncludedDestination(dest);
localBroker.addNetworkConnector(connector);
return connector;
} else {
throw new Exception("Remote broker has no registered connectors.");
}
}
@Override
public void setUp() throws Exception {
File dataDir = new File(IOHelper.getDefaultDataDirectory());
LOG.info("Delete dataDir.." + dataDir.getCanonicalPath());
org.apache.activemq.TestSupport.recursiveDelete(dataDir);
super.setAutoFail(true);
super.setUp();
createBroker(new URI(
"broker:(tcp://0.0.0.0:0)/BrokerA"));
createBroker(new URI(
"broker:(tcp://0.0.0.0:0)/BrokerB"));
}
}