ARTEMIS-2084: Failover will not work with network cable disconnect on core protocol
This commit is contained in:
parent
25a7d3bffc
commit
6361079aa0
|
@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
|
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
|
||||||
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
import org.apache.activemq.artemis.core.remoting.FailureListener;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
|
import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil;
|
||||||
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||||
|
@ -1060,7 +1061,16 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Connector createConnector(ConnectorFactory connectorFactory, TransportConfiguration configuration) {
|
protected Connector createConnector(ConnectorFactory connectorFactory, TransportConfiguration configuration) {
|
||||||
return connectorFactory.createConnector(configuration.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
|
Connector connector = connectorFactory.createConnector(configuration.getParams(), new DelegatingBufferHandler(), this, closeExecutor, threadPool, scheduledThreadPool, clientProtocolManager);
|
||||||
|
if (connector instanceof NettyConnector) {
|
||||||
|
NettyConnector nettyConnector = (NettyConnector) connector;
|
||||||
|
if (nettyConnector.getConnectTimeoutMillis() < 0) {
|
||||||
|
nettyConnector.setConnectTimeoutMillis((int)serverLocator.getConnectionTTL());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return connector;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc) {
|
private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc) {
|
||||||
|
|
|
@ -843,6 +843,14 @@ public class NettyConnector extends AbstractConnector {
|
||||||
|
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
|
|
||||||
|
public int getConnectTimeoutMillis() {
|
||||||
|
return connectTimeoutMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
|
||||||
|
this.connectTimeoutMillis = connectTimeoutMillis;
|
||||||
|
}
|
||||||
|
|
||||||
// Package protected ---------------------------------------------
|
// Package protected ---------------------------------------------
|
||||||
|
|
||||||
// Protected -----------------------------------------------------
|
// Protected -----------------------------------------------------
|
||||||
|
|
|
@ -552,6 +552,14 @@ public abstract class ActiveMQTestBase extends Assert {
|
||||||
return params;
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** This exists as an extension point for tests, so tests can replace it */
|
||||||
|
protected ClusterConnectionConfiguration createBasicClusterConfig(String connectorName,
|
||||||
|
String... connectors) {
|
||||||
|
return basicClusterConnectionConfig(connectorName, connectors);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected static final ClusterConnectionConfiguration basicClusterConnectionConfig(String connectorName,
|
protected static final ClusterConnectionConfiguration basicClusterConnectionConfig(String connectorName,
|
||||||
String... connectors) {
|
String... connectors) {
|
||||||
ArrayList<String> connectors0 = new ArrayList<>();
|
ArrayList<String> connectors0 = new ArrayList<>();
|
||||||
|
|
|
@ -165,11 +165,11 @@ public abstract class FailoverTestBase extends ActiveMQTestBase {
|
||||||
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||||
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||||
|
|
||||||
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
|
backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName()));
|
||||||
|
|
||||||
backupServer = createTestableServer(backupConfig);
|
backupServer = createTestableServer(backupConfig);
|
||||||
|
|
||||||
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
|
liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName())).addConnectorConfiguration(liveConnector.getName(), liveConnector);
|
||||||
|
|
||||||
liveServer = createTestableServer(liveConfig);
|
liveServer = createTestableServer(liveConfig);
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
|
|
||||||
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -35,10 +36,12 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||||
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.Topology;
|
import org.apache.activemq.artemis.core.client.impl.Topology;
|
||||||
|
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
import org.apache.activemq.artemis.junit.Wait;
|
import org.apache.activemq.artemis.junit.Wait;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.tests.util.network.NetUtil;
|
import org.apache.activemq.artemis.tests.util.network.NetUtil;
|
||||||
|
@ -138,8 +141,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
|
||||||
server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost");
|
server1Params.put(TransportConstants.HOST_PROP_NAME, "localhost");
|
||||||
}
|
}
|
||||||
|
|
||||||
server1Params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
|
|
||||||
|
|
||||||
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
|
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,7 +152,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
|
||||||
Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
|
Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
|
params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
|
||||||
params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
|
|
||||||
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
||||||
|
|
||||||
final AtomicInteger countSent = new AtomicInteger(0);
|
final AtomicInteger countSent = new AtomicInteger(0);
|
||||||
|
@ -195,8 +195,8 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
|
||||||
locator.setReconnectAttempts(-1);
|
locator.setReconnectAttempts(-1);
|
||||||
locator.setConfirmationWindowSize(-1);
|
locator.setConfirmationWindowSize(-1);
|
||||||
locator.setProducerWindowSize(-1);
|
locator.setProducerWindowSize(-1);
|
||||||
locator.setClientFailureCheckPeriod(100);
|
|
||||||
locator.setConnectionTTL(1000);
|
locator.setConnectionTTL(1000);
|
||||||
|
locator.setClientFailureCheckPeriod(100);
|
||||||
ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
|
ClientSessionFactoryInternal sfProducer = createSessionFactoryAndWaitForTopology(locator, 2);
|
||||||
sfProducer.addFailureListener(new SessionFailureListener() {
|
sfProducer.addFailureListener(new SessionFailureListener() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -312,7 +312,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
|
||||||
Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
|
Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
|
params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
|
||||||
params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
|
|
||||||
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
||||||
|
|
||||||
final AtomicInteger countSent = new AtomicInteger(0);
|
final AtomicInteger countSent = new AtomicInteger(0);
|
||||||
|
@ -442,7 +441,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
|
||||||
Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
|
Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
|
params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
|
||||||
params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
|
|
||||||
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
||||||
|
|
||||||
final AtomicInteger countSent = new AtomicInteger(0);
|
final AtomicInteger countSent = new AtomicInteger(0);
|
||||||
|
@ -459,7 +457,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
|
||||||
try {
|
try {
|
||||||
NetUtil.netDown(LIVE_IP);
|
NetUtil.netDown(LIVE_IP);
|
||||||
System.out.println("Blocking traffic");
|
System.out.println("Blocking traffic");
|
||||||
Thread.sleep(3000); // this is important to let stuff to block
|
|
||||||
blockedAt.set(sentMessages.get());
|
blockedAt.set(sentMessages.get());
|
||||||
latchDown.countDown();
|
latchDown.countDown();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -555,7 +552,6 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
|
||||||
Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
|
Assert.assertTrue(NetUtil.checkIP(LIVE_IP));
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
|
params.put(TransportConstants.HOST_PROP_NAME, LIVE_IP);
|
||||||
params.put(TransportConstants.NETTY_CONNECT_TIMEOUT, 1000);
|
|
||||||
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
TransportConfiguration tc = createTransportConfiguration(true, false, params);
|
||||||
|
|
||||||
final AtomicInteger countSent = new AtomicInteger(0);
|
final AtomicInteger countSent = new AtomicInteger(0);
|
||||||
|
@ -685,4 +681,22 @@ public class NetworkFailureFailoverTest extends FailoverTestBase {
|
||||||
t.join();
|
t.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ClusterConnectionConfiguration createBasicClusterConfig(String connectorName,
|
||||||
|
String... connectors) {
|
||||||
|
ArrayList<String> connectors0 = new ArrayList<>();
|
||||||
|
for (String c : connectors) {
|
||||||
|
connectors0.add(c);
|
||||||
|
}
|
||||||
|
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
|
||||||
|
setName("cluster1").setAddress("jms").setConnectorName(connectorName).
|
||||||
|
setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setClientFailureCheckPeriod(100).setConnectionTTL(1000).
|
||||||
|
setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
|
||||||
|
setStaticConnectors(connectors0);
|
||||||
|
|
||||||
|
return clusterConnectionConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase {
|
public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase {
|
||||||
|
@ -73,7 +72,7 @@ public class ActiveMQRAClusteredTestBase extends ActiveMQRATestBase {
|
||||||
index = 1;
|
index = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ConfigurationImpl configuration = createBasicConfig(index).setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0));
|
ConfigurationImpl configuration = createBasicConfig(index).setJMXManagementEnabled(false).clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY, invmMap)).addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, nettyMap)).addConnectorConfiguration(secondaryConnectorName, secondaryConnector).addConnectorConfiguration(primaryConnectorName, primaryConnector).addClusterConfiguration(basicClusterConnectionConfig(secondaryConnectorName, primaryConnectorName).setReconnectAttempts(0));
|
||||||
|
|
||||||
recreateDataDirectories(getTestDir(), index, false);
|
recreateDataDirectories(getTestDir(), index, false);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue