This commit is contained in:
Justin Bertram 2018-04-18 08:49:01 -05:00
commit 7fa8c55f43
3 changed files with 109 additions and 67 deletions

View File

@ -64,8 +64,8 @@ import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
@ -77,7 +77,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private final ClientProtocolManager clientProtocolManager;
private TransportConfiguration connectorConfig;
private final TransportConfiguration connectorConfig;
private TransportConfiguration currentConnectorConfig;
private TransportConfiguration backupConfig;
@ -175,6 +177,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
this.connectorConfig = connectorConfig;
this.currentConnectorConfig = connectorConfig;
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
checkTransportKeys(connectorFactory, connectorConfig);
@ -238,7 +242,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
getConnectionWithRetry(initialConnectAttempts);
if (connection == null) {
StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(connectorConfig);
StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
if (backupConfig != null) {
msg.append(" and backup configuration ").append(backupConfig);
}
@ -249,7 +253,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
@Override
public TransportConfiguration getConnectorConfiguration() {
return connectorConfig;
return currentConnectorConfig;
}
@Override
@ -260,7 +264,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
// to create a connector just to validate if the parameters are ok.
// so this will create the instance to be used on the isEquivalent check
if (localConnector == null) {
localConnector = connectorFactory.createConnector(connectorConfig.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
localConnector = connectorFactory.createConnector(currentConnectorConfig.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
}
if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) {
@ -274,7 +278,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
" / " +
backUp +
" but it didn't belong to " +
connectorConfig);
currentConnectorConfig);
}
}
}
@ -1068,14 +1072,15 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
try {
if (logger.isDebugEnabled()) {
logger.debug("Trying to connect with connectorFactory = " + connectorFactory +
", connectorConfig=" + connectorConfig);
", connectorConfig=" + currentConnectorConfig);
}
Connector liveConnector = createConnector(connectorFactory, connectorConfig);
Connector liveConnector = createConnector(connectorFactory, currentConnectorConfig);
if ((transportConnection = openTransportConnection(liveConnector)) != null) {
// if we can't connect the connect method will return null, hence we have to try the backup
connector = liveConnector;
return transportConnection;
} else if (backupConfig != null) {
if (logger.isDebugEnabled()) {
logger.debug("Trying backup config = " + backupConfig);
@ -1096,15 +1101,39 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
// Switching backup as live
connector = backupConnector;
connectorConfig = backupConfig;
currentConnectorConfig = backupConfig;
backupConfig = null;
connectorFactory = backupConnectorFactory;
} else {
if (logger.isDebugEnabled()) {
logger.debug("Backup is not active.");
}
return transportConnection;
}
}
if (logger.isDebugEnabled()) {
logger.debug("Backup is not active, trying original connection configuration now.");
}
if (currentConnectorConfig.equals(connectorConfig)) {
// There was no changes on current and original connectors, just return null here and let the retry happen at the first portion of this method on the next retry
return null;
}
ConnectorFactory originalConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
Connector originalConnector = createConnector(originalConnectorFactory, connectorConfig);
transportConnection = openTransportConnection(originalConnector);
if (transportConnection != null) {
logger.debug("Returning into original connector");
connector = originalConnector;
backupConfig = null;
currentConnectorConfig = connectorConfig;
return transportConnection;
} else {
logger.debug("no connection been made, returning null");
return null;
}
} catch (Exception cause) {
// Sanity catch for badly behaved remoting plugins
@ -1124,13 +1153,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
} catch (Throwable t) {
}
}
transportConnection = null;
connector = null;
return null;
}
return transportConnection;
}
private class DelegatingBufferHandler implements BufferHandler {
@ -1330,7 +1356,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
try {
// if it is our connector then set the live id used for failover
if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), connectorConfig)) {
if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), currentConnectorConfig)) {
liveNodeID = nodeID;
}

View File

@ -610,6 +610,47 @@ public class FailoverTest extends FailoverTestBase {
Assert.assertEquals(0, sf.numConnections());
}
@Test(timeout = 60000)
public void testFailBothRestartLive() throws Exception {
ServerLocator locator = getServerLocator();
locator.setReconnectAttempts(-1).setRetryInterval(10);
sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
sendMessagesSomeDurable(session, producer);
crash(session);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
receiveDurableMessages(consumer);
backupServer.getServer().fail(true);
liveServer.start();
consumer.close();
producer.close();
producer = session.createProducer(FailoverTestBase.ADDRESS);
sendMessagesSomeDurable(session, producer);
sf.close();
Assert.assertEquals(0, sf.numSessions());
Assert.assertEquals(0, sf.numConnections());
}
/**
* Basic fail-back test.
*

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class LiveToLiveFailoverTest extends FailoverTest {
@ -268,125 +269,99 @@ public class LiveToLiveFailoverTest extends FailoverTest {
session = sendAndConsume(sf, false);
}
@Override
public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
}
@Override
@Ignore
public void testFailBothRestartLive() throws Exception {
}
//invalid tests for Live to Live failover
//invalid tests for Live to Live failover
//all the timeout ones aren't as we don't migrate timeouts, any failback or server restart
//or replicating tests aren't either
@Override
@Ignore
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception {
}
@Override
@Ignore
public void testLiveAndBackupLiveComesBackNewFactory() {
}
@Override
@Ignore
public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
}
@Override
@Ignore
public void testFailoverMultipleSessionsWithConsumers() throws Exception {
//
}
@Override
@Ignore
public void testTimeoutOnFailover() throws Exception {
}
@Override
@Ignore
public void testTimeoutOnFailoverTransactionRollback() throws Exception {
}
@Override
@Ignore
public void testTimeoutOnFailoverConsume() throws Exception {
}
@Override
@Ignore
public void testTimeoutOnFailoverTransactionCommit() throws Exception {
}
@Override
@Ignore
public void testFailBack() throws Exception {
}
@Override
@Ignore
public void testFailBackLiveRestartsBackupIsGone() throws Exception {
}
@Override
@Ignore
public void testLiveAndBackupLiveComesBack() throws Exception {
}
@Override
@Ignore
public void testSimpleFailover() throws Exception {
}
@Override
@Ignore
public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception {
}
@Override
@Ignore
public void testWithoutUsingTheBackup() throws Exception {
}
//todo check to see which failing tests are valid,
@Override
@Ignore
public void testSimpleSendAfterFailoverDurableNonTemporary() throws Exception {
}
@Override
@Ignore
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception {
}
/*@Override
public void testCommitDidNotOccurUnblockedAndResend() throws Exception
{
}
@Override
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
{
}
@Override
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
{
}
@Override
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
{
}
@Override
public void testXAMessagesSentSoRollbackOnEnd2() throws Exception
{
}
@Override
public void testXAMessagesSentSoRollbackOnCommit() throws Exception
{
}
@Override
public void testTransactedMessagesSentSoRollback() throws Exception
{
}
@Override
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
{
}
@Override
public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
{
}*/
}