ARTEMIS-3708 Collapse key transformer into policy
This commit is contained in:
parent
2a26e46a8c
commit
603462a1a5
|
@ -29,7 +29,6 @@ public class ConnectionRouterConfiguration implements Serializable {
|
|||
private CacheConfiguration cacheConfiguration = null;
|
||||
private PoolConfiguration poolConfiguration = null;
|
||||
private NamedPropertyConfiguration policyConfiguration = null;
|
||||
private NamedPropertyConfiguration transformerConfiguration = null;
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
|
@ -93,12 +92,4 @@ public class ConnectionRouterConfiguration implements Serializable {
|
|||
this.poolConfiguration = poolConfiguration;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setTransformerConfiguration(NamedPropertyConfiguration configuration) {
|
||||
this.transformerConfiguration = configuration;
|
||||
}
|
||||
|
||||
public NamedPropertyConfiguration getTransformerConfiguration() {
|
||||
return transformerConfiguration;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,7 +94,6 @@ import org.apache.activemq.artemis.core.server.JournalType;
|
|||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactoryResolver;
|
||||
import org.apache.activemq.artemis.core.server.routing.KeyType;
|
||||
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactoryResolver;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
|
||||
|
@ -2686,10 +2685,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
poolConfiguration = new PoolConfiguration();
|
||||
parsePoolConfiguration((Element) child, config, poolConfiguration);
|
||||
connectionRouterConfiguration.setPoolConfiguration(poolConfiguration);
|
||||
} else if (child.getNodeName().equals("local-target-key-transformer")) {
|
||||
policyConfiguration = new NamedPropertyConfiguration();
|
||||
parseTransformerConfiguration((Element) child, policyConfiguration);
|
||||
connectionRouterConfiguration.setTransformerConfiguration(policyConfiguration);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2704,16 +2699,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
cacheConfiguration.getTimeout(), Validators.GE_ZERO));
|
||||
}
|
||||
|
||||
private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
|
||||
String name = e.getAttribute("name");
|
||||
|
||||
TransformerFactoryResolver.getInstance().resolve(name);
|
||||
|
||||
policyConfiguration.setName(name);
|
||||
|
||||
policyConfiguration.setProperties(getMapOfChildPropertyElements(e));
|
||||
}
|
||||
|
||||
private void parsePolicyConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
|
||||
String name = e.getAttribute("name");
|
||||
|
||||
|
|
|
@ -119,12 +119,12 @@ public class ConnectionRouterControlImpl extends AbstractControl implements Conn
|
|||
|
||||
@Override
|
||||
public void setTargetKeyFilter(String regExp) {
|
||||
connectionRouter.getTargetKeyResolver().setKeyFilter(regExp);
|
||||
connectionRouter.getKeyResolver().setKeyFilter(regExp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTargetKeyFilter() {
|
||||
return connectionRouter.getTargetKeyResolver().getKeyFilter();
|
||||
return connectionRouter.getKeyResolver().getKeyFilter();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.activemq.artemis.core.server.routing.policies.Policy;
|
|||
import org.apache.activemq.artemis.core.server.routing.pools.Pool;
|
||||
import org.apache.activemq.artemis.core.server.routing.targets.Target;
|
||||
import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
|
||||
import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
|
@ -51,8 +50,6 @@ public class ConnectionRouter implements ActiveMQComponent {
|
|||
|
||||
private final Policy policy;
|
||||
|
||||
private final KeyTransformer transformer;
|
||||
|
||||
private final Cache cache;
|
||||
|
||||
private volatile boolean started = false;
|
||||
|
@ -61,10 +58,14 @@ public class ConnectionRouter implements ActiveMQComponent {
|
|||
return name;
|
||||
}
|
||||
|
||||
public KeyType getTargetKey() {
|
||||
public KeyType getKey() {
|
||||
return keyType;
|
||||
}
|
||||
|
||||
public KeyResolver getKeyResolver() {
|
||||
return keyResolver;
|
||||
}
|
||||
|
||||
public Target getLocalTarget() {
|
||||
return localTarget.getTarget();
|
||||
}
|
||||
|
@ -73,6 +74,14 @@ public class ConnectionRouter implements ActiveMQComponent {
|
|||
return localTargetFilter != null ? localTargetFilter.pattern() : null;
|
||||
}
|
||||
|
||||
public void setLocalTargetFilter(String regExp) {
|
||||
if (regExp == null || regExp.trim().isEmpty()) {
|
||||
this.localTargetFilter = null;
|
||||
} else {
|
||||
this.localTargetFilter = Pattern.compile(regExp);
|
||||
}
|
||||
}
|
||||
|
||||
public Pool getPool() {
|
||||
return pool;
|
||||
}
|
||||
|
@ -98,14 +107,11 @@ public class ConnectionRouter implements ActiveMQComponent {
|
|||
final String localTargetFilter,
|
||||
final Cache cache,
|
||||
final Pool pool,
|
||||
final Policy policy,
|
||||
KeyTransformer transformer) {
|
||||
final Policy policy) {
|
||||
this.name = name;
|
||||
|
||||
this.keyType = keyType;
|
||||
|
||||
this.transformer = transformer;
|
||||
|
||||
this.keyResolver = new KeyResolver(keyType, targetKeyFilter);
|
||||
|
||||
this.localTarget = new TargetResult(localTarget);
|
||||
|
@ -158,8 +164,11 @@ public class ConnectionRouter implements ActiveMQComponent {
|
|||
}
|
||||
|
||||
public TargetResult getTarget(String key) {
|
||||
if (policy != null && !KeyResolver.NULL_KEY_VALUE.equals(key)) {
|
||||
key = policy.transformKey(key);
|
||||
}
|
||||
|
||||
if (this.localTargetFilter != null && this.localTargetFilter.matcher(transform(key)).matches()) {
|
||||
if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("The " + keyType + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern());
|
||||
}
|
||||
|
@ -216,27 +225,4 @@ public class ConnectionRouter implements ActiveMQComponent {
|
|||
|
||||
return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
|
||||
}
|
||||
|
||||
public void setLocalTargetFilter(String regExp) {
|
||||
if (regExp == null || regExp.trim().isEmpty()) {
|
||||
this.localTargetFilter = null;
|
||||
} else {
|
||||
this.localTargetFilter = Pattern.compile(regExp);
|
||||
}
|
||||
}
|
||||
|
||||
public KeyResolver getTargetKeyResolver() {
|
||||
return keyResolver;
|
||||
}
|
||||
|
||||
private String transform(String key) {
|
||||
String result = key;
|
||||
if (transformer != null) {
|
||||
result = transformer.transform(key);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Key: " + key + ", transformed to " + result);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,9 +42,6 @@ import org.apache.activemq.artemis.core.server.routing.targets.ActiveMQTargetFac
|
|||
import org.apache.activemq.artemis.core.server.routing.targets.LocalTarget;
|
||||
import org.apache.activemq.artemis.core.server.routing.targets.Target;
|
||||
import org.apache.activemq.artemis.core.server.routing.targets.TargetFactory;
|
||||
import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer;
|
||||
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactory;
|
||||
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactoryResolver;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
|
@ -115,14 +112,8 @@ public final class ConnectionRouterManager implements ActiveMQComponent {
|
|||
policy = deployPolicy(policyConfiguration, pool);
|
||||
}
|
||||
|
||||
KeyTransformer transformer = null;
|
||||
NamedPropertyConfiguration transformerConfiguration = config.getTransformerConfiguration();
|
||||
if (transformerConfiguration != null) {
|
||||
transformer = deployTransformer(transformerConfiguration);
|
||||
}
|
||||
|
||||
ConnectionRouter connectionRouter = new ConnectionRouter(config.getName(), config.getKeyType(),
|
||||
config.getKeyFilter(), localTarget, config.getLocalTargetFilter(), cache, pool, policy, transformer);
|
||||
config.getKeyFilter(), localTarget, config.getLocalTargetFilter(), cache, pool, policy);
|
||||
|
||||
connectionRouters.put(connectionRouter.getName(), connectionRouter);
|
||||
|
||||
|
@ -202,16 +193,6 @@ public final class ConnectionRouterManager implements ActiveMQComponent {
|
|||
return policy;
|
||||
}
|
||||
|
||||
private KeyTransformer deployTransformer(NamedPropertyConfiguration configuration) throws Exception {
|
||||
TransformerFactory factory = TransformerFactoryResolver.getInstance().resolve(configuration.getName());
|
||||
|
||||
KeyTransformer transformer = factory.create();
|
||||
|
||||
transformer.init(configuration.getProperties());
|
||||
|
||||
return transformer;
|
||||
}
|
||||
|
||||
public ConnectionRouter getRouter(String name) {
|
||||
return connectionRouters.get(name);
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
public class KeyResolver {
|
||||
public static final String DEFAULT_KEY_VALUE = "DEFAULT";
|
||||
public static final String NULL_KEY_VALUE = "NULL";
|
||||
|
||||
|
||||
private static final Logger logger = Logger.getLogger(KeyResolver.class);
|
||||
|
@ -120,7 +120,7 @@ public class KeyResolver {
|
|||
}
|
||||
|
||||
if (keyValue == null) {
|
||||
keyValue = DEFAULT_KEY_VALUE;
|
||||
keyValue = NULL_KEY_VALUE;
|
||||
} else if (keyFilter != null) {
|
||||
Matcher keyMatcher = keyFilter.matcher(keyValue);
|
||||
|
||||
|
@ -128,12 +128,17 @@ public class KeyResolver {
|
|||
keyValue = keyMatcher.group();
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debugf("keyValue with filter %s: %s", keyFilter, keyValue);
|
||||
logger.debugf("keyValue for %s matches filter %s: %s", key, keyFilter, keyValue);
|
||||
}
|
||||
} else {
|
||||
keyValue = NULL_KEY_VALUE;
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debugf("keyValue for %s doesn't matches filter %s", key, keyFilter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return keyValue;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,33 +15,40 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.routing.transformer;
|
||||
package org.apache.activemq.artemis.core.server.routing.policies;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.routing.targets.Target;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy;
|
||||
import org.apache.activemq.artemis.core.server.routing.KeyResolver;
|
||||
|
||||
public class ConsistentHashModulo implements KeyTransformer {
|
||||
public class ConsistentHashModuloPolicy extends ConsistentHashPolicy {
|
||||
public static final String NAME = "CONSISTENT_HASH_MODULO";
|
||||
public static final String MODULO = "modulo";
|
||||
|
||||
public static final String MODULO = "MODULO";
|
||||
|
||||
int modulo = 0;
|
||||
|
||||
@Override
|
||||
public String transform(String str) {
|
||||
if (KeyResolver.DEFAULT_KEY_VALUE.equals(str)) {
|
||||
// we only want to transform resolved keys
|
||||
return str;
|
||||
}
|
||||
if (modulo == 0) {
|
||||
return str;
|
||||
}
|
||||
int hash = ConsistentHashPolicy.getHash(str);
|
||||
return String.valueOf( hash % modulo );
|
||||
public ConsistentHashModuloPolicy() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Map<String, String> properties) {
|
||||
modulo = Integer.parseInt(properties.get(MODULO));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String transformKey(String key) {
|
||||
if (modulo > 0) {
|
||||
return String.valueOf(getHash(key) % modulo);
|
||||
}
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Target selectTarget(List<Target> targets, String key) {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -31,6 +31,10 @@ public class ConsistentHashPolicy extends AbstractPolicy {
|
|||
super(NAME);
|
||||
}
|
||||
|
||||
protected ConsistentHashPolicy(String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Target selectTarget(List<Target> targets, String key) {
|
||||
if (targets.size() > 1) {
|
||||
|
@ -56,7 +60,7 @@ public class ConsistentHashPolicy extends AbstractPolicy {
|
|||
return null;
|
||||
}
|
||||
|
||||
public static int getHash(String str) {
|
||||
protected int getHash(String str) {
|
||||
final int FNV_INIT = 0x811c9dc5;
|
||||
final int FNV_PRIME = 0x01000193;
|
||||
|
||||
|
|
|
@ -32,5 +32,11 @@ public interface Policy {
|
|||
|
||||
void init(Map<String, String> properties);
|
||||
|
||||
Target selectTarget(List<Target> targets, String key);
|
||||
default String transformKey(String key) {
|
||||
return key;
|
||||
}
|
||||
|
||||
default Target selectTarget(List<Target> targets, String key) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ public class PolicyFactoryResolver {
|
|||
policyFactories.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy());
|
||||
policyFactories.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy());
|
||||
policyFactories.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy());
|
||||
policyFactories.put(ConsistentHashModuloPolicy.NAME, () -> new ConsistentHashModuloPolicy());
|
||||
|
||||
loadPolicyFactories();
|
||||
}
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.routing.transformer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public interface KeyTransformer {
|
||||
|
||||
default void init(Map<String, String> properties) {
|
||||
}
|
||||
|
||||
default String transform(String key) {
|
||||
return key;
|
||||
}
|
||||
}
|
|
@ -1,22 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.routing.transformer;
|
||||
|
||||
public interface TransformerFactory {
|
||||
KeyTransformer create();
|
||||
}
|
|
@ -1,62 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.routing.transformer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.ServiceLoader;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
|
||||
|
||||
public class TransformerFactoryResolver {
|
||||
private static TransformerFactoryResolver instance;
|
||||
|
||||
public static TransformerFactoryResolver getInstance() {
|
||||
if (instance == null) {
|
||||
instance = new TransformerFactoryResolver();
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
private final Map<String, TransformerFactory> factories = new HashMap<>();
|
||||
|
||||
private TransformerFactoryResolver() {
|
||||
factories.put(ConsistentHashModulo.NAME, () -> new ConsistentHashModulo());
|
||||
loadFactories(); // let service loader override
|
||||
}
|
||||
|
||||
public TransformerFactory resolve(String policyName) throws ClassNotFoundException {
|
||||
TransformerFactory factory = factories.get(policyName);
|
||||
if (factory == null) {
|
||||
throw new ClassNotFoundException("No TransformerFactory found for " + policyName);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
private void loadFactories() {
|
||||
ServiceLoader<TransformerFactory> serviceLoader = ServiceLoader.load(
|
||||
TransformerFactory.class, ConnectionRouter.class.getClassLoader());
|
||||
for (TransformerFactory factory : serviceLoader) {
|
||||
factories.put(keyFromClassName(factory.getClass().getName()), factory);
|
||||
}
|
||||
}
|
||||
|
||||
String keyFromClassName(String name) {
|
||||
return name.substring(0, name.indexOf("TransformerFactory"));
|
||||
}
|
||||
}
|
|
@ -2163,10 +2163,10 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="local-target-key-transformer" type="connectionRouterKeyTransformerType" maxOccurs="1" minOccurs="0">
|
||||
<xsd:element name="key-transformer" type="connectionRouterKeyTransformerType" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
the local target key transformer configuration
|
||||
the key transformer configuration
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType
|
|||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.FirstElementPolicy;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.LeastConnectionsPolicy;
|
||||
|
@ -76,8 +77,6 @@ import org.junit.Assert;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.activemq.artemis.core.server.routing.transformer.ConsistentHashModulo.MODULO;
|
||||
|
||||
public class FileConfigurationTest extends ConfigurationImplTest {
|
||||
|
||||
@BeforeClass
|
||||
|
@ -280,9 +279,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals(bc.getKeyType(), KeyType.CLIENT_ID);
|
||||
Assert.assertNotNull(bc.getLocalTargetFilter());
|
||||
Assert.assertNotNull(bc.getKeyFilter());
|
||||
Assert.assertNull(bc.getPolicyConfiguration());
|
||||
Assert.assertNotNull(bc.getTransformerConfiguration());
|
||||
Assert.assertNotNull(bc.getTransformerConfiguration().getProperties().get(MODULO));
|
||||
Assert.assertNotNull(bc.getPolicyConfiguration());
|
||||
Assert.assertNotNull(bc.getPolicyConfiguration().getProperties().get(ConsistentHashModuloPolicy.MODULO));
|
||||
} else if (bc.getName().equals("simple-router")) {
|
||||
Assert.assertEquals(bc.getKeyType(), KeyType.USER_NAME);
|
||||
Assert.assertNull(bc.getLocalTargetFilter());
|
||||
|
|
|
@ -17,15 +17,15 @@
|
|||
|
||||
package org.apache.activemq.artemis.core.server.routing;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.routing.PoolConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy;
|
||||
import org.apache.activemq.artemis.core.server.routing.transformer.ConsistentHashModulo;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -96,12 +96,10 @@ public class ConnectionRouterManagerTest {
|
|||
|
||||
ConnectionRouterConfiguration connectionRouterConfiguration = new ConnectionRouterConfiguration();
|
||||
connectionRouterConfiguration.setName("partition-local-consistent-hash").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
|
||||
NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration();
|
||||
policyConfig.setName(ConsistentHashModulo.NAME);
|
||||
HashMap<String, String> properties = new HashMap<>();
|
||||
properties.put(ConsistentHashModulo.MODULO, String.valueOf(2));
|
||||
policyConfig.setProperties(properties);
|
||||
connectionRouterConfiguration.setTransformerConfiguration(policyConfig);
|
||||
NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration()
|
||||
.setName(ConsistentHashModuloPolicy.NAME)
|
||||
.setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO, String.valueOf(2)));
|
||||
connectionRouterConfiguration.setPolicyConfiguration(policyConfig);
|
||||
|
||||
|
||||
underTest.deployConnectionRouter(connectionRouterConfiguration);
|
||||
|
|
|
@ -22,12 +22,11 @@ import static org.mockito.Mockito.mock;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.AbstractPolicy;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.Policy;
|
||||
import org.apache.activemq.artemis.core.server.routing.pools.Pool;
|
||||
import org.apache.activemq.artemis.core.server.routing.targets.LocalTarget;
|
||||
import org.apache.activemq.artemis.core.server.routing.targets.Target;
|
||||
import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
|
||||
import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -49,26 +48,24 @@ public class ConnectionRouterTest {
|
|||
|
||||
@Test
|
||||
public void getTarget() {
|
||||
Pool pool = null;
|
||||
Policy policy = null;
|
||||
underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}",
|
||||
localTarget, "^FOO.*", null, pool, policy, null);
|
||||
localTarget, "^FOO.*", null, null, policy);
|
||||
assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
|
||||
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getLocalTargetWithTransformer() throws Exception {
|
||||
Pool pool = null;
|
||||
Policy policy = null;
|
||||
KeyTransformer keyTransformer = new KeyTransformer() {
|
||||
Policy policy = new AbstractPolicy("TEST") {
|
||||
@Override
|
||||
public String transform(String key) {
|
||||
public String transformKey(String key) {
|
||||
return key.substring("TRANSFORM_TO".length() + 1);
|
||||
}
|
||||
};
|
||||
|
||||
underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}",
|
||||
localTarget, "^FOO.*", null, pool, policy, keyTransformer);
|
||||
localTarget, "^FOO.*", null, null, policy);
|
||||
assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
|
||||
}
|
||||
|
||||
|
|
|
@ -15,15 +15,13 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.routing.targets;
|
||||
package org.apache.activemq.artemis.core.server.routing;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.routing.KeyResolver;
|
||||
import org.apache.activemq.artemis.core.server.routing.KeyType;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
|
||||
|
@ -32,7 +30,8 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class KeyTypeResolverTest {
|
||||
public class KeyResolverTest {
|
||||
private static final String UNMATCHED_FILTER = "ARTEMIS";
|
||||
|
||||
@Test
|
||||
public void testClientIDKey() {
|
||||
|
@ -44,12 +43,17 @@ public class KeyTypeResolverTest {
|
|||
testClientIDKey("TEST", "TEST1234", "^.{4}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientIDKeyWithUnmatchedFilter() {
|
||||
testClientIDKey(KeyResolver.NULL_KEY_VALUE, "TEST1234", UNMATCHED_FILTER);
|
||||
}
|
||||
|
||||
private void testClientIDKey(String expected, String clientID, String filter) {
|
||||
KeyResolver keyResolver = new KeyResolver(KeyType.CLIENT_ID, filter);
|
||||
|
||||
Assert.assertEquals(expected, keyResolver.resolve(null, clientID, null));
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -62,6 +66,11 @@ public class KeyTypeResolverTest {
|
|||
testSNIHostKey("TEST", "TEST1234", "^.{4}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSNIHostKeyWithUnmatchedFilter() {
|
||||
testSNIHostKey(KeyResolver.NULL_KEY_VALUE, null, UNMATCHED_FILTER);
|
||||
}
|
||||
|
||||
private void testSNIHostKey(String expected, String sniHost, String filter) {
|
||||
Connection connection = Mockito.mock(Connection.class);
|
||||
|
||||
|
@ -70,10 +79,10 @@ public class KeyTypeResolverTest {
|
|||
Mockito.when(connection.getSNIHostName()).thenReturn(sniHost);
|
||||
Assert.assertEquals(expected, keyResolver.resolve(connection, null, null));
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
|
||||
Mockito.when(connection.getSNIHostName()).thenReturn(null);
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -86,6 +95,11 @@ public class KeyTypeResolverTest {
|
|||
testSourceIPKey("10", "10.0.0.1:12345", "^[^.]+");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSourceIPKeyWithUnmatchedFilter() {
|
||||
testSourceIPKey(KeyResolver.NULL_KEY_VALUE, "10.0.0.1:12345", UNMATCHED_FILTER);
|
||||
}
|
||||
|
||||
private void testSourceIPKey(String expected, String remoteAddress, String filter) {
|
||||
Connection connection = Mockito.mock(Connection.class);
|
||||
|
||||
|
@ -94,10 +108,10 @@ public class KeyTypeResolverTest {
|
|||
Mockito.when(connection.getRemoteAddress()).thenReturn(remoteAddress);
|
||||
Assert.assertEquals(expected, keyResolver.resolve(connection, null, null));
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
|
||||
Mockito.when(connection.getRemoteAddress()).thenReturn(null);
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -110,12 +124,17 @@ public class KeyTypeResolverTest {
|
|||
testUserNameKey("TEST", "TEST1234", "^.{4}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserNameKeyWithUnmatchedFilter() {
|
||||
testUserNameKey(KeyResolver.NULL_KEY_VALUE, "TEST1234", UNMATCHED_FILTER);
|
||||
}
|
||||
|
||||
private void testUserNameKey(String expected, String username, String filter) {
|
||||
KeyResolver keyResolver = new KeyResolver(KeyType.USER_NAME, filter);
|
||||
|
||||
Assert.assertEquals(expected, keyResolver.resolve(null, null, username));
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -123,23 +142,23 @@ public class KeyTypeResolverTest {
|
|||
KeyResolver keyResolver = new KeyResolver(KeyType.ROLE_NAME, "B");
|
||||
|
||||
Connection connection = Mockito.mock(Connection.class);
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
|
||||
RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class);
|
||||
Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection);
|
||||
Subject subject = Mockito.mock(Subject.class);
|
||||
Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject);
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
|
||||
Set<RolePrincipal> rolePrincipals = new HashSet<>();
|
||||
Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals);
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
|
||||
rolePrincipals.add(new RolePrincipal("A"));
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
|
||||
rolePrincipals.add(new RolePrincipal("B"));
|
||||
|
||||
|
@ -151,19 +170,19 @@ public class KeyTypeResolverTest {
|
|||
KeyResolver keyResolver = new KeyResolver(KeyType.ROLE_NAME, null);
|
||||
|
||||
Connection connection = Mockito.mock(Connection.class);
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
|
||||
RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class);
|
||||
Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection);
|
||||
Subject subject = Mockito.mock(Subject.class);
|
||||
Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject);
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
|
||||
Set<RolePrincipal> rolePrincipals = new ListOrderedSet();
|
||||
Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals);
|
||||
|
||||
Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null));
|
||||
|
||||
final RolePrincipal roleA = new RolePrincipal("A");
|
||||
rolePrincipals.add(roleA);
|
|
@ -15,37 +15,37 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.routing.transformer;
|
||||
package org.apache.activemq.artemis.core.server.routing.policies;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.routing.KeyResolver;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ConsistentHashModuloTest {
|
||||
public class ConsistentHashModuloPolicyTest {
|
||||
|
||||
@Test
|
||||
public void transform() {
|
||||
ConsistentHashModulo underTest = new ConsistentHashModulo();
|
||||
public void transformKey() {
|
||||
ConsistentHashModuloPolicy underTest = new ConsistentHashModuloPolicy();
|
||||
|
||||
assertEquals(KeyResolver.DEFAULT_KEY_VALUE, underTest.transform(KeyResolver.DEFAULT_KEY_VALUE));
|
||||
Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, underTest.transformKey(KeyResolver.NULL_KEY_VALUE));
|
||||
|
||||
assertEquals("AA", underTest.transform("AA")); // default modulo 0 does nothing
|
||||
Assert.assertEquals("AA", underTest.transformKey("AA")); // default modulo 0 does nothing
|
||||
|
||||
HashMap<String, String> properties = new HashMap<>();
|
||||
|
||||
final int modulo = 2;
|
||||
properties.put(ConsistentHashModulo.MODULO, String.valueOf(modulo));
|
||||
properties.put(ConsistentHashModuloPolicy.MODULO, String.valueOf(modulo));
|
||||
underTest.init(properties);
|
||||
|
||||
String hash1 = underTest.transform("AAA");
|
||||
String hash1 = underTest.transformKey("AAA");
|
||||
int v1 = Integer.parseInt(hash1);
|
||||
|
||||
String hash2 = underTest.transform("BBB");
|
||||
String hash2 = underTest.transformKey("BBB");
|
||||
int v2 = Integer.parseInt(hash2);
|
||||
|
||||
assertNotEquals(hash1, hash2);
|
|
@ -1,43 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.server.routing.transformer;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
public class TransformerFactoryResolverTest {
|
||||
|
||||
@Test
|
||||
public void resolveOk() throws Exception {
|
||||
TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
|
||||
assertNotNull(instance.resolve(ConsistentHashModulo.NAME));
|
||||
}
|
||||
|
||||
@Test(expected = ClassNotFoundException.class)
|
||||
public void resolveError() throws Exception {
|
||||
TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
|
||||
assertNotNull(instance.resolve("NOT PRESENT"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void keyFromName() throws Exception {
|
||||
TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance();
|
||||
assertEquals("New", instance.keyFromClassName("NewTransformerFactory"));
|
||||
}
|
||||
}
|
|
@ -164,9 +164,9 @@
|
|||
<key-type>CLIENT_ID</key-type>
|
||||
<key-filter>^[^.]+</key-filter>
|
||||
<local-target-filter>DEFAULT</local-target-filter>
|
||||
<local-target-key-transformer name="CONSISTENT_HASH_MODULO">
|
||||
<property key="modulo" value="2"></property>
|
||||
</local-target-key-transformer>
|
||||
<policy name="CONSISTENT_HASH_MODULO">
|
||||
<property key="MODULO" value="2"></property>
|
||||
</policy>
|
||||
</connection-router>
|
||||
<connection-router name="simple-router">
|
||||
<key-type>USER_NAME</key-type>
|
||||
|
|
|
@ -154,9 +154,9 @@
|
|||
<key-type>CLIENT_ID</key-type>
|
||||
<key-filter>^[^.]+</key-filter>
|
||||
<local-target-filter>DEFAULT</local-target-filter>
|
||||
<local-target-key-transformer name="CONSISTENT_HASH_MODULO">
|
||||
<property key="modulo" value="2"></property>
|
||||
</local-target-key-transformer>
|
||||
<policy name="CONSISTENT_HASH_MODULO">
|
||||
<property key="MODULO" value="2"></property>
|
||||
</policy>
|
||||
</connection-router>
|
||||
<connection-router name="simple-router">
|
||||
<key-type>USER_NAME</key-type>
|
||||
|
|
|
@ -84,13 +84,15 @@ Let's take a look at a pool example from broker.xml:
|
|||
```
|
||||
|
||||
## Policies
|
||||
The policy define how to select a broker from a pool. The included policies are:
|
||||
The policy defines how to select a broker from a pool and allows [key values](#key-values) transformation. The included policies are:
|
||||
* `FIRST_ELEMENT` to select the first target broker from the pool which is ready. It is useful to select the ready target brokers
|
||||
according to the priority defined with their sequence order, ie supposing there are 2 target brokers
|
||||
this policy selects the second target broker only when the first target broker isn't ready.
|
||||
* `ROUND_ROBIN` to select a target sequentially from a pool, this policy is useful to evenly distribute;
|
||||
* `CONSISTENT_HASH` to select a target by a key. This policy always selects the same target broker for the same key until it is removed from the pool.
|
||||
* `LEAST_CONNECTIONS` to select the targets with the fewest active connections. This policy helps you maintain an equal distribution of active connections with the target brokers.
|
||||
* `CONSISTENT_HASH_MODULO` to transform a key value to a number from 0 to N-1, it takes a single `modulo` property to configure the bound N. One use case is `CLIENT_ID`
|
||||
sharding across a cluster of N brokers. With a consistent hash % N transformation, each client id can map exclusively to just one of the brokers.
|
||||
|
||||
A policy is defined by the `policy` element. Let's take a look at a policy example from broker.xml:
|
||||
```xml
|
||||
|
@ -115,19 +117,14 @@ Let's take a look at a cache example from broker.xml:
|
|||
</cache>
|
||||
```
|
||||
|
||||
## Key transformers
|
||||
A `local-target-key-transformer` allows key value transformation before matching against any local-target-filter. One use case is
|
||||
CLIENT_ID sharding across a cluster of N brokers. With a consistent hash % N transformation, each client id
|
||||
can map exclusively to just one of the brokers. The included transformers are:
|
||||
* `CONSISTENT_HASH_MODULO` that takes a single `modulo` property to configure the bound.
|
||||
|
||||
## Defining connection routers
|
||||
A connection router is defined by the `connection-router` element, it includes the following items:
|
||||
* the `name` attribute defines the name of the connection router and is used to reference the router from an acceptor;
|
||||
* the `key-type` element defines what type of key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [Keys](#keys) for further details;
|
||||
* the `key-filter` element defines a regular expression to filter the resolved keys;
|
||||
* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
|
||||
* the `local-target-key-transformer` element defines a key transformer, see [key transformers](#key-transformers);
|
||||
* the `key-filter` element defines a regular expression to filter the resolved [key values](#key-values);
|
||||
* the `local-target-filter` element defines a regular expression to match the [key values](#key-values)
|
||||
that have to return a local target, the [key value](#key-values) could be equal to the special string `NULL`
|
||||
if the value of the key is undefined or it doesn't match the `key-filter`;
|
||||
* the `pool` element defines the pool to group the target brokers, see [pools](#pools);
|
||||
* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies).
|
||||
|
||||
|
@ -172,15 +169,23 @@ Let's take a look at some connection router examples from broker.xml:
|
|||
</connection-routers>
|
||||
```
|
||||
|
||||
## Key values
|
||||
The key value is retrieved from the incoming client connection.
|
||||
If the incoming client connection has no value for the key type used, the key value is set to the special string `NULL`.
|
||||
If the incoming client connection has a value for the key type used, the key value retrieved can be sequentially manipulated using a `key-filter` and a `policy`.
|
||||
If a `key-filter` is defined and the filter fails to match, the value is set to the special string `NULL`.
|
||||
If a `policy` with a key transformation is defined, the key value is set to the transformed value.
|
||||
|
||||
|
||||
## Connection Router Workflow
|
||||
The connection router workflow include the following steps:
|
||||
* Retrieve the key value from the incoming connection;
|
||||
* Retrieve the [key value](#key-values) from the incoming connection;
|
||||
* Return the local target broker if the key value matches the local filter;
|
||||
* Delegate to the pool:
|
||||
* Delegate to the [pool](#pools):
|
||||
* Return the cached target broker if it is ready;
|
||||
* Get ready/active target brokers from the pool;
|
||||
* Select one target broker using the policy;
|
||||
* Add the selected broker in the cache;
|
||||
* Select one target broker using the [policy](#policies);
|
||||
* Add the selected broker in the [cache](#cache);
|
||||
* Return the selected broker.
|
||||
|
||||
Let's take a look at flowchart of the connection router workflow:
|
||||
|
|
|
@ -132,6 +132,7 @@ under the License.
|
|||
<spawn>true</spawn>
|
||||
<ignore>${noServer}</ignore>
|
||||
<location>${basedir}/target/server0</location>
|
||||
<testClientID>$.artemis.internal.router.client.test</testClientID>
|
||||
<testURI>tcp://localhost:61616</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
|
|
|
@ -58,7 +58,6 @@ under the License.
|
|||
<connection-router name="evenly-balance">
|
||||
<key-type>CLIENT_ID</key-type>
|
||||
<key-filter>^.{3}</key-filter>
|
||||
<local-target-filter>DEFAULT</local-target-filter>
|
||||
<cache/>
|
||||
<policy name="LEAST_CONNECTIONS"/>
|
||||
<pool>
|
||||
|
|
|
@ -86,6 +86,7 @@ under the License.
|
|||
<spawn>true</spawn>
|
||||
<ignore>${noServer}</ignore>
|
||||
<location>${basedir}/target/server0</location>
|
||||
<testClientID>$.artemis.internal.router.client.test</testClientID>
|
||||
<testURI>tcp://localhost:61616</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
|
@ -102,6 +103,7 @@ under the License.
|
|||
<ignore>${noServer}</ignore>
|
||||
<spawn>true</spawn>
|
||||
<location>${basedir}/target/server1</location>
|
||||
<testClientID>$.artemis.internal.router.client.test</testClientID>
|
||||
<testURI>tcp://localhost:61617</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
|
|
|
@ -74,7 +74,6 @@ under the License.
|
|||
<connection-router name="symmetric-router">
|
||||
<key-type>CLIENT_ID</key-type>
|
||||
<key-filter>^.{3}</key-filter>
|
||||
<local-target-filter>DEFAULT</local-target-filter>
|
||||
<policy name="CONSISTENT_HASH"/>
|
||||
<pool>
|
||||
<username>guest</username>
|
||||
|
|
|
@ -74,7 +74,6 @@ under the License.
|
|||
<connection-router name="symmetric-router">
|
||||
<key-type>CLIENT_ID</key-type>
|
||||
<key-filter>^.{3}</key-filter>
|
||||
<local-target-filter>DEFAULT</local-target-filter>
|
||||
<policy name="CONSISTENT_HASH"/>
|
||||
<pool>
|
||||
<username>guest</username>
|
||||
|
|
|
@ -41,8 +41,8 @@ import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguratio
|
|||
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.server.routing.KeyType;
|
||||
import org.apache.activemq.artemis.core.server.routing.KeyResolver;
|
||||
import org.apache.activemq.artemis.core.server.routing.transformer.ConsistentHashModulo;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
|
@ -275,13 +275,13 @@ public class AutoClientIDShardClusterTest extends RoutingTestBase {
|
|||
for (int node = 0; node < numberOfNodes; node++) {
|
||||
Configuration configuration = servers[node].getConfiguration();
|
||||
ConnectionRouterConfiguration connectionRouterConfiguration = new ConnectionRouterConfiguration().setName(CONNECTION_ROUTER_NAME);
|
||||
connectionRouterConfiguration.setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(KeyResolver.DEFAULT_KEY_VALUE + "|" + node);
|
||||
NamedPropertyConfiguration transformerConfig = new NamedPropertyConfiguration();
|
||||
transformerConfig.setName(ConsistentHashModulo.NAME);
|
||||
connectionRouterConfiguration.setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(KeyResolver.NULL_KEY_VALUE + "|" + node);
|
||||
NamedPropertyConfiguration polocyConfig = new NamedPropertyConfiguration();
|
||||
polocyConfig.setName(ConsistentHashModuloPolicy.NAME);
|
||||
HashMap<String, String> properties = new HashMap<>();
|
||||
properties.put(ConsistentHashModulo.MODULO, String.valueOf(numberOfNodes));
|
||||
transformerConfig.setProperties(properties);
|
||||
connectionRouterConfiguration.setTransformerConfiguration(transformerConfig);
|
||||
properties.put(ConsistentHashModuloPolicy.MODULO, String.valueOf(numberOfNodes));
|
||||
polocyConfig.setProperties(properties);
|
||||
connectionRouterConfiguration.setPolicyConfiguration(polocyConfig);
|
||||
|
||||
configuration.setConnectionRouters(Collections.singletonList(connectionRouterConfiguration));
|
||||
|
||||
|
|
Loading…
Reference in New Issue