diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java index 835cc1f0fd..5aef7ac46f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; @@ -71,6 +72,12 @@ public class BackupManager implements ActiveMQComponent { this.clusterManager = clusterManager; } + /** This is meant for testing and assertions, please don't do anything stupid with it! + * I mean, please don't use it outside of testing context */ + public List getBackupConnectors() { + return backupConnectors; + } + /* * Start the backup manager if not already started. This entails deploying a backup connector based on a cluster * configuration, informing the cluster manager so that it can add it to its topology and announce itself to the cluster. @@ -174,12 +181,12 @@ public class BackupManager implements ActiveMQComponent { /* * A backup connector will connect to the cluster and announce that we are a backup server ready to fail over. * */ - private abstract class BackupConnector { + public abstract class BackupConnector { private volatile ServerLocatorInternal backupServerLocator; private String name; private TransportConfiguration connector; - private long retryInterval; + protected long retryInterval; private ClusterManager clusterManager; private boolean stopping = false; private boolean announcingBackup; @@ -200,6 +207,11 @@ public class BackupManager implements ActiveMQComponent { * */ abstract ServerLocatorInternal createServerLocator(Topology topology); + /** This is for test assertions, please be careful, don't use outside of testing! */ + public ServerLocator getBackupServerLocator() { + return backupServerLocator; + } + /* * start the connector by creating the server locator to use. * */ @@ -261,13 +273,7 @@ public class BackupManager implements ActiveMQComponent { return; ActiveMQServerLogger.LOGGER.errorAnnouncingBackup(e); - scheduledExecutor.schedule(new Runnable() { - @Override - public void run() { - announceBackup(); - } - - }, retryInterval, TimeUnit.MILLISECONDS); + retryConnection(); } finally { announcingBackup = false; } @@ -275,6 +281,17 @@ public class BackupManager implements ActiveMQComponent { }); } + /** it will re-schedule the connection after a timeout, using a scheduled executor */ + protected void retryConnection() { + scheduledExecutor.schedule(new Runnable() { + @Override + public void run() { + announceBackup(); + } + + }, retryInterval, TimeUnit.MILLISECONDS); + } + /* * called to notify the cluster manager about the backup * */ @@ -341,6 +358,7 @@ public class BackupManager implements ActiveMQComponent { } ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs); locator.setClusterConnection(true); + locator.setRetryInterval(retryInterval); locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); return locator; } @@ -372,7 +390,7 @@ public class BackupManager implements ActiveMQComponent { @Override public ServerLocatorInternal createServerLocator(Topology topology) { - return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration); + return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration).setRetryInterval(retryInterval); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java index a0e7601891..7af25defde 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java @@ -70,17 +70,17 @@ public final class InVMNodeManager extends NodeManager { public void awaitLiveNode() throws Exception { do { while (state == NOT_STARTED) { - Thread.sleep(2000); + Thread.sleep(10); } liveLock.acquire(); if (state == PAUSED) { liveLock.release(); - Thread.sleep(2000); + Thread.sleep(10); } else if (state == FAILING_BACK) { liveLock.release(); - Thread.sleep(2000); + Thread.sleep(10); } else if (state == LIVE) { break; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 75386486e3..bf2af4b1a3 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -586,7 +586,7 @@ public abstract class ActiveMQTestBase extends Assert { } ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration(). setName("cluster1").setAddress("jms").setConnectorName(connectorName). - setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1). + setRetryInterval(100).setDuplicateDetection(false).setMaxHops(1). setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT). setStaticConnectors(connectors0); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/CheckRetryIntervalBackupManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/CheckRetryIntervalBackupManagerTest.java new file mode 100644 index 0000000000..d533588a2f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/CheckRetryIntervalBackupManagerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; +import org.apache.activemq.artemis.core.server.cluster.BackupManager; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Test; + +public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase { + + private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + + private volatile CountDownSessionFailureListener listener; + + private volatile ClientSessionFactoryInternal sf; + + private final Object lockFail = new Object(); + + @Override + protected void createConfigs() throws Exception { + nodeManager = createNodeManager(); + TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); + TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); + + backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName()).setRetryInterval(333)); + + backupServer = createTestableServer(backupConfig); + + liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName()).setRetryInterval(333)).addConnectorConfiguration(liveConnector.getName(), liveConnector); + + liveServer = createTestableServer(liveConfig); + } + + @Test + public void testValidateRetryInterval() { + ActiveMQServerImpl server = (ActiveMQServerImpl) backupServer.getServer(); + for (BackupManager.BackupConnector backupConnector : server.getBackupManager().getBackupConnectors()) { + + Wait.assertTrue(() -> backupConnector.getBackupServerLocator() != null); + Assert.assertEquals(333, backupConnector.getBackupServerLocator().getRetryInterval()); + Assert.assertEquals(-1, backupConnector.getBackupServerLocator().getReconnectAttempts()); + } + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMAcceptor(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMConnector(live); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index 4c40448a9c..5042afab40 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -64,6 +64,7 @@ import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -248,10 +249,10 @@ public class FailoverTest extends FailoverTestBase { @Test(timeout = 120000) public void testTimeoutOnFailoverConsumeBlocked() throws Exception { - locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0); + locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0); if (nodeManager instanceof InVMNodeManager) { - ((InVMNodeManager) nodeManager).failoverPause = 5000L; + ((InVMNodeManager) nodeManager).failoverPause = 2000L; } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); @@ -343,10 +344,10 @@ public class FailoverTest extends FailoverTestBase { // https://issues.jboss.org/browse/HORNETQ-685 @Test(timeout = 120000) public void testTimeoutOnFailoverTransactionCommit() throws Exception { - locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); + locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); if (nodeManager instanceof InVMNodeManager) { - ((InVMNodeManager) nodeManager).failoverPause = 5000L; + ((InVMNodeManager) nodeManager).failoverPause = 2000L; } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); @@ -415,7 +416,7 @@ public class FailoverTest extends FailoverTestBase { locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500); if (nodeManager instanceof InVMNodeManager) { - ((InVMNodeManager) nodeManager).failoverPause = 6000L; + ((InVMNodeManager) nodeManager).failoverPause = 2000L; } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); @@ -472,7 +473,7 @@ public class FailoverTest extends FailoverTestBase { expected.printStackTrace(); } - Thread.sleep(2000); + Thread.sleep(1000); m = null; for (int i = 0; i < 500; i++) { @@ -493,7 +494,7 @@ public class FailoverTest extends FailoverTestBase { locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100); if (nodeManager instanceof InVMNodeManager) { - ((InVMNodeManager) nodeManager).failoverPause = 5000L; + ((InVMNodeManager) nodeManager).failoverPause = 1000L; } ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); @@ -533,7 +534,7 @@ public class FailoverTest extends FailoverTestBase { ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS); session.start(); - ClientMessage m = consumer.receive(1000); + ClientMessage m = consumer.receiveImmediate(); Assert.assertNull(m); } @@ -708,10 +709,7 @@ public class FailoverTest extends FailoverTestBase { liveServer.getServer().start(); Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS)); - int i = 0; - while (!backupServer.isStarted() && i++ < 100) { - Thread.sleep(100); - } + Wait.assertTrue(backupServer::isStarted); liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS); Assert.assertTrue(backupServer.isStarted()); @@ -1807,7 +1805,7 @@ public class FailoverTest extends FailoverTestBase { message = repeatMessage; repeatMessage = null; } else { - message = consumer.receive(1000); + message = consumer.receive(50); } if (message != null) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java index c6953912d9..4bd0b7ca4f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java @@ -98,9 +98,12 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase { public void testAutomaticFailover() throws Exception { ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc); jbcf.setReconnectAttempts(-1); - jbcf.setRetryInterval(10); + jbcf.setRetryInterval(100); + jbcf.setConnectionTTL(500); + jbcf.setClientFailureCheckPeriod(100); jbcf.setBlockOnDurableSend(true); jbcf.setBlockOnNonDurableSend(true); + jbcf.setCallTimeout(1000); // Note we set consumer window size to a value so we can verify that consumer credit re-sending // works properly on failover @@ -152,7 +155,7 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase { JMSUtil.crash(liveServer, ((ActiveMQSession) sess).getCoreSession()); - Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, listener.get(0)); + Wait.assertTrue(() -> FailoverEventType.FAILURE_DETECTED == listener.get(0)); for (int i = 0; i < numMessages; i++) { JMSFailoverListenerTest.log.info("got message " + i); @@ -178,12 +181,14 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase { jbcfLive.setBlockOnNonDurableSend(true); jbcfLive.setBlockOnDurableSend(true); + jbcfLive.setCallTimeout(1000); ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams)); jbcfBackup.setBlockOnNonDurableSend(true); jbcfBackup.setBlockOnDurableSend(true); jbcfBackup.setInitialConnectAttempts(-1); jbcfBackup.setReconnectAttempts(-1); + jbcfBackup.setRetryInterval(100); ActiveMQConnection connLive = (ActiveMQConnection) jbcfLive.createConnection();