diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index e8ac8f8d46..5c972e3ef1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -64,8 +64,8 @@ import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.jboss.logging.Logger; @@ -77,7 +77,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private final ClientProtocolManager clientProtocolManager; - private TransportConfiguration connectorConfig; + private final TransportConfiguration connectorConfig; + + private TransportConfiguration currentConnectorConfig; private TransportConfiguration backupConfig; @@ -175,6 +177,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C this.connectorConfig = connectorConfig; + this.currentConnectorConfig = connectorConfig; + connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName()); checkTransportKeys(connectorFactory, connectorConfig); @@ -238,7 +242,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C getConnectionWithRetry(initialConnectAttempts); if (connection == null) { - StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(connectorConfig); + StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig); if (backupConfig != null) { msg.append(" and backup configuration ").append(backupConfig); } @@ -249,7 +253,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C @Override public TransportConfiguration getConnectorConfiguration() { - return connectorConfig; + return currentConnectorConfig; } @Override @@ -260,7 +264,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // to create a connector just to validate if the parameters are ok. // so this will create the instance to be used on the isEquivalent check if (localConnector == null) { - localConnector = connectorFactory.createConnector(connectorConfig.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager); + localConnector = connectorFactory.createConnector(currentConnectorConfig.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager); } if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) { @@ -274,7 +278,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C " / " + backUp + " but it didn't belong to " + - connectorConfig); + currentConnectorConfig); } } } @@ -1068,14 +1072,15 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C try { if (logger.isDebugEnabled()) { logger.debug("Trying to connect with connectorFactory = " + connectorFactory + - ", connectorConfig=" + connectorConfig); + ", connectorConfig=" + currentConnectorConfig); } - Connector liveConnector = createConnector(connectorFactory, connectorConfig); + Connector liveConnector = createConnector(connectorFactory, currentConnectorConfig); if ((transportConnection = openTransportConnection(liveConnector)) != null) { // if we can't connect the connect method will return null, hence we have to try the backup connector = liveConnector; + return transportConnection; } else if (backupConfig != null) { if (logger.isDebugEnabled()) { logger.debug("Trying backup config = " + backupConfig); @@ -1096,15 +1101,39 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // Switching backup as live connector = backupConnector; - connectorConfig = backupConfig; + currentConnectorConfig = backupConfig; backupConfig = null; connectorFactory = backupConnectorFactory; - } else { - if (logger.isDebugEnabled()) { - logger.debug("Backup is not active."); - } + return transportConnection; } + } + if (logger.isDebugEnabled()) { + logger.debug("Backup is not active, trying original connection configuration now."); + } + + + if (currentConnectorConfig.equals(connectorConfig)) { + + // There was no changes on current and original connectors, just return null here and let the retry happen at the first portion of this method on the next retry + return null; + } + + ConnectorFactory originalConnectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName()); + + Connector originalConnector = createConnector(originalConnectorFactory, connectorConfig); + + transportConnection = openTransportConnection(originalConnector); + + if (transportConnection != null) { + logger.debug("Returning into original connector"); + connector = originalConnector; + backupConfig = null; + currentConnectorConfig = connectorConfig; + return transportConnection; + } else { + logger.debug("no connection been made, returning null"); + return null; } } catch (Exception cause) { // Sanity catch for badly behaved remoting plugins @@ -1124,13 +1153,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } catch (Throwable t) { } } - - transportConnection = null; - connector = null; + return null; } - return transportConnection; } private class DelegatingBufferHandler implements BufferHandler { @@ -1330,7 +1356,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C try { // if it is our connector then set the live id used for failover - if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), connectorConfig)) { + if (connectorPair.getA() != null && TransportConfigurationUtil.isSameHost(connectorPair.getA(), currentConnectorConfig)) { liveNodeID = nodeID; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index c6ec6dd877..20f5fdaa89 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -610,6 +610,47 @@ public class FailoverTest extends FailoverTestBase { Assert.assertEquals(0, sf.numConnections()); } + @Test(timeout = 60000) + public void testFailBothRestartLive() throws Exception { + ServerLocator locator = getServerLocator(); + + locator.setReconnectAttempts(-1).setRetryInterval(10); + + sf = (ClientSessionFactoryInternal)locator.createSessionFactory(); + + ClientSession session = createSession(sf, true, true); + + session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true); + + ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS); + + sendMessagesSomeDurable(session, producer); + + crash(session); + + ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS); + + session.start(); + + receiveDurableMessages(consumer); + + backupServer.getServer().fail(true); + + liveServer.start(); + + consumer.close(); + + producer.close(); + + producer = session.createProducer(FailoverTestBase.ADDRESS); + + sendMessagesSomeDurable(session, producer); + + sf.close(); + Assert.assertEquals(0, sf.numSessions()); + Assert.assertEquals(0, sf.numConnections()); + } + /** * Basic fail-back test. * diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java index 4e6a70a7e7..e65602f00c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; public class LiveToLiveFailoverTest extends FailoverTest { @@ -268,125 +269,99 @@ public class LiveToLiveFailoverTest extends FailoverTest { session = sendAndConsume(sf, false); } - @Override public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception { } + @Override + @Ignore + public void testFailBothRestartLive() throws Exception { + } - //invalid tests for Live to Live failover + //invalid tests for Live to Live failover //all the timeout ones aren't as we don't migrate timeouts, any failback or server restart //or replicating tests aren't either @Override + @Ignore public void testLiveAndBackupBackupComesBackNewFactory() throws Exception { } @Override + @Ignore public void testLiveAndBackupLiveComesBackNewFactory() { } @Override + @Ignore public void testTimeoutOnFailoverConsumeBlocked() throws Exception { } @Override + @Ignore public void testFailoverMultipleSessionsWithConsumers() throws Exception { // } @Override + @Ignore public void testTimeoutOnFailover() throws Exception { } @Override + @Ignore public void testTimeoutOnFailoverTransactionRollback() throws Exception { } @Override + @Ignore public void testTimeoutOnFailoverConsume() throws Exception { } @Override + @Ignore public void testTimeoutOnFailoverTransactionCommit() throws Exception { } @Override + @Ignore public void testFailBack() throws Exception { } @Override + @Ignore public void testFailBackLiveRestartsBackupIsGone() throws Exception { } @Override + @Ignore public void testLiveAndBackupLiveComesBack() throws Exception { } @Override + @Ignore public void testSimpleFailover() throws Exception { } @Override + @Ignore public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception { } @Override + @Ignore public void testWithoutUsingTheBackup() throws Exception { } //todo check to see which failing tests are valid, @Override + @Ignore public void testSimpleSendAfterFailoverDurableNonTemporary() throws Exception { } @Override + @Ignore public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception { } - - /*@Override - public void testCommitDidNotOccurUnblockedAndResend() throws Exception - { - } - - - - @Override - public void testLiveAndBackupLiveComesBackNewFactory() throws Exception - { - } - - @Override - public void testXAMessagesSentSoRollbackOnEnd() throws Exception - { - } - - @Override - public void testLiveAndBackupBackupComesBackNewFactory() throws Exception - { - } - - @Override - public void testXAMessagesSentSoRollbackOnEnd2() throws Exception - { - } - - @Override - public void testXAMessagesSentSoRollbackOnCommit() throws Exception - { - } - - @Override - public void testTransactedMessagesSentSoRollback() throws Exception - { - } - - @Override - public void testXAMessagesSentSoRollbackOnPrepare() throws Exception - { - } - - @Override - public void testNonTransactedWithZeroConsumerWindowSize() throws Exception - { - }*/ } + +