From b6b8fa411f4dcf9e6ac8248489579dac8d931cb3 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 4 Aug 2017 13:22:43 -0500 Subject: [PATCH] ARTEMIS-1322 ServerLocator option to ignore topology for load-balancing --- .../api/core/client/ActiveMQClient.java | 2 + .../api/core/client/ServerLocator.java | 10 +++ .../core/client/impl/ServerLocatorImpl.java | 20 +++++- .../artemis/tests/util/ActiveMQTestBase.java | 1 + .../NettySymmetricClusterTest.java | 69 +++++++++++++++++++ 5 files changed, 99 insertions(+), 3 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index 8fba7475e0..caa2a39885 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -133,6 +133,8 @@ public final class ActiveMQClient { public static final String DEFAULT_CORE_PROTOCOL = "CORE"; + public static final boolean DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING = true; + public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size"; public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size"; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java index 69e287cfcd..04bd1f63b6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java @@ -728,6 +728,16 @@ public interface ServerLocator extends AutoCloseable { @Override void close(); + /** + * + * + * @param useTopologyForLoadBalancing + * @return + */ + ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing); + + boolean getUseTopologyForLoadBalancing(); + /** * Exposes the Topology used by this ServerLocator. * diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index c27d3c3722..adae8f7d53 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -207,6 +207,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery private TransportConfiguration clusterTransportConfiguration; + private boolean useTopologyForLoadBalancing; + private final Exception traceException = new Exception(); // To be called when there are ServerLocator being finalized. @@ -393,6 +395,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES; clusterConnection = false; + + useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING; } public static ServerLocator newLocator(String uri) { @@ -524,6 +528,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery groupID = locator.groupID; nodeID = locator.nodeID; clusterTransportConfiguration = locator.clusterTransportConfiguration; + useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing; } private TransportConfiguration selectConnector() { @@ -534,8 +539,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } synchronized (this) { - // if the topologyArray is null, we will use the initialConnectors - if (usedTopology != null) { + if (usedTopology != null && useTopologyForLoadBalancing) { if (logger.isTraceEnabled()) { logger.trace("Selecting connector from topology."); } @@ -544,7 +548,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return pair.getA(); } else { - // Get from initialconnectors if (logger.isTraceEnabled()) { logger.trace("Selecting connector from initial connectors."); } @@ -1564,6 +1567,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery } } + @Override + public ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing) { + this.useTopologyForLoadBalancing = useTopologyForLoadBalancing; + return this; + } + + @Override + public boolean getUseTopologyForLoadBalancing() { + return useTopologyForLoadBalancing; + } + @Override public Topology getTopology() { return topology; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 736ded1de6..6e396b72a8 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -1138,6 +1138,7 @@ public abstract class ActiveMQTestBase extends Assert { for (TopologyMemberImpl member : topology.getMembers()) { if (member.getLive() != null) { liveNodesCount++; + ActiveMQServerLogger.LOGGER.info("Found live server connected to " + server.getNodeID()); } if (member.getBackup() != null) { backupNodesCount++; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java index cc3a266298..e7b829488d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/NettySymmetricClusterTest.java @@ -16,10 +16,79 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.junit.Test; + public class NettySymmetricClusterTest extends SymmetricClusterTest { @Override protected boolean isNetty() { return true; } + + @Test + public void testConnectionLoadBalancingUsingInitialConnectors() throws Exception { + final String ADDRESS = "queues.testaddress"; + final String QUEUE = "queue0"; + final String URL = "(tcp://localhost:61616,tcp://localhost:61617)?useTopologyForLoadBalancing=false"; + final int CONNECTION_COUNT = 50; + + setupCluster(); + + startServers(); + + for (int i = 0; i < 5; i++) { + setupSessionFactory(i, isNetty()); + } + + for (int i = 0; i < 5; i++) { + createQueue(i, ADDRESS, QUEUE, null, false); + } + + for (int i = 0; i < 5; i++) { + addConsumer(i, i, QUEUE, null); + } + + for (int i = 0; i < 5; i++) { + waitForBindings(i, ADDRESS, 1, 1, true); + } + + for (int i = 0; i < 5; i++) { + waitForBindings(i, ADDRESS, 4, 4, false); + } + + int[] baseline = new int[5]; + for (int i = 0; i < 5; i++) { + baseline[i] = servers[i].getActiveMQServerControl().getConnectionCount(); + } + + ClientSessionFactory[] clientSessionFactories = new ClientSessionFactory[CONNECTION_COUNT]; + ServerLocator locator = ActiveMQClient.createServerLocator(URL); + for (int i = 0; i < CONNECTION_COUNT; i++) { + clientSessionFactories[i] = addSessionFactory(locator.createSessionFactory()); + } + + /** + * Since we are only using the initial connectors to load-balance then all the connections should be on the first 2 nodes. + * Note: This still uses the load-balancing-policy so this would changed if we used the random one instead of the default + * round-robin one. + */ + assertEquals(CONNECTION_COUNT / 2, (servers[0].getActiveMQServerControl().getConnectionCount() - baseline[0])); + assertEquals(CONNECTION_COUNT / 2, (servers[1].getActiveMQServerControl().getConnectionCount() - baseline[1])); + + for (int i = 0; i < CONNECTION_COUNT; i++) { + clientSessionFactories[i].close(); + } + + locator.setUseTopologyForLoadBalancing(true); + for (int i = 0; i < CONNECTION_COUNT; i++) { + clientSessionFactories[i] = addSessionFactory(locator.createSessionFactory()); + } + + for (int i = 0; i < 5; i++) { + assertTrue((servers[i].getActiveMQServerControl().getConnectionCount() - baseline[i]) < (CONNECTION_COUNT / 2)); + } + } }