ARTEMIS-3360 Backup connector ignores TTL settings on the connection factories

This commit is contained in:
franz1981 2021-06-21 11:41:47 +02:00 committed by Justin Bertram
parent a2a13b6cdd
commit bea0568a88
2 changed files with 29 additions and 34 deletions

View File

@ -147,13 +147,13 @@ public class BackupManager implements ActiveMQComponent {
if (dg == null) if (dg == null)
return; return;
DiscoveryBackupConnector backupConnector = new DiscoveryBackupConnector(dg, config.getName(), connector, config.getRetryInterval(), clusterManager); DiscoveryBackupConnector backupConnector = new DiscoveryBackupConnector(dg, config, connector, clusterManager);
backupConnectors.add(backupConnector); backupConnectors.add(backupConnector);
} else { } else {
TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration); TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration);
StaticBackupConnector backupConnector = new StaticBackupConnector(tcConfigs, config.getName(), connector, config.getRetryInterval(), clusterManager); StaticBackupConnector backupConnector = new StaticBackupConnector(tcConfigs, config, connector, clusterManager);
backupConnectors.add(backupConnector); backupConnectors.add(backupConnector);
} }
@ -188,9 +188,8 @@ public class BackupManager implements ActiveMQComponent {
public abstract class BackupConnector { public abstract class BackupConnector {
private volatile ServerLocatorInternal backupServerLocator; private volatile ServerLocatorInternal backupServerLocator;
private String name; protected final ClusterConnectionConfiguration config;
private TransportConfiguration connector; private TransportConfiguration connector;
protected long retryInterval;
private ClusterManager clusterManager; private ClusterManager clusterManager;
private volatile boolean stopping = false; private volatile boolean stopping = false;
private volatile boolean announcingBackup; private volatile boolean announcingBackup;
@ -198,16 +197,14 @@ public class BackupManager implements ActiveMQComponent {
@Override @Override
public String toString() { public String toString() {
return "BackupConnector{" + "name='" + name + '\'' + ", connector=" + connector + '}'; return "BackupConnector{" + "name='" + config.getName() + '\'' + ", connector=" + connector + '}';
} }
private BackupConnector(String name, private BackupConnector(ClusterConnectionConfiguration config,
TransportConfiguration connector, TransportConfiguration connector,
long retryInterval,
ClusterManager clusterManager) { ClusterManager clusterManager) {
this.name = name; this.config = config;
this.connector = connector; this.connector = connector;
this.retryInterval = retryInterval;
this.clusterManager = clusterManager; this.clusterManager = clusterManager;
} }
@ -228,7 +225,7 @@ public class BackupManager implements ActiveMQComponent {
stopping = false; stopping = false;
backupAnnounced = false; backupAnnounced = false;
ClusterConnection clusterConnection = clusterManager.getClusterConnection(name); ClusterConnection clusterConnection = clusterManager.getClusterConnection(config.getName());
//NB we use the same topology as the sister cluster connection so it knows when started about all the nodes to bridge to //NB we use the same topology as the sister cluster connection so it knows when started about all the nodes to bridge to
backupServerLocator = createServerLocator(clusterConnection.getTopology()); backupServerLocator = createServerLocator(clusterConnection.getTopology());
@ -298,14 +295,14 @@ public class BackupManager implements ActiveMQComponent {
announceBackup(); announceBackup();
} }
}, retryInterval, TimeUnit.MILLISECONDS); }, config.getRetryInterval(), TimeUnit.MILLISECONDS);
} }
/* /*
* called to notify the cluster manager about the backup * called to notify the cluster manager about the backup
* */ * */
public void informTopology() { public void informTopology() {
clusterManager.informClusterOfBackup(name); clusterManager.informClusterOfBackup(config.getName());
} }
/* /*
@ -351,11 +348,10 @@ public class BackupManager implements ActiveMQComponent {
private final TransportConfiguration[] tcConfigs; private final TransportConfiguration[] tcConfigs;
private StaticBackupConnector(TransportConfiguration[] tcConfigs, private StaticBackupConnector(TransportConfiguration[] tcConfigs,
String name, ClusterConnectionConfiguration config,
TransportConfiguration connector, TransportConfiguration connector,
long retryInterval,
ClusterManager clusterManager) { ClusterManager clusterManager) {
super(name, connector, retryInterval, clusterManager); super(config, connector, clusterManager);
this.tcConfigs = tcConfigs; this.tcConfigs = tcConfigs;
} }
@ -367,7 +363,9 @@ public class BackupManager implements ActiveMQComponent {
} }
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs); ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true); locator.setClusterConnection(true);
locator.setRetryInterval(retryInterval); locator.setRetryInterval(config.getRetryInterval());
locator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
locator.setConnectionTTL(config.getConnectionTTL());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, server.getStorageManager())); locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, server.getStorageManager()));
return locator; return locator;
} }
@ -389,17 +387,19 @@ public class BackupManager implements ActiveMQComponent {
private final DiscoveryGroupConfiguration discoveryGroupConfiguration; private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
private DiscoveryBackupConnector(DiscoveryGroupConfiguration discoveryGroupConfiguration, private DiscoveryBackupConnector(DiscoveryGroupConfiguration discoveryGroupConfiguration,
String name, ClusterConnectionConfiguration config,
TransportConfiguration connector, TransportConfiguration connector,
long retryInterval,
ClusterManager clusterManager) { ClusterManager clusterManager) {
super(name, connector, retryInterval, clusterManager); super(config, connector, clusterManager);
this.discoveryGroupConfiguration = discoveryGroupConfiguration; this.discoveryGroupConfiguration = discoveryGroupConfiguration;
} }
@Override @Override
public ServerLocatorInternal createServerLocator(Topology topology) { public ServerLocatorInternal createServerLocator(Topology topology) {
return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration).setRetryInterval(retryInterval); return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration)
.setRetryInterval(config.getRetryInterval())
.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod())
.setConnectionTTL(config.getConnectionTTL());
} }
@Override @Override

View File

@ -17,27 +17,16 @@
package org.apache.activemq.artemis.tests.integration.cluster.failover; package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.cluster.BackupManager; import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase { public class BackupManagerInheritedConfigTest extends FailoverTestBase {
private static final Logger log = Logger.getLogger(CheckRetryIntervalBackupManagerTest.class);
private volatile CountDownSessionFailureListener listener;
private volatile ClientSessionFactoryInternal sf;
private final Object lockFail = new Object();
@Override @Override
protected void createConfigs() throws Exception { protected void createConfigs() throws Exception {
@ -45,7 +34,11 @@ public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase {
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName()).setRetryInterval(333)); backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector)
.addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName())
.setRetryInterval(333)
.setClientFailureCheckPeriod(1000)
.setConnectionTTL(5000));
backupServer = createTestableServer(backupConfig); backupServer = createTestableServer(backupConfig);
@ -55,13 +48,15 @@ public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase {
} }
@Test @Test
public void testValidateRetryInterval() { public void testValidateInheritedClusterConnectionConfig() {
ActiveMQServerImpl server = (ActiveMQServerImpl) backupServer.getServer(); ActiveMQServerImpl server = (ActiveMQServerImpl) backupServer.getServer();
for (BackupManager.BackupConnector backupConnector : server.getBackupManager().getBackupConnectors()) { for (BackupManager.BackupConnector backupConnector : server.getBackupManager().getBackupConnectors()) {
Wait.assertTrue(() -> backupConnector.getBackupServerLocator() != null); Wait.assertTrue(() -> backupConnector.getBackupServerLocator() != null);
Assert.assertEquals(333, backupConnector.getBackupServerLocator().getRetryInterval()); Assert.assertEquals(333, backupConnector.getBackupServerLocator().getRetryInterval());
Assert.assertEquals(-1, backupConnector.getBackupServerLocator().getReconnectAttempts()); Assert.assertEquals(-1, backupConnector.getBackupServerLocator().getReconnectAttempts());
Assert.assertEquals(1000, backupConnector.getBackupServerLocator().getClientFailureCheckPeriod());
Assert.assertEquals(5000, backupConnector.getBackupServerLocator().getConnectionTTL());
} }
} }