From 6f8ff55dec629504d34b7dd3757bff24fa16c575 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Tue, 7 Jul 2020 17:10:45 +0800 Subject: [PATCH] ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578 1 of 2) - Porting of HORNETMQ-1575 In a live-backup scenario, when live is down and backup becomes live, clients using HA Connection Factories can failover automatically. However if a client decides to create a new connection by itself (as in camel jms case) there is a chance that the new connection is pointing to the dead live and the connection won't be successful. The reason is that if the old connection is gone the backup will not get a chance to announce itself back to client so it fails on initial connection. The fix is to let CF remember the old topology and use it on any initial connection attempts. --- .../client/impl/ClientSessionFactoryImpl.java | 23 +++++- .../core/client/impl/ServerLocatorImpl.java | 57 ++++++++++--- .../jms/cluster/JMSFailoverTest.java | 82 ++++++++++++++++++- 3 files changed, 144 insertions(+), 18 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 583d8fb93e..75981cf8c9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -163,6 +163,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C final ScheduledExecutorService scheduledThreadPool, final List incomingInterceptors, final List outgoingInterceptors) { + this(serverLocator, new Pair<>(connectorConfig, null), + locatorConfig, reconnectAttempts, threadPool, + scheduledThreadPool, incomingInterceptors, outgoingInterceptors); + } + + ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, + final Pair connectorConfig, + final ServerLocatorConfig locatorConfig, + final int reconnectAttempts, + final Executor threadPool, + final ScheduledExecutorService scheduledThreadPool, + final List incomingInterceptors, + final List outgoingInterceptors) { createTrace = new Exception(); this.serverLocator = serverLocator; @@ -171,11 +184,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C this.clientProtocolManager.setSessionFactory(this); - this.currentConnectorConfig = connectorConfig; + this.currentConnectorConfig = connectorConfig.getA(); - connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName()); + connectorFactory = instantiateConnectorFactory(connectorConfig.getA().getFactoryClassName()); - checkTransportKeys(connectorFactory, connectorConfig); + checkTransportKeys(connectorFactory, connectorConfig.getA()); this.callTimeout = locatorConfig.callTimeout; @@ -216,6 +229,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); connectionReadyForWrites = true; + + if (connectorConfig.getB() != null) { + this.backupConfig = connectorConfig.getB(); + } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 3ac249fd7b..d345e06311 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -165,8 +165,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private TransportConfiguration clusterTransportConfiguration; - private boolean useTopologyForLoadBalancing; - /** For tests only */ public DiscoveryGroup getDiscoveryGroup() { return discoveryGroup; @@ -422,7 +420,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery clusterTransportConfiguration = locator.clusterTransportConfiguration; } - private TransportConfiguration selectConnector() { + private synchronized Pair selectConnector(boolean useInitConnector) { Pair[] usedTopology; flushTopology(); @@ -432,14 +430,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } synchronized (this) { - if (usedTopology != null && config.useTopologyForLoadBalancing) { + if (usedTopology != null && config.useTopologyForLoadBalancing && !useInitConnector) { if (logger.isTraceEnabled()) { logger.trace("Selecting connector from topology."); } int pos = loadBalancingPolicy.select(usedTopology.length); Pair pair = usedTopology[pos]; - return pair.getA(); + return pair; } else { if (logger.isTraceEnabled()) { logger.trace("Selecting connector from initial connectors."); @@ -447,7 +445,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery int pos = loadBalancingPolicy.select(initialConnectors.length); - return initialConnectors[pos]; + return new Pair(initialConnectors[pos], null); } } } @@ -658,10 +656,19 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery synchronized (this) { boolean retry = true; int attempts = 0; + boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0; + boolean staticTried = false; + boolean shouldTryStatic = !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0; + while (retry && !isClosed()) { retry = false; - TransportConfiguration tc = selectConnector(); + /* + * The logic is: If receivedTopology is false, try static first. + * if receivedTopology is true, try topologyArray first + */ + Pair tc = selectConnector(shouldTryStatic); + if (tc == null) { throw ActiveMQClientMessageBundle.BUNDLE.noTCForSessionFactory(); } @@ -682,12 +689,32 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery try { if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) { attempts++; - - int connectorsSize = getConnectorsSize(); int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts; - if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) { - throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); + if (shouldTryStatic) { + //we know static is used + if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * this.getNumInitialConnectors()) { + if (topologyArrayTried) { + //stop retry and throw exception + throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); + } else { + //lets try topologyArray + staticTried = true; + shouldTryStatic = false; + attempts = 0; + } + } + } else { + //we know topologyArray is used + if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * getConnectorsSize()) { + if (staticTried) { + throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); + } else { + topologyArrayTried = true; + shouldTryStatic = true; + attempts = 0; + } + } } if (factory.waitForRetry(config.retryInterval)) { throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); @@ -1414,7 +1441,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (topology.isEmpty()) { // Resetting the topology to its original condition as it was brand new receivedTopology = false; - topologyArray = null; } else { updateArraysAndPairs(eventTime); @@ -1492,6 +1518,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery synchronized (topologyArrayGuard) { Collection membersCopy = topology.getMembers(); + if (membersCopy.size() == 0) { + //it could happen when live is down, in that case we keeps the old copy + //and don't update + return; + } + Pair[] topologyArrayLocal = (Pair[]) Array.newInstance(Pair.class, membersCopy.size()); int count = 0; @@ -1557,7 +1589,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (!clusterConnection && isEmpty) { receivedTopology = false; - topologyArray = null; } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java index 7f22235c07..7a6ca6286d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java @@ -26,6 +26,8 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import java.lang.reflect.Field; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -34,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -41,6 +44,9 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.core.client.impl.Topology; +import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; @@ -254,6 +260,7 @@ public class JMSFailoverTest extends ActiveMQTestBase { jbcfLive.setBlockOnDurableSend(true); ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams)); + jbcfBackup.setBlockOnNonDurableSend(true); jbcfBackup.setBlockOnDurableSend(true); jbcfBackup.setInitialConnectAttempts(-1); @@ -437,6 +444,74 @@ public class JMSFailoverTest extends ActiveMQTestBase { } + @Test + public void testCreateNewConnectionAfterFailover() throws Exception { + ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc); + jbcf.setInitialConnectAttempts(5); + jbcf.setRetryInterval(1000); + jbcf.setReconnectAttempts(-1); + + Connection conn1 = null, conn2 = null, conn3 = null; + + try { + conn1 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5); + + conn2 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5); + + Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ClientSession coreSession1 = ((ActiveMQSession)sess1).getCoreSession(); + ClientSession coreSession2 = ((ActiveMQSession)sess2).getCoreSession(); + + Topology fullTopology = jbcf.getServerLocator().getTopology(); + Collection members = fullTopology.getMembers(); + assertEquals(1, members.size()); + TopologyMemberImpl member = members.iterator().next(); + TransportConfiguration tcLive = member.getLive(); + TransportConfiguration tcBackup = member.getBackup(); + + System.out.println("live tc: " + tcLive); + System.out.println("Backup tc: " + tcBackup); + + JMSUtil.crash(liveServer, coreSession1, coreSession2); + + waitForServerToStart(backupServer); + + //now pretending that the live down event hasn't been propagated to client + simulateLiveDownHasNotReachClient((ServerLocatorImpl) jbcf.getServerLocator(), tcLive, tcBackup); + + //now create a new connection after live is down + try { + conn3 = jbcf.createConnection(); + } catch (Exception e) { + fail("The new connection should be established successfully after failover"); + } + } finally { + if (conn1 != null) { + conn1.close(); + } + if (conn2 != null) { + conn2.close(); + } + if (conn3 != null) { + conn3.close(); + } + } + } + + private void simulateLiveDownHasNotReachClient(ServerLocatorImpl locator, TransportConfiguration tcLive, TransportConfiguration tcBackup) throws NoSuchFieldException, IllegalAccessException { + Field f = locator.getClass().getDeclaredField("topologyArray"); + f.setAccessible(true); + + Pair[] value = (Pair[]) f.get(locator); + assertEquals(1, value.length); + Pair member = value[0]; + member.setA(tcLive); + member.setB(tcBackup); + f.set(locator, value); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -463,7 +538,10 @@ public class JMSFailoverTest extends ActiveMQTestBase { backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1); - backupConf = createBasicConfig().addAcceptorConfiguration(backupAcceptortc).addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName())); + backuptc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1); + backupAcceptortc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1); + + backupConf = createBasicConfig().addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName())); backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager)); @@ -484,7 +562,7 @@ public class JMSFailoverTest extends ActiveMQTestBase { liveJMSServer.setRegistry(new JndiBindingRegistry(ctx1)); liveJMSServer.getActiveMQServer().setIdentity("JMSLive"); - log.debug("Starting life"); + log.debug("Starting live"); liveJMSServer.start();