ARTEMIS-2835 Porting HORNETQ-1575 and HORNETQ-1578

1 of 2) - Porting of HORNETQ-1575

In a live-backup scenario, when the live server goes down and the backup
becomes live, clients using HA connection factories fail over automatically.
However, if a client decides to create a new connection by itself (as in the
Camel JMS case), there is a chance that the new connection points to the dead
live server and the connection attempt fails. The reason is that once the old
connection is gone, the backup never gets a chance to announce itself to the
client, so the initial connection attempt fails.

The fix is to let the connection factory (CF) remember the old topology and
use it on any initial connection attempt.
This commit is contained in:
Howard Gao 2020-07-07 17:10:45 +08:00 committed by Clebert Suconic
parent a69b2aee7b
commit 6f8ff55dec
3 changed files with 144 additions and 18 deletions

View File

@ -163,6 +163,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors, final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) { final List<Interceptor> outgoingInterceptors) {
this(serverLocator, new Pair<>(connectorConfig, null),
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
}
ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
createTrace = new Exception(); createTrace = new Exception();
this.serverLocator = serverLocator; this.serverLocator = serverLocator;
@ -171,11 +184,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
this.clientProtocolManager.setSessionFactory(this); this.clientProtocolManager.setSessionFactory(this);
this.currentConnectorConfig = connectorConfig; this.currentConnectorConfig = connectorConfig.getA();
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName()); connectorFactory = instantiateConnectorFactory(connectorConfig.getA().getFactoryClassName());
checkTransportKeys(connectorFactory, connectorConfig); checkTransportKeys(connectorFactory, connectorConfig.getA());
this.callTimeout = locatorConfig.callTimeout; this.callTimeout = locatorConfig.callTimeout;
@ -216,6 +229,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
connectionReadyForWrites = true; connectionReadyForWrites = true;
if (connectorConfig.getB() != null) {
this.backupConfig = connectorConfig.getB();
}
} }
@Override @Override

View File

@ -165,8 +165,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private TransportConfiguration clusterTransportConfiguration; private TransportConfiguration clusterTransportConfiguration;
private boolean useTopologyForLoadBalancing;
/** For tests only */ /** For tests only */
public DiscoveryGroup getDiscoveryGroup() { public DiscoveryGroup getDiscoveryGroup() {
return discoveryGroup; return discoveryGroup;
@ -422,7 +420,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
clusterTransportConfiguration = locator.clusterTransportConfiguration; clusterTransportConfiguration = locator.clusterTransportConfiguration;
} }
private TransportConfiguration selectConnector() { private synchronized Pair<TransportConfiguration, TransportConfiguration> selectConnector(boolean useInitConnector) {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology; Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
flushTopology(); flushTopology();
@ -432,14 +430,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
} }
synchronized (this) { synchronized (this) {
if (usedTopology != null && config.useTopologyForLoadBalancing) { if (usedTopology != null && config.useTopologyForLoadBalancing && !useInitConnector) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from topology."); logger.trace("Selecting connector from topology.");
} }
int pos = loadBalancingPolicy.select(usedTopology.length); int pos = loadBalancingPolicy.select(usedTopology.length);
Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos]; Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
return pair.getA(); return pair;
} else { } else {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from initial connectors."); logger.trace("Selecting connector from initial connectors.");
@ -447,7 +445,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
int pos = loadBalancingPolicy.select(initialConnectors.length); int pos = loadBalancingPolicy.select(initialConnectors.length);
return initialConnectors[pos]; return new Pair(initialConnectors[pos], null);
} }
} }
} }
@ -658,10 +656,19 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
synchronized (this) { synchronized (this) {
boolean retry = true; boolean retry = true;
int attempts = 0; int attempts = 0;
boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0;
boolean staticTried = false;
boolean shouldTryStatic = !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0;
while (retry && !isClosed()) { while (retry && !isClosed()) {
retry = false; retry = false;
TransportConfiguration tc = selectConnector(); /*
* The logic is: If receivedTopology is false, try static first.
* if receivedTopology is true, try topologyArray first
*/
Pair<TransportConfiguration, TransportConfiguration> tc = selectConnector(shouldTryStatic);
if (tc == null) { if (tc == null) {
throw ActiveMQClientMessageBundle.BUNDLE.noTCForSessionFactory(); throw ActiveMQClientMessageBundle.BUNDLE.noTCForSessionFactory();
} }
@ -682,12 +689,32 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
try { try {
if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) { if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
attempts++; attempts++;
int connectorsSize = getConnectorsSize();
int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts; int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts;
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) { if (shouldTryStatic) {
//we know static is used
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * this.getNumInitialConnectors()) {
if (topologyArrayTried) {
//stop retry and throw exception
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
} else {
//lets try topologyArray
staticTried = true;
shouldTryStatic = false;
attempts = 0;
}
}
} else {
//we know topologyArray is used
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * getConnectorsSize()) {
if (staticTried) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
} else {
topologyArrayTried = true;
shouldTryStatic = true;
attempts = 0;
}
}
} }
if (factory.waitForRetry(config.retryInterval)) { if (factory.waitForRetry(config.retryInterval)) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers(); throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
@ -1414,7 +1441,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (topology.isEmpty()) { if (topology.isEmpty()) {
// Resetting the topology to its original condition as it was brand new // Resetting the topology to its original condition as it was brand new
receivedTopology = false; receivedTopology = false;
topologyArray = null;
} else { } else {
updateArraysAndPairs(eventTime); updateArraysAndPairs(eventTime);
@ -1492,6 +1518,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
synchronized (topologyArrayGuard) { synchronized (topologyArrayGuard) {
Collection<TopologyMemberImpl> membersCopy = topology.getMembers(); Collection<TopologyMemberImpl> membersCopy = topology.getMembers();
if (membersCopy.size() == 0) {
// This can happen when the live is down; in that case we keep the old copy
// and don't update it.
return;
}
Pair<TransportConfiguration, TransportConfiguration>[] topologyArrayLocal = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class, membersCopy.size()); Pair<TransportConfiguration, TransportConfiguration>[] topologyArrayLocal = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class, membersCopy.size());
int count = 0; int count = 0;
@ -1557,7 +1589,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (!clusterConnection && isEmpty) { if (!clusterConnection && isEmpty) {
receivedTopology = false; receivedTopology = false;
topologyArray = null;
} }
} }

View File

@ -26,6 +26,8 @@ import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -34,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -41,6 +44,9 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
@ -254,6 +260,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
jbcfLive.setBlockOnDurableSend(true); jbcfLive.setBlockOnDurableSend(true);
ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams)); ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams));
jbcfBackup.setBlockOnNonDurableSend(true); jbcfBackup.setBlockOnNonDurableSend(true);
jbcfBackup.setBlockOnDurableSend(true); jbcfBackup.setBlockOnDurableSend(true);
jbcfBackup.setInitialConnectAttempts(-1); jbcfBackup.setInitialConnectAttempts(-1);
@ -437,6 +444,74 @@ public class JMSFailoverTest extends ActiveMQTestBase {
} }
/**
 * Checks that a brand-new connection can still be created from an HA
 * connection factory after the live server has crashed and the backup has
 * been started, even though the locator's cached topology has been put back
 * to its pre-crash content.
 *
 * <p>Steps: open two connections, capture the live/backup connectors of the
 * single topology member, crash the live server, wait for the backup to
 * start, restore the captured pair into the locator's cached topology array,
 * then create a third connection from the same factory.
 *
 * @throws Exception if setup, the crash or cleanup fails
 */
@Test
public void testCreateNewConnectionAfterFailover() throws Exception {
    ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc);
    jbcf.setInitialConnectAttempts(5);
    jbcf.setRetryInterval(1000);
    jbcf.setReconnectAttempts(-1);

    Connection conn1 = null, conn2 = null, conn3 = null;

    try {
        // NOTE(review): the trailing arguments (2, 5) are presumably the expected
        // topology size and a timeout -- confirm against JMSUtil.
        conn1 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
        conn2 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);

        Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        ClientSession coreSession1 = ((ActiveMQSession)sess1).getCoreSession();
        ClientSession coreSession2 = ((ActiveMQSession)sess2).getCoreSession();

        // Capture the live/backup connectors of the only topology member before the crash.
        Topology fullTopology = jbcf.getServerLocator().getTopology();
        Collection<TopologyMemberImpl> members = fullTopology.getMembers();
        assertEquals(1, members.size());
        TopologyMemberImpl member = members.iterator().next();
        TransportConfiguration tcLive = member.getLive();
        TransportConfiguration tcBackup = member.getBackup();

        System.out.println("live tc: " + tcLive);
        System.out.println("Backup tc: " + tcBackup);

        JMSUtil.crash(liveServer, coreSession1, coreSession2);

        waitForServerToStart(backupServer);

        //now pretending that the live down event hasn't been propagated to client
        simulateLiveDownHasNotReachClient((ServerLocatorImpl) jbcf.getServerLocator(), tcLive, tcBackup);

        //now create a new connection after live is down
        try {
            conn3 = jbcf.createConnection();
        } catch (Exception e) {
            // Keep the original exception as the cause so a failing run shows why
            // the connection could not be created, not just that it could not.
            throw new AssertionError("The new connection should be established successfully after failover", e);
        }
    } finally {
        if (conn1 != null) {
            conn1.close();
        }
        if (conn2 != null) {
            conn2.close();
        }
        if (conn3 != null) {
            conn3.close();
        }
    }
}
/**
 * Uses reflection to overwrite the single entry of the locator's private
 * {@code topologyArray} field with the given live/backup connector pair, so
 * that the locator's cached topology looks as it did before the live server
 * went down.
 *
 * @param locator  the locator whose cached topology array is rewritten
 * @param tcLive   connector stored as the first element of the pair
 * @param tcBackup connector stored as the second element of the pair
 * @throws NoSuchFieldException   if the locator class declares no {@code topologyArray} field
 * @throws IllegalAccessException if the field cannot be read or written reflectively
 */
private void simulateLiveDownHasNotReachClient(ServerLocatorImpl locator, TransportConfiguration tcLive, TransportConfiguration tcBackup) throws NoSuchFieldException, IllegalAccessException {
    final Field topologyArrayField = locator.getClass().getDeclaredField("topologyArray");
    topologyArrayField.setAccessible(true);

    final Pair<TransportConfiguration, TransportConfiguration>[] cachedPairs = (Pair<TransportConfiguration, TransportConfiguration>[]) topologyArrayField.get(locator);
    // The test topology is expected to hold exactly one live/backup pair.
    assertEquals(1, cachedPairs.length);

    // Put the supplied connectors back into that single entry.
    final Pair<TransportConfiguration, TransportConfiguration> onlyEntry = cachedPairs[0];
    onlyEntry.setA(tcLive);
    onlyEntry.setB(tcBackup);

    topologyArrayField.set(locator, cachedPairs);
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
// Protected ----------------------------------------------------- // Protected -----------------------------------------------------
@ -463,7 +538,10 @@ public class JMSFailoverTest extends ActiveMQTestBase {
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1); backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf = createBasicConfig().addAcceptorConfiguration(backupAcceptortc).addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName())); backuptc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupAcceptortc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf = createBasicConfig().addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName()));
backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager)); backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager));
@ -484,7 +562,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
liveJMSServer.setRegistry(new JndiBindingRegistry(ctx1)); liveJMSServer.setRegistry(new JndiBindingRegistry(ctx1));
liveJMSServer.getActiveMQServer().setIdentity("JMSLive"); liveJMSServer.getActiveMQServer().setIdentity("JMSLive");
log.debug("Starting life"); log.debug("Starting live");
liveJMSServer.start(); liveJMSServer.start();