This commit is contained in:
Clebert Suconic 2017-08-04 16:26:58 -04:00
commit f12116d5a5
5 changed files with 99 additions and 3 deletions

View File

@ -133,6 +133,8 @@ public final class ActiveMQClient {
public static final String DEFAULT_CORE_PROTOCOL = "CORE";
public static final boolean DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING = true;
public static final String THREAD_POOL_MAX_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.thread.pool.max.size";
public static final String SCHEDULED_THREAD_POOL_SIZE_PROPERTY_KEY = "activemq.artemis.client.global.scheduled.thread.pool.core.size";

View File

@ -728,6 +728,16 @@ public interface ServerLocator extends AutoCloseable {
@Override
void close();
/**
*
*
* @param useTopologyForLoadBalancing
* @return
*/
ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing);
boolean getUseTopologyForLoadBalancing();
/**
* Exposes the Topology used by this ServerLocator.
*

View File

@ -207,6 +207,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private TransportConfiguration clusterTransportConfiguration;
private boolean useTopologyForLoadBalancing;
private final Exception traceException = new Exception();
// To be called when there are ServerLocator being finalized.
@ -393,6 +395,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
clusterConnection = false;
useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;
}
public static ServerLocator newLocator(String uri) {
@ -524,6 +528,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
groupID = locator.groupID;
nodeID = locator.nodeID;
clusterTransportConfiguration = locator.clusterTransportConfiguration;
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
}
private TransportConfiguration selectConnector() {
@ -534,8 +539,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
synchronized (this) {
// if the topologyArray is null, we will use the initialConnectors
if (usedTopology != null) {
if (usedTopology != null && useTopologyForLoadBalancing) {
if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from topology.");
}
@ -544,7 +548,6 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return pair.getA();
} else {
// Get from initialconnectors
if (logger.isTraceEnabled()) {
logger.trace("Selecting connector from initial connectors.");
}
@ -1564,6 +1567,17 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
}
}
@Override
public ServerLocator setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing) {
this.useTopologyForLoadBalancing = useTopologyForLoadBalancing;
return this;
}
@Override
public boolean getUseTopologyForLoadBalancing() {
return useTopologyForLoadBalancing;
}
@Override
public Topology getTopology() {
return topology;

View File

@ -1138,6 +1138,7 @@ public abstract class ActiveMQTestBase extends Assert {
for (TopologyMemberImpl member : topology.getMembers()) {
if (member.getLive() != null) {
liveNodesCount++;
ActiveMQServerLogger.LOGGER.info("Found live server connected to " + server.getNodeID());
}
if (member.getBackup() != null) {
backupNodesCount++;

View File

@ -16,10 +16,79 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.junit.Test;
public class NettySymmetricClusterTest extends SymmetricClusterTest {
@Override
protected boolean isNetty() {
return true;
}
@Test
public void testConnectionLoadBalancingUsingInitialConnectors() throws Exception {
final String ADDRESS = "queues.testaddress";
final String QUEUE = "queue0";
final String URL = "(tcp://localhost:61616,tcp://localhost:61617)?useTopologyForLoadBalancing=false";
final int CONNECTION_COUNT = 50;
setupCluster();
startServers();
for (int i = 0; i < 5; i++) {
setupSessionFactory(i, isNetty());
}
for (int i = 0; i < 5; i++) {
createQueue(i, ADDRESS, QUEUE, null, false);
}
for (int i = 0; i < 5; i++) {
addConsumer(i, i, QUEUE, null);
}
for (int i = 0; i < 5; i++) {
waitForBindings(i, ADDRESS, 1, 1, true);
}
for (int i = 0; i < 5; i++) {
waitForBindings(i, ADDRESS, 4, 4, false);
}
int[] baseline = new int[5];
for (int i = 0; i < 5; i++) {
baseline[i] = servers[i].getActiveMQServerControl().getConnectionCount();
}
ClientSessionFactory[] clientSessionFactories = new ClientSessionFactory[CONNECTION_COUNT];
ServerLocator locator = ActiveMQClient.createServerLocator(URL);
for (int i = 0; i < CONNECTION_COUNT; i++) {
clientSessionFactories[i] = addSessionFactory(locator.createSessionFactory());
}
/**
* Since we are only using the initial connectors to load-balance then all the connections should be on the first 2 nodes.
* Note: This still uses the load-balancing-policy so this would changed if we used the random one instead of the default
* round-robin one.
*/
assertEquals(CONNECTION_COUNT / 2, (servers[0].getActiveMQServerControl().getConnectionCount() - baseline[0]));
assertEquals(CONNECTION_COUNT / 2, (servers[1].getActiveMQServerControl().getConnectionCount() - baseline[1]));
for (int i = 0; i < CONNECTION_COUNT; i++) {
clientSessionFactories[i].close();
}
locator.setUseTopologyForLoadBalancing(true);
for (int i = 0; i < CONNECTION_COUNT; i++) {
clientSessionFactories[i] = addSessionFactory(locator.createSessionFactory());
}
for (int i = 0; i < 5; i++) {
assertTrue((servers[i].getActiveMQServerControl().getConnectionCount() - baseline[i]) < (CONNECTION_COUNT / 2));
}
}
}