From 6361079aa0533f31ad8b1b8e390fc70fda5ad217 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 6 Sep 2018 18:05:06 -0400 Subject: [PATCH] ARTEMIS-2084: Failover will not work with network cable disconnect on core protocol --- .../client/impl/ClientSessionFactoryImpl.java | 12 +++++++- .../remoting/impl/netty/NettyConnector.java | 8 +++++ .../artemis/tests/util/ActiveMQTestBase.java | 8 +++++ .../cluster/failover/FailoverTestBase.java | 4 +-- .../failover/NetworkFailureFailoverTest.java | 30 ++++++++++++++----- .../ra/ActiveMQRAClusteredTestBase.java | 3 +- 6 files changed, 52 insertions(+), 13 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 4723c88e0e..daac8f3498 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; @@ -1060,7 +1061,16 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } protected Connector createConnector(ConnectorFactory connectorFactory, TransportConfiguration configuration) { - return connectorFactory.createConnector(configuration.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager); + Connector connector = connectorFactory.createConnector(configuration.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager); + if (connector instanceof NettyConnector) { + NettyConnector nettyConnector = (NettyConnector) connector; + if (nettyConnector.getConnectTimeoutMillis() < 0) { + nettyConnector.setConnectTimeoutMillis((int)serverLocator.getConnectionTTL()); + } + + } + + return connector; } private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 284d0b9053..2ef5fed2db 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -843,6 +843,14 @@ public class NettyConnector extends AbstractConnector { // Public -------------------------------------------------------- + public int getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + public void setConnectTimeoutMillis(int connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 2cd5d564cb..9075ac6581 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -552,6 +552,14 @@ public abstract class ActiveMQTestBase extends Assert { return params; } + + /** This exists as an extension point for tests, so tests can replace it */ + protected ClusterConnectionConfiguration createBasicClusterConfig(String connectorName, + String... connectors) { + return basicClusterConnectionConfig(connectorName, connectors); + } + + protected static final ClusterConnectionConfiguration basicClusterConnectionConfig(String connectorName, String... connectors) { ArrayList connectors0 = new ArrayList<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java index 70e625a857..2a75f94eaf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java @@ -165,11 +165,11 @@ public abstract class FailoverTestBase extends ActiveMQTestBase { TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); + backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName())); backupServer = createTestableServer(backupConfig); - liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); + liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector); liveServer = createTestableServer(liveConfig); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java index 10115024dc..c2e539fb4d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NetworkFailureFailoverTest.java @@ -13,6 +13,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -35,10 +36,12 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.Topology; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.network.NetUtil; @@ -138,8 +141,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase { server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost"); } - server1Params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); - return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params); } @@ -151,7 +152,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase { Assert.assertTrue(NetUtil.checkIP(LIVE_IP)); Map params = new HashMap<>(); params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); - params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); TransportConfiguration tc = createTransportConfiguration(true, false, params); final AtomicInteger countSent = new AtomicInteger(0); @@ -195,8 +195,8 @@ public class NetworkFailureFailoverTest extends FailoverTestBase { locator.setReconnectAttempts(-1); locator.setConfirmationWindowSize(-1); locator.setProducerWindowSize(-1); - locator.setClientFailureCheckPeriod(100); locator.setConnectionTTL(1000); + locator.setClientFailureCheckPeriod(100); ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2); sfProducer.addFailureListener(new SessionFailureListener() { @Override @@ -312,7 +312,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase { Assert.assertTrue(NetUtil.checkIP(LIVE_IP)); Map params = new HashMap<>(); params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); - params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); TransportConfiguration tc = createTransportConfiguration(true, false, params); final AtomicInteger countSent = new AtomicInteger(0); @@ -442,7 +441,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase { Assert.assertTrue(NetUtil.checkIP(LIVE_IP)); Map params = new HashMap<>(); params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); - params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); TransportConfiguration tc = createTransportConfiguration(true, false, params); final AtomicInteger countSent = new AtomicInteger(0); @@ -459,7 +457,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase { try { NetUtil.netDown(LIVE_IP); System.out.println("Blocking traffic"); - Thread.sleep(3000); // this is important to let stuff to block blockedAt.set(sentMessages.get()); latchDown.countDown(); } catch (Exception e) { @@ -555,7 +552,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase { Assert.assertTrue(NetUtil.checkIP(LIVE_IP)); Map params = new HashMap<>(); params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP); - params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000); TransportConfiguration tc = createTransportConfiguration(true, false, params); final AtomicInteger countSent = new AtomicInteger(0); @@ -685,4 +681,22 @@ public class NetworkFailureFailoverTest extends FailoverTestBase { t.join(); } + + + @Override + protected ClusterConnectionConfiguration createBasicClusterConfig(String connectorName, + String... connectors) { + ArrayList connectors0 = new ArrayList<>(); + for (String c : connectors) { + connectors0.add(c); + } + ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration(). + setName("cluster1").setAddress("jms").setConnectorName(connectorName). + setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setClientFailureCheckPeriod(100).setConnectionTTL(1000). + setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT). + setStaticConnectors(connectors0); + + return clusterConnectionConfiguration; + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java index 9e3abf28dd..cd14afe508 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQRAClusteredTestBase.java @@ -25,7 +25,6 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Before; public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase { @@ -73,7 +72,7 @@ public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase { index = 1; } - ConfigurationImpl configuration = createBasicConfig(index).setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0)); + ConfigurationImpl configuration = createBasicConfig(index).setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0)); recreateDataDirectories(getTestDir(), index, false);