ARTEMIS-3365 - add simple local-target balancer example with amqp failover, remove manditory pool and policy config and update doc with data gravity concept

This commit is contained in:
gtully 2021-10-06 11:18:04 +01:00 committed by Gary Tully
parent d7f37ae313
commit 5508b8a87a
16 changed files with 766 additions and 19 deletions
artemis-server/src
main
java/org/apache/activemq/artemis/core
resources/schema
test
docs/user-manual/en
examples/features/broker-balancer
pom.xml
symmetric-simple
pom.xmlreadme.md
src/main
java/org/apache/activemq/artemis/jms/example
resources/activemq
server0
server1

View File

@ -18,7 +18,6 @@
package org.apache.activemq.artemis.core.config.balancing;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
public class PoolConfiguration implements Serializable {
@ -30,7 +29,7 @@ public class PoolConfiguration implements Serializable {
private String clusterConnection = null;
private List<String> staticConnectors = Collections.emptyList();
private List<String> staticConnectors = null;
private String discoveryGroupName = null;

View File

@ -118,7 +118,9 @@ public class BrokerBalancer implements ActiveMQComponent {
@Override
public void start() throws Exception {
pool.start();
if (pool != null) {
pool.start();
}
started = true;
}
@ -127,7 +129,9 @@ public class BrokerBalancer implements ActiveMQComponent {
public void stop() throws Exception {
started = false;
pool.stop();
if (pool != null) {
pool.stop();
}
}
public Target getTarget(Connection connection, String clientID, String username) {
@ -152,6 +156,10 @@ public class BrokerBalancer implements ActiveMQComponent {
return localTarget;
}
if (pool == null) {
return null;
}
Target target = null;
if (cache != null) {

View File

@ -88,9 +88,17 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
Target localTarget = new LocalTarget(null, server);
Pool pool = deployPool(config.getPoolConfiguration(), localTarget);
Pool pool = null;
final PoolConfiguration poolConfiguration = config.getPoolConfiguration();
if (poolConfiguration != null) {
pool = deployPool(config.getPoolConfiguration(), localTarget);
}
Policy policy = deployPolicy(config.getPolicyConfiguration(), pool);
Policy policy = null;
PolicyConfiguration policyConfiguration = config.getPolicyConfiguration();
if (policyConfiguration != null) {
policy = deployPolicy(policyConfiguration, pool);
}
BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(),
localTarget, config.getLocalTargetFilter(), pool, policy, config.getCacheTimeout());

View File

@ -2122,14 +2122,14 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="policy" type="brokerBalancerPolicyType" maxOccurs="1" minOccurs="1">
<xsd:element name="policy" type="brokerBalancerPolicyType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the policy configuration
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="pool" type="brokerBalancerPoolType" maxOccurs="1" minOccurs="1">
<xsd:element name="pool" type="brokerBalancerPoolType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the pool configuration

View File

@ -265,9 +265,14 @@ public class FileConfigurationTest extends ConfigurationImplTest {
}
}
Assert.assertEquals(3, conf.getBalancerConfigurations().size());
Assert.assertEquals(4, conf.getBalancerConfigurations().size());
for (BrokerBalancerConfiguration bc : conf.getBalancerConfigurations()) {
if (bc.getName().equals("simple-balancer")) {
if (bc.getName().equals("simple-local")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
Assert.assertNotNull(bc.getLocalTargetFilter());
Assert.assertNotNull(bc.getTargetKeyFilter());
Assert.assertNull(bc.getPolicyConfiguration());
} else if (bc.getName().equals("simple-balancer")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.USER_NAME);
Assert.assertNull(bc.getLocalTargetFilter());
Assert.assertEquals(bc.getPolicyConfiguration().getName(), FirstElementPolicy.NAME);
@ -281,7 +286,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(bc.getPolicyConfiguration().getName(), ConsistentHashPolicy.NAME);
Assert.assertEquals(1000, bc.getPoolConfiguration().getCheckPeriod());
Assert.assertEquals(true, bc.getPoolConfiguration().isLocalTargetEnabled());
Assert.assertEquals(Collections.emptyList(), bc.getPoolConfiguration().getStaticConnectors());
Assert.assertEquals(null, bc.getPoolConfiguration().getStaticConnectors());
Assert.assertEquals("dg1", bc.getPoolConfiguration().getDiscoveryGroupName());
} else {
Assert.assertEquals(bc.getTargetKey(), TargetKey.SOURCE_IP);
@ -292,7 +297,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(2, bc.getPoolConfiguration().getQuorumSize());
Assert.assertEquals(1000, bc.getPoolConfiguration().getQuorumTimeout());
Assert.assertEquals(false, bc.getPoolConfiguration().isLocalTargetEnabled());
Assert.assertEquals(Collections.emptyList(), bc.getPoolConfiguration().getStaticConnectors());
Assert.assertEquals(null, bc.getPoolConfiguration().getStaticConnectors());
Assert.assertEquals("dg2", bc.getPoolConfiguration().getDiscoveryGroupName());
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class BrokerBalancerManagerTest {
ActiveMQServer mockServer;
BrokerBalancerManager underTest;
@Before
public void setUp() throws Exception {
mockServer = mock(ActiveMQServer.class);
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
underTest = new BrokerBalancerManager(null, mockServer, null);
underTest.start();
}
@After
public void tearDown() throws Exception {
if (underTest != null) {
underTest.stop();
}
}
@Test(expected = IllegalStateException.class)
public void deployLocalOnlyPoolInvalid() throws Exception {
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-pool");
PolicyConfiguration policyConfig = new PolicyConfiguration();
policyConfig.setName(ConsistentHashPolicy.NAME);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfig);
PoolConfiguration poolConfiguration = new PoolConfiguration();
poolConfiguration.setLocalTargetEnabled(true);
brokerBalancerConfiguration.setPoolConfiguration(poolConfiguration);
underTest.deployBrokerBalancer(brokerBalancerConfiguration);
}
@Test
public void deployLocalOnly() throws Exception {
ManagementService mockManagementService = Mockito.mock(ManagementService.class);
Mockito.when(mockServer.getManagementService()).thenReturn(mockManagementService);
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-pool");
underTest.deployBrokerBalancer(brokerBalancerConfiguration);
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class BrokerBalancerTest {
Target localTarget;
BrokerBalancer underTest;
@Before
public void setUp() {
ActiveMQServer mockServer = mock(ActiveMQServer.class);
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
localTarget = new LocalTarget(null, mockServer);
Pool pool = null;
Policy policy = null;
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
localTarget, "^FOO.*", pool, policy, 0);
try {
underTest.start();
} catch (Exception e) {
fail(e.getMessage());
}
}
@After
public void after() {
if (underTest != null) {
try {
underTest.stop();
} catch (Exception e) {
fail(e.getMessage());
}
}
}
@Test
public void getTarget() {
assertEquals( localTarget, underTest.getTarget("FOO_EE"));
assertNotEquals( localTarget, underTest.getTarget("BAR_EE"));
}
}

View File

@ -153,6 +153,11 @@
</divert>
</diverts>
<broker-balancers>
<broker-balancer name="simple-local">
<target-key>CLIENT_ID</target-key>
<target-key-filter>^[^.]+</target-key-filter>
<local-target-filter>DEFAULT</local-target-filter>
</broker-balancer>
<broker-balancer name="simple-balancer">
<target-key>USER_NAME</target-key>
<policy name="FIRST_ELEMENT"/>

View File

@ -144,6 +144,11 @@
</divert>
</diverts>
<broker-balancers>
<broker-balancer name="simple-local">
<target-key>CLIENT_ID</target-key>
<target-key-filter>^[^.]+</target-key-filter>
<local-target-filter>DEFAULT</local-target-filter>
</broker-balancer>
<broker-balancer name="simple-balancer">
<target-key>USER_NAME</target-key>
<policy name="FIRST_ELEMENT"/>

View File

@ -19,9 +19,9 @@ It is a string retrieved from an incoming client connection, the supported value
* `USER_NAME` is the username indicated by the client.
## Pools
The pool is a group of target brokers and checks periodically their state.
The pool is a group of target brokers with periodic checks on their state.
It provides a list of ready target brokers to distribute incoming client connections only when it is active.
A pool becomes active when the minimum number of ready target brokers defined by the `quorum-size` parameter is reached.
A pool becomes active when the minimum number of target brokers, as defined by the `quorum-size` parameter, become ready.
When it is not active, it doesn't provide any target avoiding weird distribution at startup or after a restart.
Including the local broker in the target pool allows broker hosting the balancer to accept incoming client connections as well.
By default, a pool doesn't include the local broker, to include it as a target the `local-target-enabled` parameter must be `true`.
@ -102,22 +102,27 @@ A policy is defined by the `policy` element. Let's take a look at a policy examp
The broker balancer provides a cache with a timeout to improve the stickiness of the target broker selected,
returning the same target broker for a target key as long as it is present in the cache and is ready.
So a broker balancer with the cache enabled doesn't strictly follow the configured policy.
By default, the cache is enabled, and will never timeout. See below
By default, the cache is enabled and will never timeout. See below
for more details about setting the `cache-timeout` parameter.
## Defining broker balancers
A broker balancer is defined by `broker-balancer` element, it includes the following items:
* the `name` attribute defines the name of the broker balancer;
A broker balancer is defined by the `broker-balancer` element, it includes the following items:
* the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor;
* the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details;
* the `target-key-filter` element defines a regular expression to filter the resolved keys;
* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
* the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration;
* the `pool` element defines the pool to group the target brokers, see [pools](#pools).
* the `policy` element defines the policy used to select the target brokers, see [policies](#policies);
* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies);
Let's take a look at some broker balancer examples from broker.xml:
```xml
<broker-balancers>
<broker-balancer name="local-partition">
<target-key>CLIENT_ID</target-key>
<target-key-filter>^.{3}</target-key-filter>
<local-target-filter>^FOO.*</local-target-filter>
</broker-balancer>
<broker-balancer name="simple-balancer">
<policy name="FIRST_ELEMENT"/>
<pool>
@ -155,8 +160,9 @@ Let's take a look at some broker balancer examples from broker.xml:
The broker balancer workflow include the following steps:
* Retrieve the target key from the incoming connection;
* Return the local target broker if the target key matches the local filter;
* Delegate to the pool:
* Return the cached target broker if it is ready;
* Get ready target brokers from the pool;
* Get ready/active target brokers from the pool;
* Select one target broker using the policy;
* Add the selected broker in the cache;
* Return the selected broker.
@ -164,6 +170,23 @@ The broker balancer workflow include the following steps:
Let's take a look at flowchart of the broker balancer workflow:
![Broker Balancer Workflow](images/broker_balancer_workflow.png)
## Data gravity
The first balancer configuration: `local-partition`, demonstrates the simplest use case,
that of preserving `data gravity` by confining a subset of application data to a given broker.
Each broker is given a subset of keys that it will exclusively service or reject.
If brokers are behind a round-robin load-balancer or have full knowledge of the broker
urls, `their` broker will eventually respond. The `local-target-filter` regular expression
determines the granularity of partition that is best for preserving `data gravity` for your applications.
The challenge is in providing a consistent [key](#Target_Key) in all related application connections.
Note: the concept of `data gravity` tries to capture the reality that while addresses are shared by multiple
applications, it is best to keep related addresses and their data co-located on a single broker. Typically,
applications should `connect` to the data rather than the data moving to whatever broker the application connects too.
This is particularly true when the amount of data (backlog) is large, the cost of movement to follow consumers outweighs
the cost of delivery to the application.
With the 'data gravity' mindset, operators are less concerned with numbers of connections and more concerned with
applications and the addresses they need to interact with.
## Redirection
Apache ActiveMQ Artemis provides a native redirection for supported clients and a new management API for other clients.

View File

@ -43,6 +43,7 @@ under the License.
<modules>
<module>evenly-redirect</module>
<module>symmetric-redirect</module>
<module>symmetric-simple</module>
</modules>
</profile>
<profile>
@ -50,6 +51,7 @@ under the License.
<modules>
<module>evenly-redirect</module>
<module>symmetric-redirect</module>
<module>symmetric-simple</module>
</modules>
</profile>
</profiles>

View File

@ -0,0 +1,161 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples</groupId>
<artifactId>broker-balancer</artifactId>
<version>2.19.0-SNAPSHOT</version>
</parent>
<artifactId>symmetric-simple</artifactId>
<packaging>jar</packaging>
<name>symmetric-simple</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server0</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server1</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/server1</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>start0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<spawn>true</spawn>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server0</location>
<args>
<param>run</param>
</args>
<name>server0</name>
</configuration>
</execution>
<execution>
<id>start1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server1</location>
<args>
<param>run</param>
</args>
<name>server1</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<!-- you may have to set export MAVEN_OPTS="-Djava.net.preferIPv4Stack=true"
if you are on MacOS for instance -->
<clientClass>org.apache.activemq.artemis.jms.example.SymmetricSimpleExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server0</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
<execution>
<id>stop1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server1</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples</groupId>
<artifactId>symmetric-simple</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,31 @@
# Symmetric Simple Example
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.
This example demonstrates how data is partitioned across two brokers. The idea is to focus on
'data gravity' and partition connections to brokers based on the data they will access,
rather than focus on balancing connection numbers overall.
For example, ensuring that all users of some particular set of data map to `Broker0`
and all users of some other set, map to `Broker1`. In this way, there is no extra movement of
data to service requests.
This architecture is intentionally quite static; based on the configured regular expressions,
it is always possible to infer where a given application (key) should be routed to.
In this example both brokers have the role of target, but for a subset of keys. If the key is a match
it is accepted, if not it is rejected.
Note: redirection in this scenario is optional, with a round-robin distribution of client urls, a connection will
eventually find a local target match.
In this example, the qpid jms amqp client failover feature does the required round-robin distribution.
The job of application developers in this scenario is to provide a key that can easily be mapped to a regular
expression that can capture an appropriate 'center of data gravity' for a broker.
In configuration, the `local-target-filter` provides the regular expression that controls what keys are mapped to a broker.
`Broker0` takes clientIDs with prefix FOO and `Broker1` takes prefix BAR. The `target-key-filter` specifies how the key is extracted,
we care about the first 3 characters, from the user supplied clientId.
<broker-balancer name="symmetric-simple">
<target-key>CLIENT_ID</target-key>
<target-key-filter>^.{3}</target-key-filter>
<local-target-filter>^FOO.*</local-target-filter>
</broker-balancer>

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.jms.JmsConnectionFactory;
/**
* This example demonstrates how incoming client connections are partitioned across two brokers
* it uses AMQP to take advantage of the simple round-robin retry logic of the failover url scheme
*/
public class SymmetricSimpleExample {
public static void main(final String[] args) throws Exception {
/**
* Step 1. Create a connection for producer0 and producer1, and send a few messages with different key values
*/
ConnectionFactory connectionFactory = new JmsConnectionFactory("failover:(amqp://localhost:61616,amqp://localhost:61617)");
try (Connection connectionProducer0 = connectionFactory.createConnection();
Connection connectionProducer1 = connectionFactory.createConnection()) {
// using first 3 characters of clientID as key for data gravity
connectionProducer0.setClientID("BAR_PRODUCER");
connectionProducer1.setClientID("FOO_PRODUCER");
for (Connection connectionProducer : new Connection[] {connectionProducer0, connectionProducer1}) {
Session session = connectionProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("exampleQueue" + connectionProducer.getClientID().substring(0, 3));
MessageProducer sender = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("Hello world n" + i + " - " + connectionProducer.getClientID().substring(0, 3));
System.out.println("Sending message " + message.getText() + "/" + connectionProducer.getClientID());
sender.send(message);
}
}
}
/**
* Step 2. create a connection for consumer0 and consumer1, and receive a few messages.
* the consumers will find data that matches their key
*/
try (Connection connectionConsumer0 = connectionFactory.createConnection();
Connection connectionConsumer1 = connectionFactory.createConnection()) {
// using first 3 characters of clientID as key for data gravity
connectionConsumer0.setClientID("FOO_CONSUMER");
connectionConsumer1.setClientID("BAR_CONSUMER");
for (Connection connectionConsumer : new Connection[]{connectionConsumer0, connectionConsumer1}) {
connectionConsumer.start();
Session session = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("exampleQueue" + connectionConsumer.getClientID().substring(0, 3));
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
System.out.println("Received message " + message.getText() + "/" + connectionConsumer.getClientID());
}
}
}
}
}

View File

@ -0,0 +1,121 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>false</persistence-enabled>
<journal-type>NIO</journal-type>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>60000</page-sync-timeout>
<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?redirect-to=symmetric-simple;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>
<broker-balancers>
<broker-balancer name="symmetric-simple">
<target-key>CLIENT_ID</target-key>
<target-key-filter>^.{3}</target-key-filter>
<local-target-filter>^FOO.*</local-target-filter>
</broker-balancer>
</broker-balancers>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,122 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>false</persistence-enabled>
<journal-type>NIO</journal-type>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>60000</page-sync-timeout>
<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61617?redirect-to=symmetric-simple;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>
<broker-balancers>
<broker-balancer name="symmetric-simple">
<target-key>CLIENT_ID</target-key>
<target-key-filter>^.{3}</target-key-filter>
<local-target-filter>^BAR.*</local-target-filter>
</broker-balancer>
</broker-balancers>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
</core>
</configuration>