ARTEMIS-3495 Fix backup cluster controller connection loops

Skip backup connector equivalent to cluster connector for cluster connections.
This commit is contained in:
Domenico Francesco Bruscino 2021-10-18 16:48:59 +02:00 committed by Gary Tully
parent 6d52f20edd
commit dca3facb55
7 changed files with 89 additions and 4 deletions

View File

@ -289,7 +289,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
localConnector = connectorFactory.createConnector(currentConnectorConfig.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
}
if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) {
if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())
// check if a server is trying to set its cluster connector config as backup connector config
&& !(serverLocator.getClusterTransportConfiguration() != null && serverLocator.getClusterTransportConfiguration().isSameParams(backUp))) {
if (logger.isDebugEnabled()) {
logger.debug("Setting up backup config = " + backUp + " for live = " + live);
}

View File

@ -170,6 +170,11 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return discoveryGroup;
}
/** For tests only */
public Set<ClientSessionFactoryInternal> getFactories() {
return factories;
}
private final Exception traceException = new Exception();
private ServerLocatorConfig config = new ServerLocatorConfig();

View File

@ -195,6 +195,12 @@ public class BackupManager implements ActiveMQComponent {
private volatile boolean announcingBackup;
private volatile boolean backupAnnounced = false;
public TransportConfiguration getConnector() {
return connector;
}
@Override
public String toString() {
return "BackupConnector{" + "name='" + config.getName() + '\'' + ", connector=" + connector + '}';
@ -363,6 +369,7 @@ public class BackupManager implements ActiveMQComponent {
}
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
locator.setClusterTransportConfiguration(getConnector());
locator.setRetryInterval(config.getRetryInterval());
locator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
locator.setConnectionTTL(config.getConnectionTTL());

View File

@ -80,6 +80,11 @@ public class ClusterController implements ActiveMQComponent {
private boolean started;
private SimpleString replicatedClusterName;
/** For tests only */
public ServerLocator getDefaultLocator() {
return defaultLocator;
}
public ClusterController(ActiveMQServer server,
ScheduledExecutorService scheduledExecutor,
boolean useQuorumManager) {
@ -207,9 +212,10 @@ public class ClusterController implements ActiveMQComponent {
*/
public void addClusterConnection(SimpleString name,
TransportConfiguration[] tcConfigs,
ClusterConnectionConfiguration config) {
ClusterConnectionConfiguration config,
TransportConfiguration connector) {
ServerLocatorImpl serverLocator = (ServerLocatorImpl) ActiveMQClient.createServerLocatorWithHA(tcConfigs);
configAndAdd(name, serverLocator, config, null);
configAndAdd(name, serverLocator, config, connector);
}
private void configAndAdd(SimpleString name,

View File

@ -639,7 +639,7 @@ public class ClusterManager implements ActiveMQComponent {
clusterConnection = new ClusterConnectionImpl(this, tcConfigs, connector, new SimpleString(config.getName()), new SimpleString(config.getAddress()), config.getMinLargeMessageSize(), config.getClientFailureCheckPeriod(), config.getConnectionTTL(), config.getRetryInterval(), config.getRetryIntervalMultiplier(), config.getMaxRetryInterval(), config.getInitialConnectAttempts(), config.getReconnectAttempts(), config.getCallTimeout(), config.getCallFailoverTimeout(), config.isDuplicateDetection(), config.getMessageLoadBalancingType(), config.getConfirmationWindowSize(), config.getProducerWindowSize(), executorFactory, server, postOffice, managementService, scheduledExecutor, config.getMaxHops(), nodeManager, server.getConfiguration().getClusterUser(), server.getConfiguration().getClusterPassword(), config.isAllowDirectConnectionsOnly(), config.getClusterNotificationInterval(), config.getClusterNotificationAttempts());
clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config);
clusterController.addClusterConnection(clusterConnection.getName(), tcConfigs, config, connector);
}
if (defaultClusterConnection == null) {

View File

@ -179,6 +179,13 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
private boolean splitBrainDetection;
/** For tests only */
public ServerLocatorInternal getServerLocator() {
return serverLocator;
}
public ClusterConnectionImpl(final ClusterManager manager,
final TransportConfiguration[] staticTranspConfigs,
final TransportConfiguration connector,
@ -1553,6 +1560,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
}
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
locator.setClusterTransportConfiguration(connector);
return locator;
}
return null;

View File

@ -46,12 +46,14 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
@ -59,6 +61,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
@ -67,7 +71,9 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolic
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
@ -80,6 +86,7 @@ import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -2459,6 +2466,56 @@ public class FailoverTest extends FailoverTestBase {
Assert.assertEquals("message0", cm.getBodyBuffer().readString());
}
@Test(timeout = 120000)
public void testBackupConnections() throws Exception {
Assume.assumeTrue(backupServer.getServer().getHAPolicy().isBackup());
createSessionFactory();
CountDownLatch latch = new CountDownLatch(1);
sf.addFailoverListener(eventType -> {
if (eventType == FailoverEventType.FAILOVER_COMPLETED) {
latch.countDown();
}
});
BackupManager backupManager = ((ActiveMQServerImpl)backupServer.getServer()).getBackupManager();
ClusterController backupClusterController = backupServer.getServer().getClusterManager().getClusterController();
ClusterConnectionImpl backupClusterConnection = (ClusterConnectionImpl)backupServer.getServer().getClusterManager().getClusterConnections().stream().findFirst().get();
for (BackupManager.BackupConnector backupConnector : backupManager.getBackupConnectors()) {
for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupConnector.getBackupServerLocator()).getFactories()) {
Assert.assertNotNull(factory.getConnection());
}
}
for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupClusterController.getDefaultLocator()).getFactories()) {
Assert.assertNotNull(factory.getConnection());
}
Assert.assertNull(backupClusterConnection.getServerLocator());
Assert.assertNotNull(sf.getConnection());
crash();
latch.await();
for (BackupManager.BackupConnector backupConnector : backupManager.getBackupConnectors()) {
Assert.assertNull(backupConnector.getBackupServerLocator());
}
for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupServer.getServer().getClusterManager().getClusterController().getDefaultLocator()).getFactories()) {
Assert.assertNull(factory.getConnection());
}
for (ClientSessionFactoryInternal factory : ((ServerLocatorImpl)backupClusterConnection.getServerLocator()).getFactories()) {
Assert.assertNull(factory.getConnection());
}
Assert.assertNotNull(sf.getConnection());
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------