ARTEMIS-2160: Addressed occurance where cluster configuration on server locator was hard coded. Covered with test.
This commit is contained in:
parent
a2264c528c
commit
5f74faa34a
|
@ -185,8 +185,11 @@ public class ClusterController implements ActiveMQComponent {
|
|||
serverLocator.setConnectionTTL(config.getConnectionTTL());
|
||||
serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod());
|
||||
//if the cluster isn't available we want to hang around until it is
|
||||
serverLocator.setReconnectAttempts(-1);
|
||||
serverLocator.setInitialConnectAttempts(-1);
|
||||
serverLocator.setReconnectAttempts(config.getReconnectAttempts());
|
||||
serverLocator.setInitialConnectAttempts(config.getInitialConnectAttempts());
|
||||
serverLocator.setRetryInterval(config.getRetryInterval());
|
||||
serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
|
||||
serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
|
||||
//this is used for replication so need to use the server packet decoder
|
||||
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
|
||||
serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
|
||||
|
@ -438,4 +441,8 @@ public class ClusterController implements ActiveMQComponent {
|
|||
return this.replicationLocator;
|
||||
}
|
||||
|
||||
public ServerLocator getServerLocator(SimpleString name) {
|
||||
return locators.get(name);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,12 +16,19 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
|
||||
import org.junit.Before;
|
||||
|
@ -29,6 +36,9 @@ import org.junit.Test;
|
|||
|
||||
public class ClusterControllerTest extends ClusterTestBase {
|
||||
|
||||
private ClusterConnectionConfiguration clusterConf0;
|
||||
private ClusterConnectionConfiguration clusterConf1;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -45,13 +55,74 @@ public class ClusterControllerTest extends ClusterTestBase {
|
|||
|
||||
getServer(1).getConfiguration().setClusterPassword("something different");
|
||||
|
||||
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0);
|
||||
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1);
|
||||
clusterConf0 = new ClusterConnectionConfiguration()
|
||||
.setName("cluster0")
|
||||
.setAddress("queues")
|
||||
.setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND)
|
||||
.setMaxHops(1)
|
||||
.setInitialConnectAttempts(8)
|
||||
.setReconnectAttempts(10)
|
||||
.setRetryInterval(250)
|
||||
.setMaxRetryInterval(4000)
|
||||
.setRetryIntervalMultiplier(2.0);
|
||||
|
||||
clusterConf1 = new ClusterConnectionConfiguration()
|
||||
.setName("cluster0")
|
||||
.setAddress("queues")
|
||||
.setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND)
|
||||
.setMaxHops(1)
|
||||
.setInitialConnectAttempts(8)
|
||||
.setReconnectAttempts(10)
|
||||
.setRetryInterval(250)
|
||||
.setMaxRetryInterval(4000)
|
||||
.setRetryIntervalMultiplier(2.0);
|
||||
|
||||
setupClusterConnection(clusterConf0, true, 0);
|
||||
setupClusterConnection(clusterConf1, true, 1);
|
||||
|
||||
startServers(0);
|
||||
startServers(1);
|
||||
}
|
||||
|
||||
private boolean clusterConnectionConfigurationIsSameBeforeAfterStart(ClusterConnectionConfiguration clusterConnectionConfigurationBeforeStart, int node) {
|
||||
boolean clusterConnectionConfigurationIsSame = false;
|
||||
|
||||
Configuration serverNodeConfiguration = getServer(node).getConfiguration();
|
||||
ActiveMQServer serverNode = getServer(node);
|
||||
ClusterManager clusterManager = serverNode.getClusterManager();
|
||||
ClusterController clusterController = clusterManager.getClusterController();
|
||||
ServerLocator serverNodeLocator = clusterController.getServerLocator(new SimpleString(clusterConnectionConfigurationBeforeStart.getName()));
|
||||
List<ClusterConnectionConfiguration> serverNodeClusterConnectionConfigurations = serverNodeConfiguration.getClusterConfigurations();
|
||||
|
||||
do {
|
||||
if (serverNodeLocator.getInitialConnectAttempts() != clusterConnectionConfigurationBeforeStart.getInitialConnectAttempts()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (serverNodeLocator.getReconnectAttempts() != clusterConnectionConfigurationBeforeStart.getReconnectAttempts()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (serverNodeLocator.getRetryInterval() != clusterConnectionConfigurationBeforeStart.getRetryInterval()) {
|
||||
break;
|
||||
}
|
||||
if (serverNodeLocator.getMaxRetryInterval() != clusterConnectionConfigurationBeforeStart.getMaxRetryInterval()) {
|
||||
break;
|
||||
}
|
||||
|
||||
Double serverNodeClusterConnectionConfigurationRIM = serverNodeLocator.getRetryIntervalMultiplier();
|
||||
Double clusterConnectionConfigurationBeforeStartRIM = clusterConnectionConfigurationBeforeStart.getRetryIntervalMultiplier();
|
||||
if (0 != serverNodeClusterConnectionConfigurationRIM.compareTo(clusterConnectionConfigurationBeforeStartRIM)) {
|
||||
break;
|
||||
}
|
||||
|
||||
clusterConnectionConfigurationIsSame = true;
|
||||
}
|
||||
while (false);
|
||||
|
||||
return clusterConnectionConfigurationIsSame;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void controlWithDifferentConnector() throws Exception {
|
||||
try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) {
|
||||
|
@ -76,4 +147,14 @@ public class ClusterControllerTest extends ClusterTestBase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void verifyServerLocatorsClusterConfiguration() {
|
||||
if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf0, 0)) {
|
||||
fail("serverLocator is not configured as per clusterConf0");
|
||||
}
|
||||
if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf1, 1)) {
|
||||
fail("serverLocator is not configured as per clusterConf1");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1755,6 +1755,36 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo);
|
||||
}
|
||||
|
||||
private List<String> getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) {
|
||||
List<String> pairs = new ArrayList<>();
|
||||
for (int element : nodesTo) {
|
||||
TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
|
||||
serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
|
||||
pairs.add(serverTotc.getName());
|
||||
}
|
||||
return pairs;
|
||||
}
|
||||
|
||||
protected void setupClusterConnection(ClusterConnectionConfiguration clusterConf,
|
||||
final boolean netty,
|
||||
final int nodeFrom,
|
||||
final int... nodesTo) {
|
||||
ActiveMQServer serverFrom = servers[nodeFrom];
|
||||
|
||||
if (serverFrom == null) {
|
||||
throw new IllegalStateException("No server at node " + nodeFrom);
|
||||
}
|
||||
|
||||
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
|
||||
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
|
||||
|
||||
List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
|
||||
Configuration config = serverFrom.getConfiguration();
|
||||
clusterConf.setConnectorName(connectorFrom.getName()).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
|
||||
|
||||
config.getClusterConfigurations().add(clusterConf);
|
||||
}
|
||||
|
||||
protected void setupClusterConnection(final String name,
|
||||
final String address,
|
||||
final MessageLoadBalancingType messageLoadBalancingType,
|
||||
|
@ -1772,12 +1802,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
|
||||
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
|
||||
|
||||
List<String> pairs = new ArrayList<>();
|
||||
for (int element : nodesTo) {
|
||||
TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
|
||||
serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
|
||||
pairs.add(serverTotc.getName());
|
||||
}
|
||||
List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
|
||||
Configuration config = serverFrom.getConfiguration();
|
||||
ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs);
|
||||
|
||||
|
@ -1805,15 +1830,21 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
|
||||
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);
|
||||
|
||||
List<String> pairs = new ArrayList<>();
|
||||
for (int element : nodesTo) {
|
||||
TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
|
||||
serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
|
||||
pairs.add(serverTotc.getName());
|
||||
}
|
||||
List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
|
||||
Configuration config = serverFrom.getConfiguration();
|
||||
|
||||
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setCallTimeout(100).setCallFailoverTimeout(100).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
|
||||
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration()
|
||||
.setName(name)
|
||||
.setAddress(address)
|
||||
.setConnectorName(connectorFrom.getName())
|
||||
.setRetryInterval(retryInterval)
|
||||
.setReconnectAttempts(reconnectAttempts)
|
||||
.setCallTimeout(100)
|
||||
.setCallFailoverTimeout(100)
|
||||
.setMessageLoadBalancingType(messageLoadBalancingType)
|
||||
.setMaxHops(maxHops)
|
||||
.setConfirmationWindowSize(1024)
|
||||
.setStaticConnectors(pairs);
|
||||
|
||||
config.getClusterConfigurations().add(clusterConf);
|
||||
}
|
||||
|
@ -1824,7 +1855,15 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
final int maxHops,
|
||||
TransportConfiguration connectorFrom,
|
||||
List<String> pairs) {
|
||||
return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
|
||||
return new ClusterConnectionConfiguration()
|
||||
.setName(name)
|
||||
.setAddress(address)
|
||||
.setConnectorName(connectorFrom.getName())
|
||||
.setRetryInterval(250)
|
||||
.setMessageLoadBalancingType(messageLoadBalancingType)
|
||||
.setMaxHops(maxHops)
|
||||
.setConfirmationWindowSize(1024)
|
||||
.setStaticConnectors(pairs);
|
||||
}
|
||||
|
||||
protected void setupClusterConnectionWithBackups(final String name,
|
||||
|
@ -1843,12 +1882,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
|
||||
serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
|
||||
|
||||
List<String> pairs = new ArrayList<>();
|
||||
for (int element : nodesTo) {
|
||||
TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
|
||||
serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
|
||||
pairs.add(serverTotc.getName());
|
||||
}
|
||||
List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
|
||||
Configuration config = serverFrom.getConfiguration();
|
||||
|
||||
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
|
||||
|
|
Loading…
Reference in New Issue