From 5508b8a87a0c3e4b2fc51d23db7c00665b4ce778 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 6 Oct 2021 11:18:04 +0100 Subject: [PATCH] ARTEMIS-3365 - add simple local-target balancer example with amqp failover, remove manditory pool and policy config and update doc with data gravity concept --- .../config/balancing/PoolConfiguration.java | 3 +- .../core/server/balancing/BrokerBalancer.java | 12 +- .../balancing/BrokerBalancerManager.java | 12 +- .../schema/artemis-configuration.xsd | 4 +- .../config/impl/FileConfigurationTest.java | 13 +- .../balancing/BrokerBalancerManagerTest.java | 87 ++++++++++ .../server/balancing/BrokerBalancerTest.java | 81 +++++++++ .../ConfigurationTest-full-config.xml | 5 + .../ConfigurationTest-xinclude-config.xml | 5 + docs/user-manual/en/broker-balancers.md | 37 +++- examples/features/broker-balancer/pom.xml | 2 + .../broker-balancer/symmetric-simple/pom.xml | 161 ++++++++++++++++++ .../symmetric-simple/readme.md | 31 ++++ .../jms/example/SymmetricSimpleExample.java | 89 ++++++++++ .../resources/activemq/server0/broker.xml | 121 +++++++++++++ .../resources/activemq/server1/broker.xml | 122 +++++++++++++ 16 files changed, 766 insertions(+), 19 deletions(-) create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java create mode 100644 examples/features/broker-balancer/symmetric-simple/pom.xml create mode 100644 examples/features/broker-balancer/symmetric-simple/readme.md create mode 100644 examples/features/broker-balancer/symmetric-simple/src/main/java/org/apache/activemq/artemis/jms/example/SymmetricSimpleExample.java create mode 100644 examples/features/broker-balancer/symmetric-simple/src/main/resources/activemq/server0/broker.xml create mode 100644 examples/features/broker-balancer/symmetric-simple/src/main/resources/activemq/server1/broker.xml diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PoolConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PoolConfiguration.java index 699184a22f..1c7abc7729 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PoolConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/balancing/PoolConfiguration.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.config.balancing; import java.io.Serializable; -import java.util.Collections; import java.util.List; public class PoolConfiguration implements Serializable { @@ -30,7 +29,7 @@ public class PoolConfiguration implements Serializable { private String clusterConnection = null; - private List staticConnectors = Collections.emptyList(); + private List staticConnectors = null; private String discoveryGroupName = null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java index 93e38b1613..512056f36a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java @@ -118,7 +118,9 @@ public class BrokerBalancer implements ActiveMQComponent { @Override public void start() throws Exception { - pool.start(); + if (pool != null) { + pool.start(); + } started = true; } @@ -127,7 +129,9 @@ public class BrokerBalancer implements ActiveMQComponent { public void stop() throws Exception { started = false; - pool.stop(); + if (pool != null) { + pool.stop(); + } } public Target getTarget(Connection connection, String clientID, String username) { @@ -152,6 +156,10 @@ public class BrokerBalancer implements ActiveMQComponent { return localTarget; } + if (pool == null) { + return null; + } + Target target = null; if (cache != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java index fc5fba65b9..10afe15efe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManager.java @@ -88,9 +88,17 @@ public final class BrokerBalancerManager implements ActiveMQComponent { Target localTarget = new LocalTarget(null, server); - Pool pool = deployPool(config.getPoolConfiguration(), localTarget); + Pool pool = null; + final PoolConfiguration poolConfiguration = config.getPoolConfiguration(); + if (poolConfiguration != null) { + pool = deployPool(config.getPoolConfiguration(), localTarget); + } - Policy policy = deployPolicy(config.getPolicyConfiguration(), pool); + Policy policy = null; + PolicyConfiguration policyConfiguration = config.getPolicyConfiguration(); + if (policyConfiguration != null) { + policy = deployPolicy(policyConfiguration, pool); + } BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(), localTarget, config.getLocalTargetFilter(), pool, policy, config.getCacheTimeout()); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index a9fbbd915c..27d31344c4 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2122,14 +2122,14 @@ - + the policy configuration - + the pool configuration diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index f578601807..d33b3eeaf5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -265,9 +265,14 @@ public class FileConfigurationTest extends ConfigurationImplTest { } } - Assert.assertEquals(3, conf.getBalancerConfigurations().size()); + Assert.assertEquals(4, conf.getBalancerConfigurations().size()); for (BrokerBalancerConfiguration bc : conf.getBalancerConfigurations()) { - if (bc.getName().equals("simple-balancer")) { + if (bc.getName().equals("simple-local")) { + Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID); + Assert.assertNotNull(bc.getLocalTargetFilter()); + Assert.assertNotNull(bc.getTargetKeyFilter()); + Assert.assertNull(bc.getPolicyConfiguration()); + } else if (bc.getName().equals("simple-balancer")) { Assert.assertEquals(bc.getTargetKey(), TargetKey.USER_NAME); Assert.assertNull(bc.getLocalTargetFilter()); Assert.assertEquals(bc.getPolicyConfiguration().getName(), FirstElementPolicy.NAME); @@ -281,7 +286,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals(bc.getPolicyConfiguration().getName(), ConsistentHashPolicy.NAME); Assert.assertEquals(1000, bc.getPoolConfiguration().getCheckPeriod()); Assert.assertEquals(true, bc.getPoolConfiguration().isLocalTargetEnabled()); - Assert.assertEquals(Collections.emptyList(), bc.getPoolConfiguration().getStaticConnectors()); + Assert.assertEquals(null, bc.getPoolConfiguration().getStaticConnectors()); Assert.assertEquals("dg1", bc.getPoolConfiguration().getDiscoveryGroupName()); } else { Assert.assertEquals(bc.getTargetKey(), TargetKey.SOURCE_IP); @@ -292,7 +297,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { Assert.assertEquals(2, bc.getPoolConfiguration().getQuorumSize()); Assert.assertEquals(1000, bc.getPoolConfiguration().getQuorumTimeout()); Assert.assertEquals(false, bc.getPoolConfiguration().isLocalTargetEnabled()); - Assert.assertEquals(Collections.emptyList(), bc.getPoolConfiguration().getStaticConnectors()); + Assert.assertEquals(null, bc.getPoolConfiguration().getStaticConnectors()); Assert.assertEquals("dg2", bc.getPoolConfiguration().getDiscoveryGroupName()); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java new file mode 100644 index 0000000000..b98c7ecbc5 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerManagerTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.balancing; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; +import org.apache.activemq.artemis.core.config.balancing.PolicyConfiguration; +import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy; +import org.apache.activemq.artemis.core.server.management.ManagementService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class BrokerBalancerManagerTest { + + ActiveMQServer mockServer; + BrokerBalancerManager underTest; + + @Before + public void setUp() throws Exception { + + mockServer = mock(ActiveMQServer.class); + Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID")); + + underTest = new BrokerBalancerManager(null, mockServer, null); + underTest.start(); + } + + @After + public void tearDown() throws Exception { + if (underTest != null) { + underTest.stop(); + } + } + + @Test(expected = IllegalStateException.class) + public void deployLocalOnlyPoolInvalid() throws Exception { + + BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration(); + brokerBalancerConfiguration.setName("partition-local-pool"); + PolicyConfiguration policyConfig = new PolicyConfiguration(); + policyConfig.setName(ConsistentHashPolicy.NAME); + brokerBalancerConfiguration.setPolicyConfiguration(policyConfig); + + PoolConfiguration poolConfiguration = new PoolConfiguration(); + poolConfiguration.setLocalTargetEnabled(true); + brokerBalancerConfiguration.setPoolConfiguration(poolConfiguration); + + underTest.deployBrokerBalancer(brokerBalancerConfiguration); + } + + @Test + public void deployLocalOnly() throws Exception { + + ManagementService mockManagementService = Mockito.mock(ManagementService.class); + Mockito.when(mockServer.getManagementService()).thenReturn(mockManagementService); + + BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration(); + brokerBalancerConfiguration.setName("partition-local-pool"); + + underTest.deployBrokerBalancer(brokerBalancerConfiguration); + } +} \ No newline at end of file diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java new file mode 100644 index 0000000000..a59307d7f5 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.balancing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.balancing.policies.Policy; +import org.apache.activemq.artemis.core.server.balancing.pools.Pool; +import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget; +import org.apache.activemq.artemis.core.server.balancing.targets.Target; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class BrokerBalancerTest { + + Target localTarget; + BrokerBalancer underTest; + + @Before + public void setUp() { + + ActiveMQServer mockServer = mock(ActiveMQServer.class); + Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID")); + + localTarget = new LocalTarget(null, mockServer); + + Pool pool = null; + Policy policy = null; + underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}", + localTarget, "^FOO.*", pool, policy, 0); + try { + underTest.start(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @After + public void after() { + if (underTest != null) { + try { + underTest.stop(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + } + + @Test + public void getTarget() { + assertEquals( localTarget, underTest.getTarget("FOO_EE")); + assertNotEquals( localTarget, underTest.getTarget("BAR_EE")); + } + +} \ No newline at end of file diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 7987092a03..0a772b194d 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -153,6 +153,11 @@ + + CLIENT_ID + ^[^.]+ + DEFAULT + USER_NAME diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index dde21431cc..faaef30f56 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -144,6 +144,11 @@ + + CLIENT_ID + ^[^.]+ + DEFAULT + USER_NAME diff --git a/docs/user-manual/en/broker-balancers.md b/docs/user-manual/en/broker-balancers.md index e588a9e980..4605d85ca8 100644 --- a/docs/user-manual/en/broker-balancers.md +++ b/docs/user-manual/en/broker-balancers.md @@ -19,9 +19,9 @@ It is a string retrieved from an incoming client connection, the supported value * `USER_NAME` is the username indicated by the client. ## Pools -The pool is a group of target brokers and checks periodically their state. +The pool is a group of target brokers with periodic checks on their state. It provides a list of ready target brokers to distribute incoming client connections only when it is active. -A pool becomes active when the minimum number of ready target brokers defined by the `quorum-size` parameter is reached. +A pool becomes active when the minimum number of target brokers, as defined by the `quorum-size` parameter, become ready. When it is not active, it doesn't provide any target avoiding weird distribution at startup or after a restart. Including the local broker in the target pool allows broker hosting the balancer to accept incoming client connections as well. By default, a pool doesn't include the local broker, to include it as a target the `local-target-enabled` parameter must be `true`. @@ -102,22 +102,27 @@ A policy is defined by the `policy` element. Let's take a look at a policy examp The broker balancer provides a cache with a timeout to improve the stickiness of the target broker selected, returning the same target broker for a target key as long as it is present in the cache and is ready. So a broker balancer with the cache enabled doesn't strictly follow the configured policy. -By default, the cache is enabled, and will never timeout. See below +By default, the cache is enabled and will never timeout. See below for more details about setting the `cache-timeout` parameter. ## Defining broker balancers -A broker balancer is defined by `broker-balancer` element, it includes the following items: -* the `name` attribute defines the name of the broker balancer; +A broker balancer is defined by the `broker-balancer` element, it includes the following items: +* the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor; * the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details; * the `target-key-filter` element defines a regular expression to filter the resolved keys; * the `local-target-filter` element defines a regular expression to match the keys that have to return a local target; * the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration; * the `pool` element defines the pool to group the target brokers, see [pools](#pools). -* the `policy` element defines the policy used to select the target brokers, see [policies](#policies); +* the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies); Let's take a look at some broker balancer examples from broker.xml: ```xml + + CLIENT_ID + ^.{3} + ^FOO.* + @@ -155,8 +160,9 @@ Let's take a look at some broker balancer examples from broker.xml: The broker balancer workflow include the following steps: * Retrieve the target key from the incoming connection; * Return the local target broker if the target key matches the local filter; +* Delegate to the pool: * Return the cached target broker if it is ready; -* Get ready target brokers from the pool; +* Get ready/active target brokers from the pool; * Select one target broker using the policy; * Add the selected broker in the cache; * Return the selected broker. @@ -164,6 +170,23 @@ The broker balancer workflow include the following steps: Let's take a look at flowchart of the broker balancer workflow: ![Broker Balancer Workflow](images/broker_balancer_workflow.png) +## Data gravity +The first balancer configuration: `local-partition`, demonstrates the simplest use case, +that of preserving `data gravity` by confining a subset of application data to a given broker. +Each broker is given a subset of keys that it will exclusively service or reject. +If brokers are behind a round-robin load-balancer or have full knowledge of the broker +urls, `their` broker will eventually respond. The `local-target-filter` regular expression +determines the granularity of partition that is best for preserving `data gravity` for your applications. + +The challenge is in providing a consistent [key](#Target_Key) in all related application connections. + +Note: the concept of `data gravity` tries to capture the reality that while addresses are shared by multiple +applications, it is best to keep related addresses and their data co-located on a single broker. Typically, +applications should `connect` to the data rather than the data moving to whatever broker the application connects too. +This is particularly true when the amount of data (backlog) is large, the cost of movement to follow consumers outweighs +the cost of delivery to the application. +With the 'data gravity' mindset, operators are less concerned with numbers of connections and more concerned with +applications and the addresses they need to interact with. ## Redirection Apache ActiveMQ Artemis provides a native redirection for supported clients and a new management API for other clients. diff --git a/examples/features/broker-balancer/pom.xml b/examples/features/broker-balancer/pom.xml index ced2073d2d..984a2d6c18 100644 --- a/examples/features/broker-balancer/pom.xml +++ b/examples/features/broker-balancer/pom.xml @@ -43,6 +43,7 @@ under the License. evenly-redirect symmetric-redirect + symmetric-simple @@ -50,6 +51,7 @@ under the License. evenly-redirect symmetric-redirect + symmetric-simple diff --git a/examples/features/broker-balancer/symmetric-simple/pom.xml b/examples/features/broker-balancer/symmetric-simple/pom.xml new file mode 100644 index 0000000000..54a23b5dfc --- /dev/null +++ b/examples/features/broker-balancer/symmetric-simple/pom.xml @@ -0,0 +1,161 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples + broker-balancer + 2.19.0-SNAPSHOT + + + symmetric-simple + jar + symmetric-simple + + + ${project.basedir}/../../../.. + + + + + org.apache.qpid + qpid-jms-client + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create0 + + create + + + ${noServer} + ${basedir}/target/server0 + true + ${basedir}/target/classes/activemq/server0 + + -Djava.net.preferIPv4Stack=true + + + + create1 + + create + + + ${noServer} + ${basedir}/target/server1 + true + ${basedir}/target/classes/activemq/server1 + + -Djava.net.preferIPv4Stack=true + + + + start0 + + cli + + + true + ${noServer} + ${basedir}/target/server0 + + run + + server0 + + + + start1 + + cli + + + ${noServer} + true + ${basedir}/target/server1 + + run + + server1 + + + + runClient + + runClient + + + + org.apache.activemq.artemis.jms.example.SymmetricSimpleExample + + + + stop0 + + cli + + + ${noServer} + ${basedir}/target/server0 + + stop + + + + + stop1 + + cli + + + ${noServer} + ${basedir}/target/server1 + + stop + + + + + + + org.apache.activemq.examples + symmetric-simple + ${project.version} + + + + + org.apache.maven.plugins + maven-clean-plugin + + + + diff --git a/examples/features/broker-balancer/symmetric-simple/readme.md b/examples/features/broker-balancer/symmetric-simple/readme.md new file mode 100644 index 0000000000..cf18e91a59 --- /dev/null +++ b/examples/features/broker-balancer/symmetric-simple/readme.md @@ -0,0 +1,31 @@ +# Symmetric Simple Example + +To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually. + +This example demonstrates how data is partitioned across two brokers. The idea is to focus on +'data gravity' and partition connections to brokers based on the data they will access, +rather than focus on balancing connection numbers overall. +For example, ensuring that all users of some particular set of data map to `Broker0` +and all users of some other set, map to `Broker1`. In this way, there is no extra movement of +data to service requests. +This architecture is intentionally quite static; based on the configured regular expressions, +it is always possible to infer where a given application (key) should be routed to. + +In this example both brokers have the role of target, but for a subset of keys. If the key is a match +it is accepted, if not it is rejected. +Note: redirection in this scenario is optional, with a round-robin distribution of client urls, a connection will +eventually find a local target match. +In this example, the qpid jms amqp client failover feature does the required round-robin distribution. + +The job of application developers in this scenario is to provide a key that can easily be mapped to a regular +expression that can capture an appropriate 'center of data gravity' for a broker. + +In configuration, the `local-target-filter` provides the regular expression that controls what keys are mapped to a broker. +`Broker0` takes clientIDs with prefix FOO and `Broker1` takes prefix BAR. The `target-key-filter` specifies how the key is extracted, +we care about the first 3 characters, from the user supplied clientId. + + + CLIENT_ID + ^.{3} + ^FOO.* + diff --git a/examples/features/broker-balancer/symmetric-simple/src/main/java/org/apache/activemq/artemis/jms/example/SymmetricSimpleExample.java b/examples/features/broker-balancer/symmetric-simple/src/main/java/org/apache/activemq/artemis/jms/example/SymmetricSimpleExample.java new file mode 100644 index 0000000000..ec5ab91200 --- /dev/null +++ b/examples/features/broker-balancer/symmetric-simple/src/main/java/org/apache/activemq/artemis/jms/example/SymmetricSimpleExample.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.jms.JmsConnectionFactory; + + +/** + * This example demonstrates how incoming client connections are partitioned across two brokers + * it uses AMQP to take advantage of the simple round-robin retry logic of the failover url scheme + */ +public class SymmetricSimpleExample { + + public static void main(final String[] args) throws Exception { + + /** + * Step 1. Create a connection for producer0 and producer1, and send a few messages with different key values + */ + ConnectionFactory connectionFactory = new JmsConnectionFactory("failover:(amqp://localhost:61616,amqp://localhost:61617)"); + + + try (Connection connectionProducer0 = connectionFactory.createConnection(); + Connection connectionProducer1 = connectionFactory.createConnection()) { + + // using first 3 characters of clientID as key for data gravity + connectionProducer0.setClientID("BAR_PRODUCER"); + connectionProducer1.setClientID("FOO_PRODUCER"); + + + for (Connection connectionProducer : new Connection[] {connectionProducer0, connectionProducer1}) { + Session session = connectionProducer.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue("exampleQueue" + connectionProducer.getClientID().substring(0, 3)); + MessageProducer sender = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = session.createTextMessage("Hello world n" + i + " - " + connectionProducer.getClientID().substring(0, 3)); + System.out.println("Sending message " + message.getText() + "/" + connectionProducer.getClientID()); + sender.send(message); + } + } + } + + /** + * Step 2. create a connection for consumer0 and consumer1, and receive a few messages. + * the consumers will find data that matches their key + */ + + try (Connection connectionConsumer0 = connectionFactory.createConnection(); + Connection connectionConsumer1 = connectionFactory.createConnection()) { + + // using first 3 characters of clientID as key for data gravity + connectionConsumer0.setClientID("FOO_CONSUMER"); + connectionConsumer1.setClientID("BAR_CONSUMER"); + + for (Connection connectionConsumer : new Connection[]{connectionConsumer0, connectionConsumer1}) { + connectionConsumer.start(); + Session session = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("exampleQueue" + connectionConsumer.getClientID().substring(0, 3)); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 10; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + System.out.println("Received message " + message.getText() + "/" + connectionConsumer.getClientID()); + } + } + } + } +} diff --git a/examples/features/broker-balancer/symmetric-simple/src/main/resources/activemq/server0/broker.xml b/examples/features/broker-balancer/symmetric-simple/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..7ca578c67d --- /dev/null +++ b/examples/features/broker-balancer/symmetric-simple/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,121 @@ + + + + + + + + 0.0.0.0 + + + false + + NIO + + + true + + 120000 + + 60000 + + HALT + + 60000 + + + + tcp://0.0.0.0:61616?redirect-to=symmetric-simple;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + + + + + CLIENT_ID + ^.{3} + ^FOO.* + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + +

+ + + +
+
+ + + +
+ + + + + diff --git a/examples/features/broker-balancer/symmetric-simple/src/main/resources/activemq/server1/broker.xml b/examples/features/broker-balancer/symmetric-simple/src/main/resources/activemq/server1/broker.xml new file mode 100644 index 0000000000..2e759c6381 --- /dev/null +++ b/examples/features/broker-balancer/symmetric-simple/src/main/resources/activemq/server1/broker.xml @@ -0,0 +1,122 @@ + + + + + + + + 0.0.0.0 + + + false + + NIO + + + true + + 120000 + + 60000 + + HALT + + 60000 + + + + tcp://0.0.0.0:61617?redirect-to=symmetric-simple;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true + + + + + CLIENT_ID + ^.{3} + ^BAR.* + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ +
+