This commit is contained in:
Clebert Suconic 2019-09-10 15:03:02 -04:00
commit 2a2b6bc260
6 changed files with 128 additions and 29 deletions

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
@ -71,6 +72,12 @@ public class BackupManager implements ActiveMQComponent {
this.clusterManager = clusterManager;
}
/** This is meant for testing and assertions, please don't do anything stupid with it!
* I mean, please don't use it outside of testing context */
public List<BackupConnector> getBackupConnectors() {
return backupConnectors;
}
/*
* Start the backup manager if not already started. This entails deploying a backup connector based on a cluster
* configuration, informing the cluster manager so that it can add it to its topology and announce itself to the cluster.
@ -174,12 +181,12 @@ public class BackupManager implements ActiveMQComponent {
/*
* A backup connector will connect to the cluster and announce that we are a backup server ready to fail over.
* */
private abstract class BackupConnector {
public abstract class BackupConnector {
private volatile ServerLocatorInternal backupServerLocator;
private String name;
private TransportConfiguration connector;
private long retryInterval;
protected long retryInterval;
private ClusterManager clusterManager;
private boolean stopping = false;
private boolean announcingBackup;
@ -200,6 +207,11 @@ public class BackupManager implements ActiveMQComponent {
* */
abstract ServerLocatorInternal createServerLocator(Topology topology);
/** This is for test assertions, please be careful, don't use outside of testing! */
public ServerLocator getBackupServerLocator() {
return backupServerLocator;
}
/*
* start the connector by creating the server locator to use.
* */
@ -261,13 +273,7 @@ public class BackupManager implements ActiveMQComponent {
return;
ActiveMQServerLogger.LOGGER.errorAnnouncingBackup(e);
scheduledExecutor.schedule(new Runnable() {
@Override
public void run() {
announceBackup();
}
}, retryInterval, TimeUnit.MILLISECONDS);
retryConnection();
} finally {
announcingBackup = false;
}
@ -275,6 +281,17 @@ public class BackupManager implements ActiveMQComponent {
});
}
/** it will re-schedule the connection after a timeout, using a scheduled executor */
protected void retryConnection() {
scheduledExecutor.schedule(new Runnable() {
@Override
public void run() {
announceBackup();
}
}, retryInterval, TimeUnit.MILLISECONDS);
}
/*
* called to notify the cluster manager about the backup
* */
@ -341,6 +358,7 @@ public class BackupManager implements ActiveMQComponent {
}
ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
locator.setClusterConnection(true);
locator.setRetryInterval(retryInterval);
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
return locator;
}
@ -372,7 +390,7 @@ public class BackupManager implements ActiveMQComponent {
@Override
public ServerLocatorInternal createServerLocator(Topology topology) {
return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration);
return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration).setRetryInterval(retryInterval);
}
@Override

View File

@ -70,17 +70,17 @@ public final class InVMNodeManager extends NodeManager {
public void awaitLiveNode() throws Exception {
do {
while (state == NOT_STARTED) {
Thread.sleep(2000);
Thread.sleep(10);
}
liveLock.acquire();
if (state == PAUSED) {
liveLock.release();
Thread.sleep(2000);
Thread.sleep(10);
} else if (state == FAILING_BACK) {
liveLock.release();
Thread.sleep(2000);
Thread.sleep(10);
} else if (state == LIVE) {
break;
}

View File

@ -586,7 +586,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
setName("cluster1").setAddress("jms").setConnectorName(connectorName).
setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).
setRetryInterval(100).setDuplicateDetection(false).setMaxHops(1).
setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
setStaticConnectors(connectors0);

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
private volatile CountDownSessionFailureListener listener;
private volatile ClientSessionFactoryInternal sf;
private final Object lockFail = new Object();
@Override
protected void createConfigs() throws Exception {
nodeManager = createNodeManager();
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName()).setRetryInterval(333));
backupServer = createTestableServer(backupConfig);
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName()).setRetryInterval(333)).addConnectorConfiguration(liveConnector.getName(), liveConnector);
liveServer = createTestableServer(liveConfig);
}
@Test
public void testValidateRetryInterval() {
ActiveMQServerImpl server = (ActiveMQServerImpl) backupServer.getServer();
for (BackupManager.BackupConnector backupConnector : server.getBackupManager().getBackupConnectors()) {
Wait.assertTrue(() -> backupConnector.getBackupServerLocator() != null);
Assert.assertEquals(333, backupConnector.getBackupServerLocator().getRetryInterval());
Assert.assertEquals(-1, backupConnector.getBackupServerLocator().getReconnectAttempts());
}
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getInVMConnector(live);
}
}

View File

@ -64,6 +64,7 @@ import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -248,10 +249,10 @@ public class FailoverTest extends FailoverTestBase {
@Test(timeout = 120000)
public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 5000L;
((InVMNodeManager) nodeManager).failoverPause = 2000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@ -343,10 +344,10 @@ public class FailoverTest extends FailoverTestBase {
// https://issues.jboss.org/browse/HORNETQ-685
@Test(timeout = 120000)
public void testTimeoutOnFailoverTransactionCommit() throws Exception {
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 5000L;
((InVMNodeManager) nodeManager).failoverPause = 2000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@ -415,7 +416,7 @@ public class FailoverTest extends FailoverTestBase {
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 6000L;
((InVMNodeManager) nodeManager).failoverPause = 2000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@ -472,7 +473,7 @@ public class FailoverTest extends FailoverTestBase {
expected.printStackTrace();
}
Thread.sleep(2000);
Thread.sleep(1000);
m = null;
for (int i = 0; i < 500; i++) {
@ -493,7 +494,7 @@ public class FailoverTest extends FailoverTestBase {
locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
if (nodeManager instanceof InVMNodeManager) {
((InVMNodeManager) nodeManager).failoverPause = 5000L;
((InVMNodeManager) nodeManager).failoverPause = 1000L;
}
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@ -533,7 +534,7 @@ public class FailoverTest extends FailoverTestBase {
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
ClientMessage m = consumer.receive(1000);
ClientMessage m = consumer.receiveImmediate();
Assert.assertNull(m);
}
@ -708,10 +709,7 @@ public class FailoverTest extends FailoverTestBase {
liveServer.getServer().start();
Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
int i = 0;
while (!backupServer.isStarted() && i++ < 100) {
Thread.sleep(100);
}
Wait.assertTrue(backupServer::isStarted);
liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
Assert.assertTrue(backupServer.isStarted());
@ -1807,7 +1805,7 @@ public class FailoverTest extends FailoverTestBase {
message = repeatMessage;
repeatMessage = null;
} else {
message = consumer.receive(1000);
message = consumer.receive(50);
}
if (message != null) {

View File

@ -98,9 +98,12 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
public void testAutomaticFailover() throws Exception {
ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc);
jbcf.setReconnectAttempts(-1);
jbcf.setRetryInterval(10);
jbcf.setRetryInterval(100);
jbcf.setConnectionTTL(500);
jbcf.setClientFailureCheckPeriod(100);
jbcf.setBlockOnDurableSend(true);
jbcf.setBlockOnNonDurableSend(true);
jbcf.setCallTimeout(1000);
// Note we set consumer window size to a value so we can verify that consumer credit re-sending
// works properly on failover
@ -152,7 +155,7 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
JMSUtil.crash(liveServer, ((ActiveMQSession) sess).getCoreSession());
Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, listener.get(0));
Wait.assertTrue(() -> FailoverEventType.FAILURE_DETECTED == listener.get(0));
for (int i = 0; i < numMessages; i++) {
JMSFailoverListenerTest.log.info("got message " + i);
@ -178,12 +181,14 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
jbcfLive.setBlockOnNonDurableSend(true);
jbcfLive.setBlockOnDurableSend(true);
jbcfLive.setCallTimeout(1000);
ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams));
jbcfBackup.setBlockOnNonDurableSend(true);
jbcfBackup.setBlockOnDurableSend(true);
jbcfBackup.setInitialConnectAttempts(-1);
jbcfBackup.setReconnectAttempts(-1);
jbcfBackup.setRetryInterval(100);
ActiveMQConnection connLive = (ActiveMQConnection) jbcfLive.createConnection();