ARTEMIS-1995 Client fail over fails when live shut down too soon
In a live-backup scenario, if the live is restarted and shutdown too soon, the client have a chance to fail on failover because it's internal topology is inconsistent with the final status. The client keeps connecting to live already shut down, never trying to connect to the backup. It's a porting from HORNETQ-1572.
This commit is contained in:
parent
f92c931fa1
commit
983232d273
|
@ -77,11 +77,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
private final ClientProtocolManager clientProtocolManager;
|
||||
|
||||
private final TransportConfiguration connectorConfig;
|
||||
private TransportConfiguration connectorConfig;
|
||||
|
||||
private TransportConfiguration currentConnectorConfig;
|
||||
|
||||
private TransportConfiguration backupConfig;
|
||||
private volatile TransportConfiguration backupConfig;
|
||||
|
||||
private ConnectorFactory connectorFactory;
|
||||
|
||||
|
@ -175,8 +175,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
this.clientProtocolManager.setSessionFactory(this);
|
||||
|
||||
this.connectorConfig = connectorConfig;
|
||||
|
||||
this.currentConnectorConfig = connectorConfig;
|
||||
|
||||
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
|
||||
|
@ -881,6 +879,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
}
|
||||
}
|
||||
|
||||
//The order of connector configs to try to get a connection:
|
||||
//currentConnectorConfig, backupConfig and then lastConnectorConfig.
|
||||
//On each successful connect, the current and last will be
|
||||
//updated properly.
|
||||
@Override
|
||||
public RemotingConnection getConnection() {
|
||||
if (closed)
|
||||
|
@ -1101,8 +1103,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
// Switching backup as live
|
||||
connector = backupConnector;
|
||||
connectorConfig = currentConnectorConfig;
|
||||
currentConnectorConfig = backupConfig;
|
||||
backupConfig = null;
|
||||
connectorFactory = backupConnectorFactory;
|
||||
return transportConnection;
|
||||
}
|
||||
|
@ -1113,23 +1115,24 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
}
|
||||
|
||||
|
||||
if (currentConnectorConfig.equals(connectorConfig)) {
|
||||
if (currentConnectorConfig.equals(connectorConfig) || connectorConfig == null) {
|
||||
|
||||
// There was no changes on current and original connectors, just return null here and let the retry happen at the first portion of this method on the next retry
|
||||
return null;
|
||||
}
|
||||
|
||||
ConnectorFactory originalConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
|
||||
ConnectorFactory lastConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
|
||||
|
||||
Connector originalConnector = createConnector(originalConnectorFactory, connectorConfig);
|
||||
Connector lastConnector = createConnector(lastConnectorFactory, connectorConfig);
|
||||
|
||||
transportConnection = openTransportConnection(originalConnector);
|
||||
transportConnection = openTransportConnection(lastConnector);
|
||||
|
||||
if (transportConnection != null) {
|
||||
logger.debug("Returning into original connector");
|
||||
connector = originalConnector;
|
||||
backupConfig = null;
|
||||
connector = lastConnector;
|
||||
TransportConfiguration temp = currentConnectorConfig;
|
||||
currentConnectorConfig = connectorConfig;
|
||||
connectorConfig = temp;
|
||||
return transportConnection;
|
||||
} else {
|
||||
logger.debug("no connection been made, returning null");
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
|||
import javax.transaction.xa.XAException;
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -45,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
|||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
|
||||
|
@ -651,6 +653,110 @@ public class FailoverTest extends FailoverTestBase {
|
|||
Assert.assertEquals(0, sf.numConnections());
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testFailLiveTooSoon() throws Exception {
|
||||
ServerLocator locator = getServerLocator();
|
||||
|
||||
locator.setReconnectAttempts(-1);
|
||||
locator.setRetryInterval(10);
|
||||
|
||||
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
|
||||
|
||||
waitForBackupConfig(sf);
|
||||
|
||||
TransportConfiguration initialLive = getFieldFromSF(sf, "currentConnectorConfig");
|
||||
TransportConfiguration initialBackup = getFieldFromSF(sf, "backupConfig");
|
||||
|
||||
System.out.println("initlive: " + initialLive);
|
||||
System.out.println("initback: " + initialBackup);
|
||||
|
||||
TransportConfiguration last = getFieldFromSF(sf, "connectorConfig");
|
||||
TransportConfiguration current = getFieldFromSF(sf, "currentConnectorConfig");
|
||||
|
||||
System.out.println("now last: " + last);
|
||||
System.out.println("now current: " + current);
|
||||
assertTrue(current.equals(initialLive));
|
||||
|
||||
ClientSession session = createSession(sf, true, true);
|
||||
|
||||
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, true);
|
||||
|
||||
//crash 1
|
||||
crash();
|
||||
|
||||
//make sure failover is ok
|
||||
createSession(sf, true, true).close();
|
||||
|
||||
last = getFieldFromSF(sf, "connectorConfig");
|
||||
current = getFieldFromSF(sf, "currentConnectorConfig");
|
||||
|
||||
System.out.println("now after live crashed last: " + last);
|
||||
System.out.println("now current: " + current);
|
||||
|
||||
assertTrue(current.equals(initialBackup));
|
||||
|
||||
//fail back
|
||||
beforeRestart(liveServer);
|
||||
adaptLiveConfigForReplicatedFailBack(liveServer);
|
||||
liveServer.getServer().start();
|
||||
|
||||
Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
|
||||
int i = 0;
|
||||
while (!backupServer.isStarted() && i++ < 100) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
|
||||
Assert.assertTrue(backupServer.isStarted());
|
||||
|
||||
//make sure failover is ok
|
||||
createSession(sf, true, true).close();
|
||||
|
||||
last = getFieldFromSF(sf, "connectorConfig");
|
||||
current = getFieldFromSF(sf, "currentConnectorConfig");
|
||||
|
||||
System.out.println("now after live back again last: " + last);
|
||||
System.out.println("now current: " + current);
|
||||
|
||||
//cannot use equals here because the config's name (uuid) changes
|
||||
//after failover
|
||||
assertTrue(current.isSameParams(initialLive));
|
||||
|
||||
//now manually corrupt the backup in sf
|
||||
setSFFieldValue(sf, "backupConfig", null);
|
||||
|
||||
//crash 2
|
||||
crash();
|
||||
|
||||
beforeRestart(backupServer);
|
||||
createSession(sf, true, true).close();
|
||||
|
||||
sf.close();
|
||||
Assert.assertEquals(0, sf.numSessions());
|
||||
Assert.assertEquals(0, sf.numConnections());
|
||||
}
|
||||
|
||||
protected void waitForBackupConfig(ClientSessionFactoryInternal sf) throws NoSuchFieldException, IllegalAccessException, InterruptedException {
|
||||
TransportConfiguration initialBackup = getFieldFromSF(sf, "backupConfig");
|
||||
int cnt = 50;
|
||||
while (initialBackup == null && cnt > 0) {
|
||||
cnt--;
|
||||
Thread.sleep(200);
|
||||
initialBackup = getFieldFromSF(sf, "backupConfig");
|
||||
}
|
||||
}
|
||||
|
||||
protected void setSFFieldValue(ClientSessionFactoryInternal sf, String tcName, Object value) throws NoSuchFieldException, IllegalAccessException {
|
||||
Field tcField = ClientSessionFactoryImpl.class.getDeclaredField(tcName);
|
||||
tcField.setAccessible(true);
|
||||
tcField.set(sf, value);
|
||||
}
|
||||
|
||||
protected TransportConfiguration getFieldFromSF(ClientSessionFactoryInternal sf, String tcName) throws NoSuchFieldException, IllegalAccessException {
|
||||
Field tcField = ClientSessionFactoryImpl.class.getDeclaredField(tcName);
|
||||
tcField.setAccessible(true);
|
||||
return (TransportConfiguration) tcField.get(sf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Basic fail-back test.
|
||||
*
|
||||
|
|
|
@ -362,6 +362,11 @@ public class LiveToLiveFailoverTest extends FailoverTest {
|
|||
@Ignore
|
||||
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Ignore
|
||||
public void testFailLiveTooSoon() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue