This commit is contained in:
Justin Bertram 2021-07-04 14:32:58 -05:00
commit 07331f9aa9
2 changed files with 29 additions and 34 deletions

View File

@ -147,13 +147,13 @@ public class BackupManager implements ActiveMQComponent {
if (dg == null)
return;
DiscoveryBackupConnector backupConnector = new DiscoveryBackupConnector(dg, config.getName(), connector, config.getRetryInterval(), clusterManager);
DiscoveryBackupConnector backupConnector = new DiscoveryBackupConnector(dg, config, connector, clusterManager);
backupConnectors.add(backupConnector);
} else {
TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration);
StaticBackupConnector backupConnector = new StaticBackupConnector(tcConfigs, config.getName(), connector, config.getRetryInterval(), clusterManager);
StaticBackupConnector backupConnector = new StaticBackupConnector(tcConfigs, config, connector, clusterManager);
backupConnectors.add(backupConnector);
}
@ -188,9 +188,8 @@ public class BackupManager implements ActiveMQComponent {
public abstract class BackupConnector {
private volatile ServerLocatorInternal backupServerLocator;
private String name;
protected final ClusterConnectionConfiguration config;
private TransportConfiguration connector;
protected long retryInterval;
private ClusterManager clusterManager;
private volatile boolean stopping = false;
private volatile boolean announcingBackup;
@ -198,16 +197,14 @@ public class BackupManager implements ActiveMQComponent {
@Override
public String toString() {
return "BackupConnector{" + "name='" + name + '\'' + ", connector=" + connector + '}';
return "BackupConnector{" + "name='" + config.getName() + '\'' + ", connector=" + connector + '}';
}
private BackupConnector(String name,
private BackupConnector(ClusterConnectionConfiguration config,
TransportConfiguration connector,
long retryInterval,
ClusterManager clusterManager) {
this.name = name;
this.config = config;
this.connector = connector;
this.retryInterval = retryInterval;
this.clusterManager = clusterManager;
}
@ -228,7 +225,7 @@ public class BackupManager implements ActiveMQComponent {
stopping = false;
backupAnnounced = false;
ClusterConnection clusterConnection = clusterManager.getClusterConnection(name);
ClusterConnection clusterConnection = clusterManager.getClusterConnection(config.getName());
//NB we use the same topology as the sister cluster connection so it knows when started about all the nodes to bridge to
backupServerLocator = createServerLocator(clusterConnection.getTopology());
@ -298,14 +295,14 @@ public class BackupManager implements ActiveMQComponent {
announceBackup();
}
}, retryInterval, TimeUnit.MILLISECONDS);
}, config.getRetryInterval(), TimeUnit.MILLISECONDS);
}
/*
* called to notify the cluster manager about the backup
* */
public void informTopology() {
clusterManager.informClusterOfBackup(name);
clusterManager.informClusterOfBackup(config.getName());
}
/*
@ -351,11 +348,10 @@ public class BackupManager implements ActiveMQComponent {
private final TransportConfiguration[] tcConfigs;
private StaticBackupConnector(TransportConfiguration[] tcConfigs,
String name,
ClusterConnectionConfiguration config,
TransportConfiguration connector,
long retryInterval,
ClusterManager clusterManager) {
super(name, connector, retryInterval, clusterManager);
super(config, connector, clusterManager);
this.tcConfigs = tcConfigs;
}
@ -367,7 +363,9 @@ public class BackupManager implements ActiveMQComponent {
}
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
locator.setRetryInterval(retryInterval);
locator.setRetryInterval(config.getRetryInterval());
locator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
locator.setConnectionTTL(config.getConnectionTTL());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, server.getStorageManager()));
return locator;
}
@ -389,17 +387,19 @@ public class BackupManager implements ActiveMQComponent {
private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
private DiscoveryBackupConnector(DiscoveryGroupConfiguration discoveryGroupConfiguration,
String name,
ClusterConnectionConfiguration config,
TransportConfiguration connector,
long retryInterval,
ClusterManager clusterManager) {
super(name, connector, retryInterval, clusterManager);
super(config, connector, clusterManager);
this.discoveryGroupConfiguration = discoveryGroupConfiguration;
}
@Override
public ServerLocatorInternal createServerLocator(Topology topology) {
return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration).setRetryInterval(retryInterval);
return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration)
.setRetryInterval(config.getRetryInterval())
.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod())
.setConnectionTTL(config.getConnectionTTL());
}
@Override

View File

@ -17,27 +17,16 @@
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase {
private static final Logger log = Logger.getLogger(CheckRetryIntervalBackupManagerTest.class);
private volatile CountDownSessionFailureListener listener;
private volatile ClientSessionFactoryInternal sf;
private final Object lockFail = new Object();
public class BackupManagerInheritedConfigTest extends FailoverTestBase {
@Override
protected void createConfigs() throws Exception {
@ -45,7 +34,11 @@ public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase {
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName()).setRetryInterval(333));
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector)
.addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName())
.setRetryInterval(333)
.setClientFailureCheckPeriod(1000)
.setConnectionTTL(5000));
backupServer = createTestableServer(backupConfig);
@ -55,13 +48,15 @@ public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase {
}
@Test
public void testValidateRetryInterval() {
public void testValidateInheritedClusterConnectionConfig() {
ActiveMQServerImpl server = (ActiveMQServerImpl) backupServer.getServer();
for (BackupManager.BackupConnector backupConnector : server.getBackupManager().getBackupConnectors()) {
Wait.assertTrue(() -> backupConnector.getBackupServerLocator() != null);
Assert.assertEquals(333, backupConnector.getBackupServerLocator().getRetryInterval());
Assert.assertEquals(-1, backupConnector.getBackupServerLocator().getReconnectAttempts());
Assert.assertEquals(1000, backupConnector.getBackupServerLocator().getClientFailureCheckPeriod());
Assert.assertEquals(5000, backupConnector.getBackupServerLocator().getConnectionTTL());
}
}