ARTEMIS-1654 - fix brige reconnect logic

Make sure that if a bridge disconnects and there is no record in the topology that it uses the original bridge connector to reconnect.

Originally the live broker that disconnected was left in the Topology, thie broke quorum voting as when th evote happened all brokers when asked though th etarget broker was still alive.
The fix for this was to remove the target live broker from the Topology. Since the bridge reconnect logic relied on this in a non HA environment to reconnect this stopped working.
The fix now uses the original target connector (or backup) to reconnect in the case where the broker was actually removed from the cluster.

https://issues.apache.org/jira/browse/ARTEMIS-1654
This commit is contained in:
Andy Taylor 2018-02-01 13:22:59 +00:00 committed by Clebert Suconic
parent 2a72923e8c
commit 032210a7c6
3 changed files with 154 additions and 14 deletions
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge

View File

@ -506,10 +506,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
protected boolean isPlainCoreBridge() {
return true;
}
/* Hook for processing message before forwarding */
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
if (useDuplicateDetection) {
@ -824,7 +820,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return csf;
}
private ClientSessionFactoryInternal reconnectOnOriginalNode() throws Exception {
protected ClientSessionFactoryInternal reconnectOnOriginalNode() throws Exception {
String targetNodeIdUse = targetNodeID;
TopologyMember nodeUse = targetNode;
if (targetNodeIdUse != null && nodeUse != null) {
@ -916,10 +912,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
ActiveMQServerLogger.LOGGER.bridgeConnected(this);
// We only do this on plain core bridges
if (isPlainCoreBridge()) {
serverLocator.addClusterTopologyListener(new TopologyListener());
}
serverLocator.addClusterTopologyListener(new TopologyListener());
keepConnecting = false;
return;

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -79,6 +80,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
private final ServerLocatorInternal discoveryLocator;
private final String storeAndForwardPrefix;
private TopologyMemberImpl member;
public ClusterConnectionBridge(final ClusterConnection clusterConnection,
final ClusterManager clusterManager,
@ -139,6 +141,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID);
//if it is null then its possible the broker was removed after a disconnect so lets try the original connectors
if (factory == null) {
factory = reconnectOnOriginalNode();
if (factory == null) {
return null;
}
}
setSessionFactory(factory);
if (factory == null) {
@ -372,9 +381,4 @@ public class ClusterConnectionBridge extends BridgeImpl {
clusterConnection.disconnectRecord(targetNodeID);
}
}
@Override
protected boolean isPlainCoreBridge() {
return false;
}
}

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
/**
* This will simulate a failure of a failure.
* The bridge could eventually during a race or multiple failures not be able to reconnect because it failed again.
* this should make the bridge to always reconnect itself.
*/
public class ClusteredBridgeReconnectTest extends ClusterTestBase {
@Test
public void testReconnectBridge() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
createQueue(1, "queues.testaddress", "queue0", null, true);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
ClientSession session0 = sfs[0].createSession();
ClientSession session1 = sfs[0].createSession();
session0.start();
session1.start();
ClientProducer producer = session0.createProducer("queues.testaddress");
int NUMBER_OF_MESSAGES = 100;
Assert.assertEquals(1, servers[0].getClusterManager().getClusterConnections().size());
ClusterConnectionImpl connection = servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0];
Assert.assertEquals(1, connection.getRecords().size());
MessageFlowRecord record = connection.getRecords().values().toArray(new MessageFlowRecord[1])[0];
ClusterConnectionBridge bridge = (ClusterConnectionBridge) record.getBridge();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage msg = session0.createMessage(true);
producer.send(msg);
session0.commit();
if (i == 17) {
bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!"));
}
}
int cons0Count = 0, cons1Count = 0;
while (true) {
ClientMessage msg = consumers[0].getConsumer().receive(1000);
if (msg == null) {
break;
}
cons0Count++;
msg.acknowledge();
session0.commit();
}
while (true) {
ClientMessage msg = consumers[1].getConsumer().receive(1000);
if (msg == null) {
break;
}
cons1Count++;
msg.acknowledge();
session1.commit();
}
Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES, cons0Count + cons1Count);
session0.commit();
session1.commit();
connection = servers[0].getClusterManager().getClusterConnections().toArray(new ClusterConnectionImpl[0])[0];
Assert.assertEquals(1, connection.getRecords().size());
Assert.assertNotNull(bridge.getSessionFactory());
stopServers(0, 1);
}
@Override
@After
public void tearDown() throws Exception {
closeAllConsumers();
closeAllSessionFactories();
closeAllServerLocatorsFactories();
super.tearDown();
}
public boolean isNetty() {
return true;
}
}