diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 583d8fb93e..75981cf8c9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -163,6 +163,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C final ScheduledExecutorService scheduledThreadPool, final List incomingInterceptors, final List outgoingInterceptors) { + this(serverLocator, new Pair<>(connectorConfig, null), + locatorConfig, reconnectAttempts, threadPool, + scheduledThreadPool, incomingInterceptors, outgoingInterceptors); + } + + ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator, + final Pair connectorConfig, + final ServerLocatorConfig locatorConfig, + final int reconnectAttempts, + final Executor threadPool, + final ScheduledExecutorService scheduledThreadPool, + final List incomingInterceptors, + final List outgoingInterceptors) { createTrace = new Exception(); this.serverLocator = serverLocator; @@ -171,11 +184,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C this.clientProtocolManager.setSessionFactory(this); - this.currentConnectorConfig = connectorConfig; + this.currentConnectorConfig = connectorConfig.getA(); - connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName()); + connectorFactory = instantiateConnectorFactory(connectorConfig.getA().getFactoryClassName()); - checkTransportKeys(connectorFactory, connectorConfig); + checkTransportKeys(connectorFactory, connectorConfig.getA()); this.callTimeout = locatorConfig.callTimeout; @@ -216,6 +229,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); connectionReadyForWrites = true; + + if (connectorConfig.getB() != null) { + this.backupConfig = connectorConfig.getB(); + } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index 3ac249fd7b..d345e06311 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -165,8 +165,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private TransportConfiguration clusterTransportConfiguration; - private boolean useTopologyForLoadBalancing; - /** For tests only */ public DiscoveryGroup getDiscoveryGroup() { return discoveryGroup; @@ -422,7 +420,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery clusterTransportConfiguration = locator.clusterTransportConfiguration; } - private TransportConfiguration selectConnector() { + private synchronized Pair selectConnector(boolean useInitConnector) { Pair[] usedTopology; flushTopology(); @@ -432,14 +430,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } synchronized (this) { - if (usedTopology != null && config.useTopologyForLoadBalancing) { + if (usedTopology != null && config.useTopologyForLoadBalancing && !useInitConnector) { if (logger.isTraceEnabled()) { logger.trace("Selecting connector from topology."); } int pos = loadBalancingPolicy.select(usedTopology.length); Pair pair = usedTopology[pos]; - return pair.getA(); + return pair; } else { if (logger.isTraceEnabled()) { logger.trace("Selecting connector from initial connectors."); @@ -447,7 +445,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery int pos = loadBalancingPolicy.select(initialConnectors.length); - return initialConnectors[pos]; + return new Pair(initialConnectors[pos], null); } } } @@ -658,10 +656,19 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery synchronized (this) { boolean retry = true; int attempts = 0; + boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0; + boolean staticTried = false; + boolean shouldTryStatic = !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0; + while (retry && !isClosed()) { retry = false; - TransportConfiguration tc = selectConnector(); + /* + * The logic is: If receivedTopology is false, try static first. + * if receivedTopology is true, try topologyArray first + */ + Pair tc = selectConnector(shouldTryStatic); + if (tc == null) { throw ActiveMQClientMessageBundle.BUNDLE.noTCForSessionFactory(); } @@ -682,12 +689,32 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery try { if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) { attempts++; - - int connectorsSize = getConnectorsSize(); int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts; - if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) { - throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); + if (shouldTryStatic) { + //we know static is used + if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * this.getNumInitialConnectors()) { + if (topologyArrayTried) { + //stop retry and throw exception + throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); + } else { + //lets try topologyArray + staticTried = true; + shouldTryStatic = false; + attempts = 0; + } + } + } else { + //we know topologyArray is used + if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * getConnectorsSize()) { + if (staticTried) { + throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); + } else { + topologyArrayTried = true; + shouldTryStatic = true; + attempts = 0; + } + } } if (factory.waitForRetry(config.retryInterval)) { throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); @@ -1414,7 +1441,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (topology.isEmpty()) { // Resetting the topology to its original condition as it was brand new receivedTopology = false; - topologyArray = null; } else { updateArraysAndPairs(eventTime); @@ -1492,6 +1518,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery synchronized (topologyArrayGuard) { Collection membersCopy = topology.getMembers(); + if (membersCopy.size() == 0) { + //it could happen when live is down, in that case we keeps the old copy + //and don't update + return; + } + Pair[] topologyArrayLocal = (Pair[]) Array.newInstance(Pair.class, membersCopy.size()); int count = 0; @@ -1557,7 +1589,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery if (!clusterConnection && isEmpty) { receivedTopology = false; - topologyArray = null; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 3ae605eff8..ff4c72842f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -2100,4 +2100,8 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 224104, value = "Error starting the Acceptor {0} {1}", format = Message.Format.MESSAGE_FORMAT) void errorStartingAcceptor(String name, Object configuration); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 224105, value = "Connecting to cluster failed") + void failedConnectingToCluster(@Cause Exception e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 660d442a1c..5e3f225b06 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException; import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.Pair; @@ -330,6 +331,10 @@ public class SharedNothingLiveActivation extends LiveActivation { // Just try connecting listener.latch.await(5, TimeUnit.SECONDS); } catch (Exception notConnected) { + if (!(notConnected instanceof ActiveMQException) || ActiveMQExceptionType.INTERNAL_ERROR.equals(((ActiveMQException) notConnected).getType())) { + // report all exceptions that aren't ActiveMQException and all INTERNAL_ERRORs + ActiveMQServerLogger.LOGGER.failedConnectingToCluster(notConnected); + } return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java index 7f22235c07..7a6ca6286d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverTest.java @@ -26,6 +26,8 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import java.lang.reflect.Field; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -34,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -41,6 +44,9 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.core.client.impl.Topology; +import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; @@ -254,6 +260,7 @@ public class JMSFailoverTest extends ActiveMQTestBase { jbcfLive.setBlockOnDurableSend(true); ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams)); + jbcfBackup.setBlockOnNonDurableSend(true); jbcfBackup.setBlockOnDurableSend(true); jbcfBackup.setInitialConnectAttempts(-1); @@ -437,6 +444,74 @@ public class JMSFailoverTest extends ActiveMQTestBase { } + @Test + public void testCreateNewConnectionAfterFailover() throws Exception { + ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc); + jbcf.setInitialConnectAttempts(5); + jbcf.setRetryInterval(1000); + jbcf.setReconnectAttempts(-1); + + Connection conn1 = null, conn2 = null, conn3 = null; + + try { + conn1 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5); + + conn2 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5); + + Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ClientSession coreSession1 = ((ActiveMQSession)sess1).getCoreSession(); + ClientSession coreSession2 = ((ActiveMQSession)sess2).getCoreSession(); + + Topology fullTopology = jbcf.getServerLocator().getTopology(); + Collection members = fullTopology.getMembers(); + assertEquals(1, members.size()); + TopologyMemberImpl member = members.iterator().next(); + TransportConfiguration tcLive = member.getLive(); + TransportConfiguration tcBackup = member.getBackup(); + + System.out.println("live tc: " + tcLive); + System.out.println("Backup tc: " + tcBackup); + + JMSUtil.crash(liveServer, coreSession1, coreSession2); + + waitForServerToStart(backupServer); + + //now pretending that the live down event hasn't been propagated to client + simulateLiveDownHasNotReachClient((ServerLocatorImpl) jbcf.getServerLocator(), tcLive, tcBackup); + + //now create a new connection after live is down + try { + conn3 = jbcf.createConnection(); + } catch (Exception e) { + fail("The new connection should be established successfully after failover"); + } + } finally { + if (conn1 != null) { + conn1.close(); + } + if (conn2 != null) { + conn2.close(); + } + if (conn3 != null) { + conn3.close(); + } + } + } + + private void simulateLiveDownHasNotReachClient(ServerLocatorImpl locator, TransportConfiguration tcLive, TransportConfiguration tcBackup) throws NoSuchFieldException, IllegalAccessException { + Field f = locator.getClass().getDeclaredField("topologyArray"); + f.setAccessible(true); + + Pair[] value = (Pair[]) f.get(locator); + assertEquals(1, value.length); + Pair member = value[0]; + member.setA(tcLive); + member.setB(tcBackup); + f.set(locator, value); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -463,7 +538,10 @@ public class JMSFailoverTest extends ActiveMQTestBase { backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1); - backupConf = createBasicConfig().addAcceptorConfiguration(backupAcceptortc).addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName())); + backuptc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1); + backupAcceptortc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1); + + backupConf = createBasicConfig().addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName())); backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager)); @@ -484,7 +562,7 @@ public class JMSFailoverTest extends ActiveMQTestBase { liveJMSServer.setRegistry(new JndiBindingRegistry(ctx1)); liveJMSServer.getActiveMQServer().setIdentity("JMSLive"); - log.debug("Starting life"); + log.debug("Starting live"); liveJMSServer.start();