This commit is contained in:
Clebert Suconic 2018-07-31 14:21:45 -04:00
commit 99bc916fdd
3 changed files with 125 additions and 11 deletions

View File

@ -77,11 +77,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private final ClientProtocolManager clientProtocolManager;
private final TransportConfiguration connectorConfig;
private TransportConfiguration connectorConfig;
private TransportConfiguration currentConnectorConfig;
private TransportConfiguration backupConfig;
private volatile TransportConfiguration backupConfig;
private ConnectorFactory connectorFactory;
@ -175,8 +175,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
this.clientProtocolManager.setSessionFactory(this);
this.connectorConfig = connectorConfig;
this.currentConnectorConfig = connectorConfig;
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
@ -881,6 +879,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
}
//The order of connector configs to try to get a connection:
//currentConnectorConfig, backupConfig and then lastConnectorConfig.
//On each successful connect, the current and last will be
//updated properly.
@Override
public RemotingConnection getConnection() {
if (closed)
@ -1101,8 +1103,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
// Switching backup as live
connector = backupConnector;
connectorConfig = currentConnectorConfig;
currentConnectorConfig = backupConfig;
backupConfig = null;
connectorFactory = backupConnectorFactory;
return transportConnection;
}
@ -1113,23 +1115,24 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
}
if (currentConnectorConfig.equals(connectorConfig)) {
if (currentConnectorConfig.equals(connectorConfig) || connectorConfig == null) {
// There was no changes on current and original connectors, just return null here and let the retry happen at the first portion of this method on the next retry
return null;
}
ConnectorFactory originalConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
ConnectorFactory lastConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
Connector originalConnector = createConnector(originalConnectorFactory, connectorConfig);
Connector lastConnector = createConnector(lastConnectorFactory, connectorConfig);
transportConnection = openTransportConnection(originalConnector);
transportConnection = openTransportConnection(lastConnector);
if (transportConnection != null) {
logger.debug("Returning into original connector");
connector = originalConnector;
backupConfig = null;
connector = lastConnector;
TransportConfiguration temp = currentConnectorConfig;
currentConnectorConfig = connectorConfig;
connectorConfig = temp;
return transportConnection;
} else {
logger.debug("no connection been made, returning null");

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -45,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
@ -651,6 +653,110 @@ public class FailoverTest extends FailoverTestBase {
Assert.assertEquals(0, sf.numConnections());
}
@Test(timeout = 10000)
public void testFailLiveTooSoon() throws Exception {
ServerLocator locator = getServerLocator();
locator.setReconnectAttempts(-1);
locator.setRetryInterval(10);
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
waitForBackupConfig(sf);
TransportConfiguration initialLive = getFieldFromSF(sf, "currentConnectorConfig");
TransportConfiguration initialBackup = getFieldFromSF(sf, "backupConfig");
System.out.println("initlive: " + initialLive);
System.out.println("initback: " + initialBackup);
TransportConfiguration last = getFieldFromSF(sf, "connectorConfig");
TransportConfiguration current = getFieldFromSF(sf, "currentConnectorConfig");
System.out.println("now last: " + last);
System.out.println("now current: " + current);
assertTrue(current.equals(initialLive));
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, true);
//crash 1
crash();
//make sure failover is ok
createSession(sf, true, true).close();
last = getFieldFromSF(sf, "connectorConfig");
current = getFieldFromSF(sf, "currentConnectorConfig");
System.out.println("now after live crashed last: " + last);
System.out.println("now current: " + current);
assertTrue(current.equals(initialBackup));
//fail back
beforeRestart(liveServer);
adaptLiveConfigForReplicatedFailBack(liveServer);
liveServer.getServer().start();
Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
int i = 0;
while (!backupServer.isStarted() && i++ < 100) {
Thread.sleep(100);
}
liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
Assert.assertTrue(backupServer.isStarted());
//make sure failover is ok
createSession(sf, true, true).close();
last = getFieldFromSF(sf, "connectorConfig");
current = getFieldFromSF(sf, "currentConnectorConfig");
System.out.println("now after live back again last: " + last);
System.out.println("now current: " + current);
//cannot use equals here because the config's name (uuid) changes
//after failover
assertTrue(current.isSameParams(initialLive));
//now manually corrupt the backup in sf
setSFFieldValue(sf, "backupConfig", null);
//crash 2
crash();
beforeRestart(backupServer);
createSession(sf, true, true).close();
sf.close();
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
}
protected void waitForBackupConfig(ClientSessionFactoryInternal sf) throws NoSuchFieldException, IllegalAccessException, InterruptedException {
TransportConfiguration initialBackup = getFieldFromSF(sf, "backupConfig");
int cnt = 50;
while (initialBackup == null && cnt > 0) {
cnt--;
Thread.sleep(200);
initialBackup = getFieldFromSF(sf, "backupConfig");
}
}
protected void setSFFieldValue(ClientSessionFactoryInternal sf, String tcName, Object value) throws NoSuchFieldException, IllegalAccessException {
Field tcField = ClientSessionFactoryImpl.class.getDeclaredField(tcName);
tcField.setAccessible(true);
tcField.set(sf, value);
}
protected TransportConfiguration getFieldFromSF(ClientSessionFactoryInternal sf, String tcName) throws NoSuchFieldException, IllegalAccessException {
Field tcField = ClientSessionFactoryImpl.class.getDeclaredField(tcName);
tcField.setAccessible(true);
return (TransportConfiguration) tcField.get(sf);
}
/**
* Basic fail-back test.
*

View File

@ -362,6 +362,11 @@ public class LiveToLiveFailoverTest extends FailoverTest {
@Ignore
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception {
}
@Override
@Ignore
public void testFailLiveTooSoon() throws Exception {
}
}