ARTEMIS-3594 - add support for a local target key transformer and an instance of CONSISTENT_HASH_MODULO that can be used to partition in a static cluster

This commit is contained in:
gtully 2021-12-01 15:27:54 +00:00 committed by Gary Tully
parent 3f7f8c0ecd
commit e0b16217a1
27 changed files with 837 additions and 128 deletions

View File

@ -28,7 +28,8 @@ public class BrokerBalancerConfiguration implements Serializable {
private String localTargetFilter = null;
private int cacheTimeout = -1;
private PoolConfiguration poolConfiguration = null;
private PolicyConfiguration policyConfiguration = null;
private NamedPropertyConfiguration policyConfiguration = null;
private NamedPropertyConfiguration transformerConfiguration = null;
public String getName() {
return name;
@ -75,11 +76,11 @@ public class BrokerBalancerConfiguration implements Serializable {
return this;
}
public PolicyConfiguration getPolicyConfiguration() {
public NamedPropertyConfiguration getPolicyConfiguration() {
return policyConfiguration;
}
public BrokerBalancerConfiguration setPolicyConfiguration(PolicyConfiguration policyConfiguration) {
public BrokerBalancerConfiguration setPolicyConfiguration(NamedPropertyConfiguration policyConfiguration) {
this.policyConfiguration = policyConfiguration;
return this;
}
@ -92,4 +93,12 @@ public class BrokerBalancerConfiguration implements Serializable {
this.poolConfiguration = poolConfiguration;
return this;
}
public void setTransformerConfiguration(NamedPropertyConfiguration configuration) {
this.transformerConfiguration = configuration;
}
public NamedPropertyConfiguration getTransformerConfiguration() {
return transformerConfiguration;
}
}

View File

@ -20,7 +20,7 @@ package org.apache.activemq.artemis.core.config.balancing;
import java.io.Serializable;
import java.util.Map;
public class PolicyConfiguration implements Serializable {
public class NamedPropertyConfiguration implements Serializable {
private String name;
private Map<String, String> properties;
@ -29,7 +29,7 @@ public class PolicyConfiguration implements Serializable {
return name;
}
public PolicyConfiguration setName(String name) {
public NamedPropertyConfiguration setName(String name) {
this.name = name;
return this;
}
@ -38,7 +38,7 @@ public class PolicyConfiguration implements Serializable {
return properties;
}
public PolicyConfiguration setProperties(Map<String, String> properties) {
public NamedPropertyConfiguration setProperties(Map<String, String> properties) {
this.properties = properties;
return this;
}

View File

@ -47,7 +47,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
@ -2653,7 +2654,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
brokerBalancerConfiguration.setCacheTimeout(getInteger(e, "cache-timeout",
brokerBalancerConfiguration.getCacheTimeout(), Validators.MINUS_ONE_OR_GE_ZERO));
PolicyConfiguration policyConfiguration = null;
NamedPropertyConfiguration policyConfiguration = null;
PoolConfiguration poolConfiguration = null;
NodeList children = e.getChildNodes();
@ -2661,20 +2662,34 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Node child = children.item(j);
if (child.getNodeName().equals("policy")) {
policyConfiguration = new PolicyConfiguration();
parsePolicyConfiguration((Element)child, policyConfiguration);
policyConfiguration = new NamedPropertyConfiguration();
parsePolicyConfiguration((Element) child, policyConfiguration);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration);
} else if (child.getNodeName().equals("pool")) {
poolConfiguration = new PoolConfiguration();
parsePoolConfiguration((Element) child, config, poolConfiguration);
brokerBalancerConfiguration.setPoolConfiguration(poolConfiguration);
} else if (child.getNodeName().equals("local-target-key-transformer")) {
policyConfiguration = new NamedPropertyConfiguration();
parseTransformerConfiguration((Element) child, policyConfiguration);
brokerBalancerConfiguration.setTransformerConfiguration(policyConfiguration);
}
}
config.getBalancerConfigurations().add(brokerBalancerConfiguration);
}
private void parsePolicyConfiguration(final Element e, final PolicyConfiguration policyConfiguration) throws ClassNotFoundException {
private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
String name = e.getAttribute("name");
TransformerFactoryResolver.getInstance().resolve(name);
policyConfiguration.setName(name);
policyConfiguration.setProperties(getMapOfChildPropertyElements(e));
}
private void parsePolicyConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
String name = e.getAttribute("name");
PolicyFactoryResolver.getInstance().resolve(name);

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
@ -40,7 +41,6 @@ public class BrokerBalancer implements ActiveMQComponent {
public static final String CLIENT_ID_PREFIX = ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX + "balancer.client.";
private final String name;
private final TargetKey targetKey;
@ -55,6 +55,8 @@ public class BrokerBalancer implements ActiveMQComponent {
private final Policy policy;
private final KeyTransformer transformer;
private final Cache<String, TargetResult> cache;
private volatile boolean started = false;
@ -93,11 +95,21 @@ public class BrokerBalancer implements ActiveMQComponent {
}
public BrokerBalancer(final String name, final TargetKey targetKey, final String targetKeyFilter, final Target localTarget, final String localTargetFilter, final Pool pool, final Policy policy, final int cacheTimeout) {
public BrokerBalancer(final String name,
final TargetKey targetKey,
final String targetKeyFilter,
final Target localTarget,
final String localTargetFilter,
final Pool pool,
final Policy policy,
KeyTransformer transformer,
final int cacheTimeout) {
this.name = name;
this.targetKey = targetKey;
this.transformer = transformer;
this.targetKeyResolver = new TargetKeyResolver(targetKey, targetKeyFilter);
this.localTarget = new TargetResult(localTarget);
@ -149,7 +161,7 @@ public class BrokerBalancer implements ActiveMQComponent {
public TargetResult getTarget(String key) {
if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) {
if (this.localTargetFilter != null && this.localTargetFilter.matcher(transform(key)).matches()) {
if (logger.isDebugEnabled()) {
logger.debug("The " + targetKey + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern());
}
@ -201,4 +213,15 @@ public class BrokerBalancer implements ActiveMQComponent {
return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
}
private String transform(String key) {
String result = key;
if (transformer != null) {
result = transformer.transform(key);
if (logger.isDebugEnabled()) {
logger.debug("Key: " + key + ", transformed to " + result);
}
}
return result;
}
}

View File

@ -21,7 +21,7 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -39,6 +39,9 @@ import org.apache.activemq.artemis.core.server.balancing.targets.ActiveMQTargetF
import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactory;
import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.jboss.logging.Logger;
@ -95,13 +98,19 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
}
Policy policy = null;
PolicyConfiguration policyConfiguration = config.getPolicyConfiguration();
NamedPropertyConfiguration policyConfiguration = config.getPolicyConfiguration();
if (policyConfiguration != null) {
policy = deployPolicy(policyConfiguration, pool);
}
KeyTransformer transformer = null;
NamedPropertyConfiguration transformerConfiguration = config.getTransformerConfiguration();
if (transformerConfiguration != null) {
transformer = deployTransformer(transformerConfiguration);
}
BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(),
localTarget, config.getLocalTargetFilter(), pool, policy, config.getCacheTimeout());
localTarget, config.getLocalTargetFilter(), pool, policy, transformer, config.getCacheTimeout());
balancerControllers.put(balancer.getName(), balancer);
@ -160,10 +169,10 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
return pool;
}
private Policy deployPolicy(PolicyConfiguration policyConfig, Pool pool) throws ClassNotFoundException {
private Policy deployPolicy(NamedPropertyConfiguration policyConfig, Pool pool) throws ClassNotFoundException {
PolicyFactory policyFactory = PolicyFactoryResolver.getInstance().resolve(policyConfig.getName());
Policy policy = policyFactory.createPolicy(policyConfig.getName());
Policy policy = policyFactory.create();
policy.init(policyConfig.getProperties());
@ -174,6 +183,16 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
return policy;
}
private KeyTransformer deployTransformer(NamedPropertyConfiguration configuration) throws Exception {
TransformerFactory factory = TransformerFactoryResolver.getInstance().resolve(configuration.getName());
KeyTransformer transformer = factory.create();
transformer.init(configuration.getProperties());
return transformer;
}
public BrokerBalancer getBalancer(String name) {
return balancerControllers.get(name);
}

View File

@ -31,10 +31,6 @@ public class ConsistentHashPolicy extends AbstractPolicy {
super(NAME);
}
protected ConsistentHashPolicy(String name) {
super(name);
}
@Override
public Target selectTarget(List<Target> targets, String key) {
if (targets.size() > 1) {
@ -60,7 +56,7 @@ public class ConsistentHashPolicy extends AbstractPolicy {
return null;
}
private int getHash(String str) {
public static int getHash(String str) {
final int FNV_INIT = 0x811c9dc5;
final int FNV_PRIME = 0x01000193;

View File

@ -1,49 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
public class DefaultPolicyFactory extends PolicyFactory {
private static final Map<String, Supplier<AbstractPolicy>> supportedPolicies = new HashMap<>();
static {
supportedPolicies.put(ConsistentHashPolicy.NAME, () -> new ConsistentHashPolicy());
supportedPolicies.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy());
supportedPolicies.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy());
supportedPolicies.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy());
}
@Override
public String[] getSupportedPolicies() {
return supportedPolicies.keySet().toArray(new String[supportedPolicies.size()]);
}
@Override
public AbstractPolicy createPolicy(String policyName) {
Supplier<AbstractPolicy> policySupplier = supportedPolicies.get(policyName);
if (policySupplier == null) {
throw new IllegalArgumentException("Policy not supported: " + policyName);
}
return policySupplier.get();
}
}

View File

@ -17,8 +17,6 @@
package org.apache.activemq.artemis.core.server.balancing.policies;
public abstract class PolicyFactory {
public abstract String[] getSupportedPolicies();
public abstract Policy createPolicy(String policyName);
public interface PolicyFactory {
Policy create();
}

View File

@ -36,7 +36,10 @@ public class PolicyFactoryResolver {
private final Map<String, PolicyFactory> policyFactories = new HashMap<>();
private PolicyFactoryResolver() {
registerPolicyFactory(new DefaultPolicyFactory());
policyFactories.put(ConsistentHashPolicy.NAME, () -> new ConsistentHashPolicy());
policyFactories.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy());
policyFactories.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy());
policyFactories.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy());
loadPolicyFactories();
}
@ -56,19 +59,15 @@ public class PolicyFactoryResolver {
PolicyFactory.class, BrokerBalancer.class.getClassLoader());
for (PolicyFactory policyFactory : serviceLoader) {
registerPolicyFactory(policyFactory);
policyFactories.put(keyFromClassName(policyFactory.getClass().getName()), policyFactory);
}
}
public void registerPolicyFactory(PolicyFactory policyFactory) {
for (String policyName : policyFactory.getSupportedPolicies()) {
policyFactories.put(policyName, policyFactory);
}
public void registerPolicyFactory(String name, PolicyFactory policyFactory) {
policyFactories.put(name, policyFactory);
}
public void unregisterPolicyFactory(PolicyFactory policyFactory) {
for (String policyName : policyFactory.getSupportedPolicies()) {
policyFactories.remove(policyName, policyFactory);
}
String keyFromClassName(String name) {
return name.substring(0, name.indexOf("PolicyFactory"));
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
import java.util.Map;
import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
public class ConsistentHashModulo implements KeyTransformer {
public static final String NAME = "CONSISTENT_HASH_MODULO";
public static final String MODULO = "modulo";
int modulo = 0;
@Override
public String transform(String str) {
if (TargetKeyResolver.DEFAULT_KEY_VALUE.equals(str)) {
// we only want to transform resolved keys
return str;
}
if (modulo == 0) {
return str;
}
int hash = ConsistentHashPolicy.getHash(str);
return String.valueOf( hash % modulo );
}
@Override
public void init(Map<String, String> properties) {
modulo = Integer.parseInt(properties.get(MODULO));
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
import java.util.Map;
public interface KeyTransformer {
default void init(Map<String, String> properties) {
}
default String transform(String key) {
return key;
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
public interface TransformerFactory {
KeyTransformer create();
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
public class TransformerFactoryResolver {
private static TransformerFactoryResolver instance;
public static TransformerFactoryResolver getInstance() {
if (instance == null) {
instance = new TransformerFactoryResolver();
}
return instance;
}
private final Map<String, TransformerFactory> factories = new HashMap<>();
private TransformerFactoryResolver() {
factories.put(ConsistentHashModulo.NAME, () -> new ConsistentHashModulo());
loadFactories(); // let service loader override
}
public TransformerFactory resolve(String policyName) throws ClassNotFoundException {
TransformerFactory factory = factories.get(policyName);
if (factory == null) {
throw new ClassNotFoundException("No TransformerFactory found for " + policyName);
}
return factory;
}
private void loadFactories() {
ServiceLoader<TransformerFactory> serviceLoader = ServiceLoader.load(
TransformerFactory.class, BrokerBalancer.class.getClassLoader());
for (TransformerFactory factory : serviceLoader) {
factories.put(keyFromClassName(factory.getClass().getName()), factory);
}
}
String keyFromClassName(String name) {
return name.substring(0, name.indexOf("TransformerFactory"));
}
}

View File

@ -2136,6 +2136,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="local-target-key-transformer" type="brokerBalancerKeyTransformerType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the local target key transformer configuration
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required">
<xsd:annotation>
@ -2176,6 +2183,26 @@
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
<xsd:complexType name="brokerBalancerKeyTransformerType">
<xsd:sequence>
<xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
properties to configure a key transformer
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:ID" use="required">
<xsd:annotation>
<xsd:documentation>
the name of the policy
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
<xsd:complexType name="brokerBalancerPoolType">
<xsd:sequence maxOccurs="unbounded">
<xsd:element name="username" type="xsd:string" maxOccurs="1" minOccurs="0">

View File

@ -76,6 +76,8 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo.MODULO;
public class FileConfigurationTest extends ConfigurationImplTest {
@BeforeClass
@ -265,13 +267,20 @@ public class FileConfigurationTest extends ConfigurationImplTest {
}
}
Assert.assertEquals(4, conf.getBalancerConfigurations().size());
Assert.assertEquals(5, conf.getBalancerConfigurations().size());
for (BrokerBalancerConfiguration bc : conf.getBalancerConfigurations()) {
if (bc.getName().equals("simple-local")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
Assert.assertNotNull(bc.getLocalTargetFilter());
Assert.assertNotNull(bc.getTargetKeyFilter());
Assert.assertNull(bc.getPolicyConfiguration());
} else if (bc.getName().equals("simple-local-with-transformer")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
Assert.assertNotNull(bc.getLocalTargetFilter());
Assert.assertNotNull(bc.getTargetKeyFilter());
Assert.assertNull(bc.getPolicyConfiguration());
Assert.assertNotNull(bc.getTransformerConfiguration());
Assert.assertNotNull(bc.getTransformerConfiguration().getProperties().get(MODULO));
} else if (bc.getName().equals("simple-balancer")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.USER_NAME);
Assert.assertNull(bc.getLocalTargetFilter());

View File

@ -17,12 +17,16 @@
package org.apache.activemq.artemis.core.server.balancing;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.junit.After;
import org.junit.Before;
@ -62,7 +66,7 @@ public class BrokerBalancerManagerTest {
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-pool");
PolicyConfiguration policyConfig = new PolicyConfiguration();
NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration();
policyConfig.setName(ConsistentHashPolicy.NAME);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfig);
@ -82,6 +86,25 @@ public class BrokerBalancerManagerTest {
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-pool");
underTest.deployBrokerBalancer(brokerBalancerConfiguration);
}
@Test()
public void deployLocalOnlyWithPolicy() throws Exception {
ManagementService mockManagementService = Mockito.mock(ManagementService.class);
Mockito.when(mockServer.getManagementService()).thenReturn(mockManagementService);
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-consistent-hash").setTargetKey(TargetKey.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration();
policyConfig.setName(ConsistentHashModulo.NAME);
HashMap<String, String> properties = new HashMap<>();
properties.put(ConsistentHashModulo.MODULO, String.valueOf(2));
policyConfig.setProperties(properties);
brokerBalancerConfiguration.setTransformerConfiguration(policyConfig);
underTest.deployBrokerBalancer(brokerBalancerConfiguration);
}
}

View File

@ -18,7 +18,6 @@
package org.apache.activemq.artemis.core.server.balancing;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -29,7 +28,7 @@ import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.junit.After;
import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -44,38 +43,34 @@ public class BrokerBalancerTest {
@Before
public void setUp() {
ActiveMQServer mockServer = mock(ActiveMQServer.class);
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
localTarget = new LocalTarget(null, mockServer);
Pool pool = null;
Policy policy = null;
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
localTarget, "^FOO.*", pool, policy, 0);
try {
underTest.start();
} catch (Exception e) {
fail(e.getMessage());
}
}
@After
public void after() {
if (underTest != null) {
try {
underTest.stop();
} catch (Exception e) {
fail(e.getMessage());
}
}
}
@Test
public void getTarget() {
Pool pool = null;
Policy policy = null;
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
localTarget, "^FOO.*", pool, policy, null, 0);
assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
}
@Test
public void getLocalTargetWithTransformer() throws Exception {
Pool pool = null;
Policy policy = null;
KeyTransformer keyTransformer = new KeyTransformer() {
@Override
public String transform(String key) {
return key.substring("TRANSFORM_TO".length() + 1);
}
};
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
localTarget, "^FOO.*", pool, policy, keyTransformer, 0);
assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class PolicyFactoryResolverTest {
@Test
public void resolveOk() throws Exception {
PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
assertNotNull(instance.resolve(ConsistentHashPolicy.NAME));
}
@Test(expected = ClassNotFoundException.class)
public void resolveError() throws Exception {
PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
assertNotNull(instance.resolve("NOT PRESENT"));
}
@Test
public void keyFromName() throws Exception {
PolicyFactoryResolver instance = PolicyFactoryResolver.getInstance();
assertEquals("New", instance.keyFromClassName("NewPolicyFactory"));
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
import java.util.HashMap;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class ConsistentHashModuloTest {
@Test
public void transform() {
ConsistentHashModulo underTest = new ConsistentHashModulo();
assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, underTest.transform(TargetKeyResolver.DEFAULT_KEY_VALUE));
assertEquals("AA", underTest.transform("AA")); // default modulo 0 does nothing
HashMap<String, String> properties = new HashMap<>();
final int modulo = 2;
properties.put(ConsistentHashModulo.MODULO, String.valueOf(modulo));
underTest.init(properties);
String hash1 = underTest.transform("AAA");
int v1 = Integer.parseInt(hash1);
String hash2 = underTest.transform("BBB");
int v2 = Integer.parseInt(hash2);
assertNotEquals(hash1, hash2);
assertNotEquals(v1, v2);
assertTrue(v1 < modulo && v2 < modulo);
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TransformerFactoryResolverTest {
@Test
public void resolveOk() throws Exception {
TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
assertNotNull(instance.resolve(ConsistentHashModulo.NAME));
}
@Test(expected = ClassNotFoundException.class)
public void resolveError() throws Exception {
TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
assertNotNull(instance.resolve("NOT PRESENT"));
}
@Test
public void keyFromName() throws Exception {
TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
assertEquals("New", instance.keyFromClassName("NewTransformerFactory"));
}
}

View File

@ -1964,7 +1964,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
if (bindingCount == expectedBindingCount && totConsumers == expectedConsumerCount) {
if (bindingCount == expectedBindingCount && (expectedConsumerCount == -1 || totConsumers == expectedConsumerCount)) {
return true;
}

View File

@ -158,6 +158,14 @@
<target-key-filter>^[^.]+</target-key-filter>
<local-target-filter>DEFAULT</local-target-filter>
</broker-balancer>
<broker-balancer name="simple-local-with-transformer">
<target-key>CLIENT_ID</target-key>
<target-key-filter>^[^.]+</target-key-filter>
<local-target-filter>DEFAULT</local-target-filter>
<local-target-key-transformer name="CONSISTENT_HASH_MODULO">
<property key="modulo" value="2"></property>
</local-target-key-transformer>
</broker-balancer>
<broker-balancer name="simple-balancer">
<target-key>USER_NAME</target-key>
<policy name="FIRST_ELEMENT"/>

View File

@ -149,6 +149,14 @@
<target-key-filter>^[^.]+</target-key-filter>
<local-target-filter>DEFAULT</local-target-filter>
</broker-balancer>
<broker-balancer name="simple-local-with-transformer">
<target-key>CLIENT_ID</target-key>
<target-key-filter>^[^.]+</target-key-filter>
<local-target-filter>DEFAULT</local-target-filter>
<local-target-key-transformer name="CONSISTENT_HASH_MODULO">
<property key="modulo" value="2"></property>
</local-target-key-transformer>
</broker-balancer>
<broker-balancer name="simple-balancer">
<target-key>USER_NAME</target-key>
<policy name="FIRST_ELEMENT"/>

View File

@ -106,12 +106,19 @@ So a broker balancer with the cache enabled doesn't strictly follow the configur
By default, the cache is enabled and will never timeout. See below
for more details about setting the `cache-timeout` parameter.
## Key transformers
A `local-target-key-transformer` allows target key transformation before matching against any local-target-filter. One use case is
CLIENT_ID sharding across a cluster of N brokers. With a consistent hash % N transformation, each client id
can map exclusively to just one of the brokers. The included transformers are:
* `CONSISTENT_HASH_MODULO` that takes a single `modulo` property to configure the bound.
## Defining broker balancers
A broker balancer is defined by the `broker-balancer` element, it includes the following items:
* the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor;
* the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details;
* the `target-key-filter` element defines a regular expression to filter the resolved keys;
* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
* the `local-target-key-transformer` element defines a key transformer, see [key transformers](#key-transformers);
* the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration;
* the `pool` element defines the pool to group the target brokers, see [pools](#pools).
* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies);

View File

@ -0,0 +1,325 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.balancing;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class AutoClientIDShardClusterTest extends BalancingTestBase {
@Parameterized.Parameters(name = "protocol: {0}")
public static Collection<Object[]> data() {
final String[] protocols = new String[] {AMQP_PROTOCOL, CORE_PROTOCOL, OPENWIRE_PROTOCOL};
Collection<Object[]> data = new ArrayList<>();
for (String protocol : protocols) {
data.add(new Object[] {protocol});
}
return data;
}
private final String protocol;
final int numMessages = 50;
AtomicInteger toSend = new AtomicInteger(numMessages);
public AutoClientIDShardClusterTest(String protocol) {
this.protocol = protocol;
}
protected void setupServers() throws Exception {
for (int i = 0; i < 2; i++) {
setupLiveServer(i, true, HAType.SharedNothingReplication, true, false);
servers[i].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
servers[i].addProtocolManagerFactory(new OpenWireProtocolManagerFactory());
}
setupClusterConnection("cluster0", name.getMethodName(), MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
setupClusterConnection("cluster1", name.getMethodName(), MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
toSend.set(numMessages);
}
Runnable producer = new Runnable() {
final AtomicInteger producerSeq = new AtomicInteger();
@Override
public void run() {
while (toSend.get() > 0) {
try {
ConnectionFactory connectionFactory = createFactory(protocol, "producer", "admin", "admin");
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
javax.jms.Topic topic = session.createTopic(name.getMethodName());
try (MessageProducer producer = session.createProducer(topic)) {
for (int i = 0; i < 10 && toSend.get() > 0; i++) {
Message message = session.createTextMessage();
message.setIntProperty("SEQ", producerSeq.get() + 1);
producer.send(message);
producerSeq.incrementAndGet();
toSend.decrementAndGet();
}
TimeUnit.MILLISECONDS.sleep(100);
}
}
}
} catch (Exception ok) {
}
}
}
};
class DurableSub implements Runnable {
final String id;
int receivedInOrder = -1;
int lastReceived;
int maxReceived;
AtomicBoolean consumerDone = new AtomicBoolean();
AtomicBoolean orderShot = new AtomicBoolean();
CountDownLatch registered = new CountDownLatch(1);
DurableSub(String id) {
this.id = id;
}
@Override
public void run() {
while (!consumerDone.get()) {
try {
ConnectionFactory connectionFactory = createFactory(protocol, "ClientId-" + id, "admin", "admin");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
javax.jms.Topic topic = session.createTopic(name.getMethodName());
try (TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "Sub-" + id)) {
registered.countDown();
for (int i = 0; i < 5; i++) {
Message message = durableSubscriber.receive(500);
if (message != null) {
lastReceived = message.getIntProperty("SEQ");
if (lastReceived > maxReceived) {
maxReceived = lastReceived;
}
if (receivedInOrder < 0) {
receivedInOrder = lastReceived;
} else if (receivedInOrder == lastReceived - 1) {
receivedInOrder++;
} else {
if (!orderShot.get()) {
System.err.println("Sub: " + id + ", received: out of order " + lastReceived + ", last in order: " + receivedInOrder);
}
orderShot.set(true);
}
} else {
// no point trying again if there is nothing for us now.
break;
}
}
TimeUnit.MILLISECONDS.sleep(500);
}
}
} finally {
if (connection != null) {
connection.close(); // seems openwire not jms2.0 auto closable always
}
}
} catch (Exception ok) {
}
}
}
}
@Ignore("not totally reliable, but does show the root cause of the problem being solved")
public void testWithoutOutSharding() throws Exception {
setupServers();
startServers(0, 1);
// two bouncy durable consumers
DurableSub sub0 = new DurableSub("0");
DurableSub sub1 = new DurableSub("1");
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
executorService.submit(sub0);
executorService.submit(sub1);
// waiting for registration before production to give bridges a chance
assertTrue(sub0.registered.await(20, TimeUnit.SECONDS));
assertTrue(sub1.registered.await(20, TimeUnit.SECONDS));
assertTrue(waitForBindings(servers[0], name.getMethodName(), true, 2, -1, 10000));
assertTrue(waitForBindings(servers[1], name.getMethodName(), true, 2, -1, 10000));
// wait for remote bindings!
assertTrue(waitForBindings(servers[0], name.getMethodName(), false, 2, -1, 10000));
assertTrue(waitForBindings(servers[1], name.getMethodName(), false, 2, -1, 10000));
// produce a few every second with failover randomize=true so we produce on all nodes
executorService.submit(producer);
assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0));
assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived == numMessages));
assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived == numMessages));
// with bouncing, one 'may' be out of order, hence ignored
assertTrue(sub0.orderShot.get() || sub1.orderShot.get());
} finally {
sub0.consumerDone.set(true);
sub1.consumerDone.set(true);
executorService.shutdown();
stopServers(0, 1);
}
}
@Test
public void testWithConsistentHashClientIDModTwo() throws Exception {
setupServers();
addBalancerWithClientIdConsistentHashMod();
startServers(0, 1);
// two bouncy durable consumers
DurableSub sub0 = new DurableSub("0");
DurableSub sub1 = new DurableSub("1");
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
executorService.submit(sub0);
executorService.submit(sub1);
// waiting for registration before production to give bridges a chance
assertTrue(sub0.registered.await(5, TimeUnit.SECONDS));
assertTrue(sub1.registered.await(5, TimeUnit.SECONDS));
assertTrue(waitForBindings(servers[0], name.getMethodName(), true, 1, 1, 2000));
assertTrue(waitForBindings(servers[1], name.getMethodName(), true, 1, 1, 2000));
// wait for remote bindings!
assertTrue(waitForBindings(servers[0], name.getMethodName(), false, 1, 1, 10000));
assertTrue(waitForBindings(servers[1], name.getMethodName(), false, 1, 1, 10000));
// produce a few every second with failover randomize=true so we produce on all nodes
executorService.submit(producer);
assertTrue("All sent", Wait.waitFor(() -> toSend.get() == 0));
assertTrue("All received sub0", Wait.waitFor(() -> sub0.maxReceived == numMessages));
assertTrue("All received sub1", Wait.waitFor(() -> sub1.maxReceived == numMessages));
// with partition, none will be out of order
assertFalse(sub0.orderShot.get() && sub1.orderShot.get());
} finally {
sub0.consumerDone.set(true);
sub1.consumerDone.set(true);
executorService.shutdown();
stopServers(0, 1);
}
}
private void addBalancerWithClientIdConsistentHashMod() {
final int numberOfNodes = 2;
for (int node = 0; node < numberOfNodes; node++) {
Configuration configuration = servers[node].getConfiguration();
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration().setName(BROKER_BALANCER_NAME);
brokerBalancerConfiguration.setTargetKey(TargetKey.CLIENT_ID).setLocalTargetFilter(TargetKeyResolver.DEFAULT_KEY_VALUE + "|" + node);
NamedPropertyConfiguration transformerConfig = new NamedPropertyConfiguration();
transformerConfig.setName(ConsistentHashModulo.NAME);
HashMap<String, String> properties = new HashMap<>();
properties.put(ConsistentHashModulo.MODULO, String.valueOf(numberOfNodes));
transformerConfig.setProperties(properties);
brokerBalancerConfiguration.setTransformerConfiguration(transformerConfig);
configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
TransportConfiguration acceptor = getDefaultServerAcceptor(node);
acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME);
}
}
protected ConnectionFactory createFactory(String protocol, String clientID, String user, String password) throws Exception {
StringBuilder urlBuilder = new StringBuilder();
switch (protocol) {
case CORE_PROTOCOL: {
urlBuilder.append("(tcp://localhost:61616,tcp://localhost:61617)?connectionLoadBalancingPolicyClassName=org.apache.activemq.artemis.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy");
urlBuilder.append("&clientID=");
urlBuilder.append(clientID);
return new ActiveMQConnectionFactory(urlBuilder.toString(), user, password);
}
case AMQP_PROTOCOL: {
urlBuilder.append("failover:(amqp://localhost:61616,amqp://localhost:61617)?failover.randomize=true");
urlBuilder.append("&jms.clientID=");
urlBuilder.append(clientID);
return new JmsConnectionFactory(user, password, urlBuilder.toString());
}
case OPENWIRE_PROTOCOL: {
urlBuilder.append("failover:(tcp://localhost:61616,tcp://localhost:61617)?randomize=true&maxReconnectAttempts=0&startupMaxReconnectAttempts=0");
urlBuilder.append("&jms.clientID=");
urlBuilder.append(clientID);
return new org.apache.activemq.ActiveMQConnectionFactory(user, password, urlBuilder.toString());
}
default:
throw new IllegalStateException("Unexpected value: " + protocol);
}
}
}

View File

@ -27,7 +27,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
@ -88,7 +88,7 @@ public class BalancingTestBase extends ClusterTestBase {
brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
.setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
.setLocalTargetEnabled(localTargetEnabled).setClusterConnection(clusterConnection))
.setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
.setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
@ -105,7 +105,7 @@ public class BalancingTestBase extends ClusterTestBase {
brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
.setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
.setLocalTargetEnabled(localTargetEnabled).setDiscoveryGroupName("dg1"))
.setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
.setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
@ -129,7 +129,7 @@ public class BalancingTestBase extends ClusterTestBase {
brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter)
.setPoolConfiguration(new PoolConfiguration().setCheckPeriod(1000).setQuorumSize(quorumSize)
.setLocalTargetEnabled(localTargetEnabled).setStaticConnectors(staticConnectors))
.setPolicyConfiguration(new PolicyConfiguration().setName(policyName).setProperties(properties));
.setPolicyConfiguration(new NamedPropertyConfiguration().setName(policyName).setProperties(properties));
configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
@ -215,7 +215,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries);
urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries + "&failover.randomize=true");
if (clientID != null) {
urlBuilder.append("&jms.clientID=");

View File

@ -88,15 +88,10 @@ public class TargetKeyTest extends BalancingTestBase {
@Before
public void setup() throws Exception {
PolicyFactoryResolver.getInstance().registerPolicyFactory(
PolicyFactoryResolver.getInstance().registerPolicyFactory(MOCK_POLICY_NAME,
new PolicyFactory() {
@Override
public String[] getSupportedPolicies() {
return new String[] {MOCK_POLICY_NAME};
}
@Override
public Policy createPolicy(String policyName) {
public Policy create() {
return new FirstElementPolicy(MOCK_POLICY_NAME) {
@Override
public Target selectTarget(List<Target> targets, String key) {