This commit is contained in:
Clebert Suconic 2020-07-08 11:27:36 -04:00
commit 3396ac355a
5 changed files with 153 additions and 18 deletions

View File

@ -163,6 +163,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
this(serverLocator, new Pair<>(connectorConfig, null),
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
}
ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
createTrace = new Exception();
this.serverLocator = serverLocator;
@ -171,11 +184,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
this.clientProtocolManager.setSessionFactory(this);
this.currentConnectorConfig = connectorConfig;
this.currentConnectorConfig = connectorConfig.getA();
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
connectorFactory = instantiateConnectorFactory(connectorConfig.getA().getFactoryClassName());
checkTransportKeys(connectorFactory, connectorConfig);
checkTransportKeys(connectorFactory, connectorConfig.getA());
this.callTimeout = locatorConfig.callTimeout;
@ -216,6 +229,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
connectionReadyForWrites = true;
if (connectorConfig.getB() != null) {
this.backupConfig = connectorConfig.getB();
}
}
@Override

View File

@ -165,8 +165,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private TransportConfiguration clusterTransportConfiguration;
private boolean useTopologyForLoadBalancing;
/** For tests only */
public DiscoveryGroup getDiscoveryGroup() {
return discoveryGroup;
@ -422,7 +420,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
clusterTransportConfiguration = locator.clusterTransportConfiguration;
}
private TransportConfiguration selectConnector() {
private synchronized Pair<TransportConfiguration, TransportConfiguration> selectConnector(boolean useInitConnector) {
Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
flushTopology();
@ -432,14 +430,14 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
synchronized (this) {
if (usedTopology != null && config.useTopologyForLoadBalancing) {
if (usedTopology != null && config.useTopologyForLoadBalancing && !useInitConnector) {
if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from topology.");
}
int pos = loadBalancingPolicy.select(usedTopology.length);
Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
return pair.getA();
return pair;
} else {
if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from initial connectors.");
@ -447,7 +445,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
int pos = loadBalancingPolicy.select(initialConnectors.length);
return initialConnectors[pos];
return new Pair(initialConnectors[pos], null);
}
}
}
@ -658,10 +656,19 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
synchronized (this) {
boolean retry = true;
int attempts = 0;
boolean topologyArrayTried = !config.useTopologyForLoadBalancing || topologyArray == null || topologyArray.length == 0;
boolean staticTried = false;
boolean shouldTryStatic = !config.useTopologyForLoadBalancing || !receivedTopology || topologyArray == null || topologyArray.length == 0;
while (retry && !isClosed()) {
retry = false;
TransportConfiguration tc = selectConnector();
/*
* The logic is: If receivedTopology is false, try static first.
* if receivedTopology is true, try topologyArray first
*/
Pair<TransportConfiguration, TransportConfiguration> tc = selectConnector(shouldTryStatic);
if (tc == null) {
throw ActiveMQClientMessageBundle.BUNDLE.noTCForSessionFactory();
}
@ -682,12 +689,32 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
try {
if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
attempts++;
int connectorsSize = getConnectorsSize();
int maxAttempts = config.initialConnectAttempts == 0 ? 1 : config.initialConnectAttempts;
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
if (shouldTryStatic) {
//we know static is used
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * this.getNumInitialConnectors()) {
if (topologyArrayTried) {
//stop retry and throw exception
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
} else {
//lets try topologyArray
staticTried = true;
shouldTryStatic = false;
attempts = 0;
}
}
} else {
//we know topologyArray is used
if (config.initialConnectAttempts >= 0 && attempts >= maxAttempts * getConnectorsSize()) {
if (staticTried) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
} else {
topologyArrayTried = true;
shouldTryStatic = true;
attempts = 0;
}
}
}
if (factory.waitForRetry(config.retryInterval)) {
throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
@ -1414,7 +1441,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (topology.isEmpty()) {
// Resetting the topology to its original condition as it was brand new
receivedTopology = false;
topologyArray = null;
} else {
updateArraysAndPairs(eventTime);
@ -1492,6 +1518,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
synchronized (topologyArrayGuard) {
Collection<TopologyMemberImpl> membersCopy = topology.getMembers();
if (membersCopy.size() == 0) {
//it could happen when live is down, in that case we keeps the old copy
//and don't update
return;
}
Pair<TransportConfiguration, TransportConfiguration>[] topologyArrayLocal = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class, membersCopy.size());
int count = 0;
@ -1557,7 +1589,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
if (!clusterConnection && isEmpty) {
receivedTopology = false;
topologyArray = null;
}
}

View File

@ -2100,4 +2100,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224104, value = "Error starting the Acceptor {0} {1}", format = Message.Format.MESSAGE_FORMAT)
void errorStartingAcceptor(String name, Object configuration);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224105, value = "Connecting to cluster failed")
void failedConnectingToCluster(@Cause Exception e);
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
@ -330,6 +331,10 @@ public class SharedNothingLiveActivation extends LiveActivation {
// Just try connecting
listener.latch.await(5, TimeUnit.SECONDS);
} catch (Exception notConnected) {
if (!(notConnected instanceof ActiveMQException) || ActiveMQExceptionType.INTERNAL_ERROR.equals(((ActiveMQException) notConnected).getType())) {
// report all exceptions that aren't ActiveMQException and all INTERNAL_ERRORs
ActiveMQServerLogger.LOGGER.failedConnectingToCluster(notConnected);
}
return false;
}

View File

@ -26,6 +26,8 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -34,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -41,6 +44,9 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
@ -254,6 +260,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
jbcfLive.setBlockOnDurableSend(true);
ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams));
jbcfBackup.setBlockOnNonDurableSend(true);
jbcfBackup.setBlockOnDurableSend(true);
jbcfBackup.setInitialConnectAttempts(-1);
@ -437,6 +444,74 @@ public class JMSFailoverTest extends ActiveMQTestBase {
}
@Test
public void testCreateNewConnectionAfterFailover() throws Exception {
ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc);
jbcf.setInitialConnectAttempts(5);
jbcf.setRetryInterval(1000);
jbcf.setReconnectAttempts(-1);
Connection conn1 = null, conn2 = null, conn3 = null;
try {
conn1 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
conn2 = JMSUtil.createConnectionAndWaitForTopology(jbcf, 2, 5);
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
ClientSession coreSession1 = ((ActiveMQSession)sess1).getCoreSession();
ClientSession coreSession2 = ((ActiveMQSession)sess2).getCoreSession();
Topology fullTopology = jbcf.getServerLocator().getTopology();
Collection<TopologyMemberImpl> members = fullTopology.getMembers();
assertEquals(1, members.size());
TopologyMemberImpl member = members.iterator().next();
TransportConfiguration tcLive = member.getLive();
TransportConfiguration tcBackup = member.getBackup();
System.out.println("live tc: " + tcLive);
System.out.println("Backup tc: " + tcBackup);
JMSUtil.crash(liveServer, coreSession1, coreSession2);
waitForServerToStart(backupServer);
//now pretending that the live down event hasn't been propagated to client
simulateLiveDownHasNotReachClient((ServerLocatorImpl) jbcf.getServerLocator(), tcLive, tcBackup);
//now create a new connection after live is down
try {
conn3 = jbcf.createConnection();
} catch (Exception e) {
fail("The new connection should be established successfully after failover");
}
} finally {
if (conn1 != null) {
conn1.close();
}
if (conn2 != null) {
conn2.close();
}
if (conn3 != null) {
conn3.close();
}
}
}
private void simulateLiveDownHasNotReachClient(ServerLocatorImpl locator, TransportConfiguration tcLive, TransportConfiguration tcBackup) throws NoSuchFieldException, IllegalAccessException {
Field f = locator.getClass().getDeclaredField("topologyArray");
f.setAccessible(true);
Pair<TransportConfiguration, TransportConfiguration>[] value = (Pair<TransportConfiguration, TransportConfiguration>[]) f.get(locator);
assertEquals(1, value.length);
Pair<TransportConfiguration, TransportConfiguration> member = value[0];
member.setA(tcLive);
member.setB(tcBackup);
f.set(locator, value);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@ -463,7 +538,10 @@ public class JMSFailoverTest extends ActiveMQTestBase {
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf = createBasicConfig().addAcceptorConfiguration(backupAcceptortc).addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName()));
backuptc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupAcceptortc.getParams().put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf = createBasicConfig().addConnectorConfiguration(livetc.getName(), livetc).addConnectorConfiguration(backuptc.getName(), backuptc).setSecurityEnabled(false).setJournalType(getDefaultJournalType()).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, backupParams)).setBindingsDirectory(getBindingsDir()).setJournalMinFiles(2).setJournalDirectory(getJournalDir()).setPagingDirectory(getPageDir()).setLargeMessagesDirectory(getLargeMessagesDir()).setPersistenceEnabled(true).setHAPolicyConfiguration(sharedStore ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(backuptc.getName(), livetc.getName()));
backupServer = addServer(new InVMNodeManagerServer(backupConf, nodeManager));
@ -484,7 +562,7 @@ public class JMSFailoverTest extends ActiveMQTestBase {
liveJMSServer.setRegistry(new JndiBindingRegistry(ctx1));
liveJMSServer.getActiveMQServer().setIdentity("JMSLive");
log.debug("Starting life");
log.debug("Starting live");
liveJMSServer.start();