diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationAddressPolicyConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationAddressPolicyConfiguration.java index 71188e692c..f00fbd91f2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationAddressPolicyConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/config/federation/FederationAddressPolicyConfiguration.java @@ -34,6 +34,7 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy(); excludes = new HashSet<>(); decodeMatchers(buffer, includes); decodeMatchers(buffer, excludes); + if (buffer.readableBytes() > 0) { + enableDivertBindings = buffer.readBoolean(); + } } private void encodeMatchers(final ActiveMQBuffer buffer, final Set matchers) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index ce4e39d1ce..0087f44fda 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2091,6 +2091,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { } else if (item.getNodeName().equals("transformer-ref")) { String transformerRef = item.getNodeValue(); config.setTransformerRef(transformerRef); + } else if (item.getNodeName().equals("enable-divert-bindings")) { + boolean enableDivertBindings = Boolean.parseBoolean(item.getNodeValue()); + config.setEnableDivertBindings(enableDivertBindings); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 84de5ed7a4..4909c0b15f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1659,6 +1659,11 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void federationPluginExecutionError(@Cause Throwable e, String pluginMethod); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222287, value = "Error looking up bindings for address {}.", + format = Message.Format.MESSAGE_FORMAT) + void federationBindingsLookupError(@Cause Throwable e, SimpleString address); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java index 5b2694c954..f51f4b055d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Divert.java @@ -31,4 +31,6 @@ public interface Divert extends Bindable { SimpleString getRoutingName(); Transformer getTransformer(); + + SimpleString getForwardAddress(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java index 9a10cdeae4..2a3d7e510c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/address/FederatedAddress.java @@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.server.federation.address; import java.io.Serializable; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -34,8 +36,9 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; -import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; @@ -43,11 +46,13 @@ import org.apache.activemq.artemis.core.server.federation.FederatedAbstract; import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey; import org.apache.activemq.artemis.core.server.federation.Federation; import org.apache.activemq.artemis.core.server.federation.FederationUpstream; -import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.Match; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ByteUtil; -import org.jboss.logging.Logger; /** * Federated Address, replicate messages from the remote brokers address to itself. @@ -59,9 +64,8 @@ import org.jboss.logging.Logger; * * */ -public class FederatedAddress extends FederatedAbstract implements ActiveMQServerQueuePlugin, Serializable { +public class FederatedAddress extends FederatedAbstract implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin, Serializable { - private static final Logger logger = Logger.getLogger(FederatedAddress.class); public static final String FEDERATED_QUEUE_PREFIX = "federated"; public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops"); @@ -69,8 +73,8 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe private final SimpleString filterString; private final Set includes; private final Set excludes; - private final FederationAddressPolicyConfiguration config; + private final Map> matchingDiverts = new HashMap<>(); public FederatedAddress(Federation federation, FederationAddressPolicyConfiguration config, ActiveMQServer server, FederationUpstream upstream) { super(federation, server, upstream); @@ -102,25 +106,16 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe } @Override - public void start() { - super.start(); - server.getPostOffice() - .getAllBindings() - .values() - .stream() - .filter(b -> b instanceof QueueBinding) - .map(b -> ((QueueBinding) b).getQueue()) - .forEach(this::conditionalCreateRemoteConsumer); - } - - /** - * After a queue has been created - * - * @param queue The newly created queue - */ - @Override - public synchronized void afterCreateQueue(Queue queue) { - conditionalCreateRemoteConsumer(queue); + public synchronized void start() { + if (!isStarted()) { + super.start(); + server.getPostOffice() + .getAllBindings() + .values() + .stream() + .filter(b -> b instanceof QueueBinding || b instanceof DivertBinding) + .forEach(this::afterAddBinding); + } } private void conditionalCreateRemoteConsumer(Queue queue) { @@ -141,6 +136,145 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe createRemoteConsumer(queue); } + @Override + public void afterAddAddress(AddressInfo addressInfo, boolean reload) { + if (match(addressInfo)) { + try { + //Diverts can be added without the source address existing yet so + //if a new address is added we need to see if there are matching divert bindings + server.getPostOffice() + .getDirectBindings(addressInfo.getName()) + .getBindings().stream().filter(binding -> binding instanceof DivertBinding) + .forEach(this::afterAddBinding); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, addressInfo.getName()); + } + } + } + + @Override + public void afterAddBinding(Binding binding) { + if (binding instanceof QueueBinding) { + conditionalCreateRemoteConsumer(((QueueBinding) binding).getQueue()); + + if (config.isEnableDivertBindings()) { + synchronized (this) { + for (Map.Entry> entry : matchingDiverts.entrySet()) { + //for each divert check the new QueueBinding to see if the divert matches and is not already tracking + if (!entry.getValue().contains(((QueueBinding) binding).getQueue().getName())) { + //conditionalCreateRemoteConsumer will check if the queue is a target of the divert before adding + conditionalCreateRemoteConsumer(entry.getKey(), entry.getValue(), (QueueBinding) binding); + } + } + } + } + } else if (config.isEnableDivertBindings() && binding instanceof DivertBinding) { + final DivertBinding divertBinding = (DivertBinding) binding; + final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress()); + + synchronized (this) { + if (match(addressInfo) && matchingDiverts.get(divertBinding) == null) { + final Set matchingQueues = new HashSet<>(); + matchingDiverts.put(divertBinding, matchingQueues); + + //find existing matching queue bindings for the divert to create consumers for + final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress(); + try { + //create demand for each matching queue binding that isn't already tracked by the divert + //conditionalCreateRemoteConsumer will check if the queue is a target of the divert before adding + server.getPostOffice().getBindingsForAddress(forwardAddress).getBindings() + .stream().filter(b -> b instanceof QueueBinding).map(b -> (QueueBinding) b) + .forEach(queueBinding -> conditionalCreateRemoteConsumer(divertBinding, matchingQueues, queueBinding)); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, forwardAddress); + } + } + } + } + } + + private void conditionalCreateRemoteConsumer(DivertBinding divertBinding, Set matchingQueues, QueueBinding queueBinding) { + if (server.hasBrokerFederationPlugins()) { + final AtomicBoolean conditionalCreate = new AtomicBoolean(true); + try { + server.callBrokerFederationPlugins(plugin -> { + conditionalCreate.set(conditionalCreate.get() && plugin.federatedAddressConditionalCreateDivertConsumer(divertBinding, queueBinding)); + }); + } catch (ActiveMQException t) { + ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedAddressConditionalCreateDivertConsumer"); + throw new IllegalStateException(t.getMessage(), t.getCause()); + } + if (!conditionalCreate.get()) { + return; + } + } + createRemoteConsumer(divertBinding, matchingQueues, queueBinding); + } + + private void createRemoteConsumer(DivertBinding divertBinding, final Set matchingQueues, QueueBinding queueBinding) { + final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress()); + + //If the divert address matches and if the new queueBinding matches the forwarding address of the divert + //then create a remote consumer if not already being tracked by the divert + if (match(addressInfo) && queueBinding.getAddress().equals(divertBinding.getDivert().getForwardAddress()) + && matchingQueues.add(queueBinding.getQueue().getName())) { + FederatedConsumerKey key = getKey(addressInfo); + Transformer transformer = getTransformer(config.getTransformerRef()); + Transformer addHop = FederatedAddress::addHop; + createRemoteConsumer(key, mergeTransformers(addHop, transformer), clientSession -> createRemoteQueue(clientSession, key)); + } + } + + @Override + public void beforeRemoveBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) { + final Binding binding = server.getPostOffice().getBinding(uniqueName); + if (binding instanceof QueueBinding) { + final Queue queue = ((QueueBinding) binding).getQueue(); + + //Remove any direct queue demand + removeRemoteConsumer(getKey(queue)); + + if (config.isEnableDivertBindings()) { + //See if there is any matching diverts that match this queue binding and remove demand now that + //the queue is going away + synchronized (this) { + matchingDiverts.entrySet().forEach(entry -> { + if (entry.getKey().getDivert().getForwardAddress().equals(queue.getAddress())) { + final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress()); + //check if the queue has been tracked by this divert and if so remove the consumer + if (entry.getValue().remove(queue)) { + removeRemoteConsumer(getKey(addressInfo)); + } + } + }); + } + } + } else if (config.isEnableDivertBindings() && binding instanceof DivertBinding) { + final DivertBinding divertBinding = (DivertBinding) binding; + final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress(); + + //Check if we have added this divert binding as a matching binding + //If we have then we need to look for any still existing queue bindings that map to this divert + //and remove consumers if they haven't already been removed + synchronized (this) { + final Set matchingQueues; + if ((matchingQueues = matchingDiverts.remove(binding)) != null) { + try { + final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress()); + if (addressInfo != null) { + //remove queue binding demand if tracked by the divert + server.getPostOffice().getBindingsForAddress(forwardAddress) + .getBindings().stream().filter(b -> b instanceof QueueBinding && matchingQueues.remove(((QueueBinding) b).getQueue().getName())) + .forEach(queueBinding -> removeRemoteConsumer(getKey(addressInfo))); + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, forwardAddress); + } + } + } + } + } + public FederationAddressPolicyConfiguration getConfig() { return config; } @@ -170,12 +304,20 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe } private boolean match(Queue queue) { + return match(queue.getAddress(), queue.getRoutingType()); + } + + private boolean match(AddressInfo addressInfo) { + return addressInfo != null ? match(addressInfo.getName(), addressInfo.getRoutingType()) : false; + } + + private boolean match(SimpleString address, RoutingType routingType) { //Currently only supporting Multicast currently. - if (RoutingType.ANYCAST.equals(queue.getRoutingType())) { + if (RoutingType.ANYCAST.equals(routingType)) { return false; } for (Matcher exclude : excludes) { - if (exclude.test(queue)) { + if (exclude.test(address.toString())) { return false; } } @@ -183,7 +325,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe return true; } else { for (Matcher include : includes) { - if (include.test(queue)) { + if (include.test(address.toString())) { return true; } } @@ -208,24 +350,15 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe } } - /** - * Before an address is removed - * - * @param queue The queue that will be removed - */ - @Override - public synchronized void beforeDestroyQueue(Queue queue, final SecurityAuth session, boolean checkConsumerCount, - boolean removeConsumers, boolean autoDeleteAddress) { - FederatedConsumerKey key = getKey(queue); - removeRemoteConsumer(key); - } - private FederatedConsumerKey getKey(Queue queue) { return new FederatedAddressConsumerKey(federation.getName(), upstream.getName(), queue.getAddress(), queue.getRoutingType(), queueNameFormat, filterString); } - public static class Matcher { + private FederatedConsumerKey getKey(AddressInfo address) { + return new FederatedAddressConsumerKey(federation.getName(), upstream.getName(), address.getName(), address.getRoutingType(), queueNameFormat, filterString); + } + public static class Matcher { Predicate addressPredicate; Matcher(FederationAddressPolicyConfiguration.Matcher config, WildcardConfiguration wildcardConfiguration) { @@ -234,10 +367,8 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe } } - public boolean test(Queue queue) { - return addressPredicate == null || addressPredicate.test(queue.getAddress().toString()); + public boolean test(String address) { + return addressPredicate == null || addressPredicate.test(address); } - } - } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index 7e37adb4f0..1bf6123633 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -163,6 +163,11 @@ public class DivertImpl implements Divert { return transformer; } + @Override + public SimpleString getForwardAddress() { + return forwardAddress; + } + /* (non-Javadoc) * @see java.lang.Object#toString() */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java index 8e27693048..e64d264109 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerFederationPlugin.java @@ -19,6 +19,8 @@ package org.apache.activemq.artemis.core.server.plugin; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey; @@ -121,6 +123,10 @@ public interface ActiveMQServerFederationPlugin extends ActiveMQServerBasePlugin return true; } + default boolean federatedAddressConditionalCreateDivertConsumer(DivertBinding divertBinding, QueueBinding queueBinding) throws ActiveMQException { + return true; + } + /** * Conditionally create a federated queue consumer for a federated queue. This allows custom * logic to be inserted to decide when to create federated queue consumers diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 794409433c..1177f4b76c 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1824,6 +1824,7 @@ + diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index 891df2d0a0..b06c527340 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -1696,6 +1696,7 @@ + diff --git a/docs/user-manual/en/federation-address.md b/docs/user-manual/en/federation-address.md index 8a77cb9f1f..68d1ba2a12 100644 --- a/docs/user-manual/en/federation-address.md +++ b/docs/user-manual/en/federation-address.md @@ -66,6 +66,18 @@ the tree can extend to any depth, and can be extended to without needing to re-c In this case messages published to the master address can be received by any consumer connected to any broker in the tree. +### Divert Binding Support + +Divert binding support can be added as part of the address policy configuration. This will allow the federation to respond +to divert bindings to create demand. For example, let's say there is one address called "test.federation.source" that is +included as a match for the federated address and another address called "test.federation.target" that is not included. Normally +when a queue is created on "test.federation.target" this would not cause a federated consumer to be created because the address +is not part of the included matches. However, if we create a divert binding such that "test.federation.source" is the source address +and "test.federation.target" is the forwarded address then demand will now be created. The source address still must be multicast +but the target address can be multicast or anycast. + +An example use case for this might be a divert that redirects JMS topics (multicast addresses) to a JMS queue (anycast addresses) to +allow for load balancing of the messages on a topic for legacy consumers not supporting JMS 2.0 and shared subscriptions. ## Configuring Address Federation @@ -135,6 +147,9 @@ and the delay and message count params have been met. This is useful if you want - `transformer-ref`. The ref name for a transformer (see transformer config) that you may wish to configure to transform the message on federation transfer. +- `enable-divert-bindings`. Setting to true will enable divert bindings to be listened for demand. If there is a divert binding with an address that matches the included +addresses for the stream, any queue bindings that match the forward address of the divert will create demand. Default is false + **note** `address-policy`'s and `queue-policy`'s are able to be defined in the same federation, and be linked to the same upstream. diff --git a/examples/features/federation/federated-address-divert/pom.xml b/examples/features/federation/federated-address-divert/pom.xml new file mode 100644 index 0000000000..bfab93c58c --- /dev/null +++ b/examples/features/federation/federated-address-divert/pom.xml @@ -0,0 +1,173 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.federation + broker-federation + 2.12.0-SNAPSHOT + + + federated-address-divert + jar + ActiveMQ Artemis Federated Address Divert Example + + + ${project.basedir}/../../../.. + + + + + org.apache.activemq + artemis-jms-client + ${project.version} + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create0 + + create + + + ${noServer} + ${basedir}/target/server0 + ${basedir}/target/classes/activemq/server0 + + -Djava.net.preferIPv4Stack=true + + + + create1 + + create + + + ${noServer} + ${basedir}/target/server1 + ${basedir}/target/classes/activemq/server1 + + -Djava.net.preferIPv4Stack=true + + + + start0 + + cli + + + ${noServer} + true + ${basedir}/target/server0 + tcp://localhost:61616 + + run + + eu-west-1 + + + + start1 + + cli + + + ${noServer} + true + ${basedir}/target/server1 + tcp://localhost:61617 + + run + + eu-east-1 + + + + runClient + + runClient + + + org.apache.activemq.artemis.jms.example.FederatedAddressDivertExample + + + + stop0 + + cli + + + ${noServer} + ${basedir}/target/server0 + + stop + + + + + stop1 + + cli + + + ${noServer} + ${basedir}/target/server1 + + stop + + + + + + + org.apache.activemq.examples.federation + federated-address-divert + ${project.version} + + + + + org.apache.maven.plugins + maven-clean-plugin + + + + + + release + + + + com.vladsch.flexmark + markdown-page-generator-plugin + + + + + + diff --git a/examples/features/federation/federated-address-divert/readme.md b/examples/features/federation/federated-address-divert/readme.md new file mode 100644 index 0000000000..b30d43bd7d --- /dev/null +++ b/examples/features/federation/federated-address-divert/readme.md @@ -0,0 +1,18 @@ +# Federated Address Divert Example + +To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start and create the broker manually. + +This example demonstrates a core multicast address deployed on two different brokers. The two brokers are configured to form a federated address mesh. + +In the example we name the brokers eu-west and eu-east. + +The following is then carried out: + +1. create a divert binding with a source address of exampleTopic and target address of divertExampleTopic on eu-west + +1. create a consumer on the topic divertExampleTopic on eu-west and create a producer on the topic exampleTopic on eu-east. + +2. send some messages via the producer on eu-east, and we verify the eu-west consumer receives the messages because of the divert binding demand + + +For more information on ActiveMQ Artemis Federation please see the federation section of the user manual. \ No newline at end of file diff --git a/examples/features/federation/federated-address-divert/src/main/java/org/apache/activemq/artemis/jms/example/FederatedAddressDivertExample.java b/examples/features/federation/federated-address-divert/src/main/java/org/apache/activemq/artemis/jms/example/FederatedAddressDivertExample.java new file mode 100644 index 0000000000..62af0aceff --- /dev/null +++ b/examples/features/federation/federated-address-divert/src/main/java/org/apache/activemq/artemis/jms/example/FederatedAddressDivertExample.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; + +/** + * A simple example that demonstrates multicast address replication between remote servers, + * using Address Federation feature and diverts. + */ +public class FederatedAddressDivertExample { + + public static void main(final String[] args) throws Exception { + Connection connectionEUWest = null; + + Connection connectionEUEast = null; + + + try { + // Step 1. Instantiate the Topic (multicast) for the producers + Topic topic = ActiveMQJMSClient.createTopic("exampleTopic"); + + //Create a topic for the consumers + Topic topic2 = ActiveMQJMSClient.createTopic("divertExampleTopic"); + + // Step 2. Instantiate connection towards server EU West + ConnectionFactory cfEUWest = new ActiveMQConnectionFactory("tcp://localhost:61616"); + + // Step 3. Instantiate connection towards server EU East + ConnectionFactory cfEUEast = new ActiveMQConnectionFactory("tcp://localhost:61617"); + + + // Step 5. We create a JMS Connection connectionEUWest which is a connection to server EU West + connectionEUWest = cfEUWest.createConnection(); + + // Step 6. We create a JMS Connection connectionEUEast which is a connection to server EU East + connectionEUEast = cfEUEast.createConnection(); + + // Step 8. We create a JMS Session on server EU West + Session sessionEUWest = connectionEUWest.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 9. We create a JMS Session on server EU East + Session sessionEUEast = connectionEUEast.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 11. We start the connections to ensure delivery occurs on them + connectionEUWest.start(); + + connectionEUEast.start(); + + // Step 12. We create a JMS MessageProducer object on each server + MessageProducer producerEUEast = sessionEUEast.createProducer(topic); + + // Step 13. We create JMS MessageConsumer objects on each server - Messages will be diverted to this topic + MessageConsumer consumerEUWest = sessionEUWest.createSharedDurableConsumer(topic2, "exampleSubscription"); + + + // Step 14. Let a little time for everything to start and form. + + Thread.sleep(5000); + + // Step 13. We send some messages to server EU West + final int numMessages = 10; + + // Step 15. Repeat same test one last time, this time sending on EU East + + for (int i = 0; i < numMessages; i++) { + TextMessage message = sessionEUEast.createTextMessage("This is text sent from EU East, message " + i); + + producerEUEast.send(message); + + System.out.println("EU East :: Sent message: " + message.getText()); + } + + // Step 14. We now consume those messages on *all* servers . + // We note that every consumer, receives a message even so on seperate servers + + for (int i = 0; i < numMessages; i++) { + TextMessage messageEUWest = (TextMessage) consumerEUWest.receive(5000); + + System.out.println("EU West :: Got message: " + messageEUWest.getText()); + } + } finally { + // Step 16. Be sure to close our resources! + if (connectionEUWest != null) { + connectionEUWest.stop(); + connectionEUWest.close(); + } + + if (connectionEUEast != null) { + connectionEUEast.stop(); + connectionEUEast.close(); + } + } + } +} diff --git a/examples/features/federation/federated-address-divert/src/main/resources/activemq/server0/broker.xml b/examples/features/federation/federated-address-divert/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..9a279c1d90 --- /dev/null +++ b/examples/features/federation/federated-address-divert/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,111 @@ + + + + + + eu-west-1-master + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + tcp://localhost:61616 + tcp://localhost:61616 + tcp://localhost:61617 + + + + + tcp://localhost:61616 + + + + + + + + 1000 + + eu-east-1-connector + + + + + + + + + + + + + + + + + + + federation-divert +
exampleTopic
+ divertExampleTopic + true +
+
+ + + + + + + + + + + + + + + + + + + + + + + + +
+ +
+
+ + + +
+
+
+
diff --git a/examples/features/federation/federated-address-divert/src/main/resources/activemq/server1/broker.xml b/examples/features/federation/federated-address-divert/src/main/resources/activemq/server1/broker.xml new file mode 100644 index 0000000000..dda46db0d8 --- /dev/null +++ b/examples/features/federation/federated-address-divert/src/main/resources/activemq/server1/broker.xml @@ -0,0 +1,65 @@ + + + + + + eu-east-1-master + + target/server1/data/messaging/bindings + + target/server1/data/messaging/journal + + target/server1/data/messaging/largemessages + + target/server1/data/messaging/paging + + + + tcp://localhost:61617 + tcp://localhost:61616 + tcp://localhost:61617 + + + + + tcp://localhost:61617 + + + + + + + + + + + + + + + + + +
+ +
+
+
+
diff --git a/examples/features/federation/pom.xml b/examples/features/federation/pom.xml index f13a087d5d..327c66b527 100644 --- a/examples/features/federation/pom.xml +++ b/examples/features/federation/pom.xml @@ -53,6 +53,7 @@ under the License. federated-address federated-address-downstream federated-address-downstream-upstream + federated-address-divert @@ -64,6 +65,7 @@ under the License. federated-address federated-address-downstream federated-address-downstream-upstream + federated-address-divert diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java index eea8925de3..bc876613d8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedAddressTest.java @@ -22,11 +22,18 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.FederationConfiguration; +import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.Wait; @@ -170,6 +177,139 @@ public class FederatedAddressTest extends FederatedTestBase { verifyTransformer(address); } + /** + * Test diverts for downstream configurations + */ + //Test creating address first followed by divert + //Test creating divert before consumer + @Test + public void testDownstreamFederatedAddressDivertAddressAndDivertFirst() throws Exception { + testFederatedAddressDivert(true,true, true); + } + + //Test creating divert first followed by address + //Test creating divert before consumer + @Test + public void testDownstreamFederatedAddressDivertAddressSecondDivertFirst() throws Exception { + testFederatedAddressDivert(true,false, true); + } + + //Test creating address first followed by divert + //Test creating consumer before divert + @Test + public void testDownstreamFederatedAddressDivertAddressFirstDivertSecond() throws Exception { + testFederatedAddressDivert(true,true, false); + } + + //Test creating divert first followed by address + //Test creating consumer before divert + @Test + public void testDownstreamFederatedAddressDivertAddressAndDivertSecond() throws Exception { + testFederatedAddressDivert(true,false, false); + } + + /** + * Test diverts for upstream configurations + */ + //Test creating address first followed by divert + //Test creating divert before consumer + @Test + public void testUpstreamFederatedAddressDivertAddressAndDivertFirst() throws Exception { + testFederatedAddressDivert(false,true, true); + } + + //Test creating divert first followed by address + //Test creating divert before consumer + @Test + public void testUpstreamFederatedAddressDivertAddressSecondDivertFirst() throws Exception { + testFederatedAddressDivert(false,false, true); + } + + //Test creating address first followed by divert + //Test creating consumer before divert + @Test + public void testUpstreamFederatedAddressDivertAddressFirstDivertSecond() throws Exception { + testFederatedAddressDivert(false,true, false); + } + + //Test creating divert first followed by address + //Test creating consumer before divert + @Test + public void testUpstreamFederatedAddressDivertAddressAndDivertSecond() throws Exception { + testFederatedAddressDivert(false,false, false); + } + + protected void testFederatedAddressDivert(boolean downstream, boolean addressFirst, boolean divertBeforeConsumer) throws Exception { + String address = getName(); + String address2 = "fedOneWayDivertTest"; + + if (addressFirst) { + getServer(0).addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.MULTICAST)); + } + + final FederationConfiguration federationConfiguration; + final int deployServer; + if (downstream) { + federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration( + "server0", address, getServer(1).getConfiguration().getTransportConfigurations("server1")[0]); + deployServer = 1; + } else { + federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); + deployServer = 0; + } + + FederationAddressPolicyConfiguration policy = (FederationAddressPolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("AddressPolicy" + address); + //enable listening for divert bindings + policy.setEnableDivertBindings(true); + getServer(deployServer).getConfiguration().getFederationConfigurations().add(federationConfiguration); + getServer(deployServer).getFederationManager().deploy(); + + ConnectionFactory cf1 = getCF(1); + ConnectionFactory cf0 = getCF(0); + try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) { + connection1.start(); + connection0.start(); + + Session session1 = connection1.createSession(); + Topic topic1 = session1.createTopic(address); + MessageProducer producer1 = session1.createProducer(topic1); + + if (divertBeforeConsumer) { + getServer(0).deployDivert(new DivertConfiguration().setName(address + ":" + address2) + .setAddress(address).setExclusive(true).setForwardingAddress(address2) + .setRoutingType(ComponentConfigurationRoutingType.ANYCAST)); + } + + Session session0 = connection0.createSession(); + Queue queue0 = session0.createQueue(address2); + MessageConsumer consumer0 = session0.createConsumer(queue0); + + if (!addressFirst) { + getServer(0).addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.MULTICAST)); + } + + if (!divertBeforeConsumer) { + getServer(0).deployDivert(new DivertConfiguration().setName(address + ":" + address2) + .setAddress(address).setExclusive(true).setForwardingAddress(address2) + .setRoutingType(ComponentConfigurationRoutingType.ANYCAST)); + } + + assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1, + 1000, 100)); + final QueueBinding remoteQueueBinding = (QueueBinding) getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)) + .getBindings().iterator().next(); + assertEquals(1, remoteQueueBinding.getQueue().getConsumerCount()); + + producer1.send(session1.createTextMessage("hello")); + assertNotNull(consumer0.receive(1000)); + + //Test consumer is cleaned up after divert destroyed + getServer(0).destroyDivert(SimpleString.toSimpleString(address + ":" + address2)); + // getServer(0).destroyQueue(SimpleString.toSimpleString(address2)); + assertTrue(Wait.waitFor(() -> remoteQueueBinding.getQueue().getConsumerCount() == 0, 2000, 100)); + } + } + private void testFederatedAddressReplication(String address) throws Exception { ConnectionFactory cf1 = getCF(1); @@ -188,37 +328,38 @@ public class FederatedAddressTest extends FederatedTestBase { Topic topic0 = session0.createTopic(address); MessageConsumer consumer0 = session0.createConsumer(topic0); - assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1)); + assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress( + SimpleString.toSimpleString(address)).getBindings().size() == 1, 2000, 100)); producer.send(session1.createTextMessage("hello")); - assertNotNull(consumer0.receive(10000)); + assertNotNull(consumer0.receive(1000)); producer.send(session1.createTextMessage("hello")); - assertNotNull(consumer0.receive(10000)); + assertNotNull(consumer0.receive(1000)); MessageConsumer consumer1 = session1.createConsumer(topic1); producer.send(session1.createTextMessage("hello")); - assertNotNull(consumer1.receive(10000)); - assertNotNull(consumer0.receive(10000)); + assertNotNull(consumer1.receive(1000)); + assertNotNull(consumer0.receive(1000)); consumer1.close(); //Groups producer.send(session1.createTextMessage("hello")); - assertNotNull(consumer0.receive(10000)); + assertNotNull(consumer0.receive(1000)); producer.send(createTextMessage(session1, "groupA")); - assertNotNull(consumer0.receive(10000)); + assertNotNull(consumer0.receive(1000)); consumer1 = session1.createConsumer(topic1); producer.send(createTextMessage(session1, "groupA")); - assertNotNull(consumer1.receive(10000)); - assertNotNull(consumer0.receive(10000)); + assertNotNull(consumer1.receive(1000)); + assertNotNull(consumer0.receive(1000)); } @@ -246,20 +387,17 @@ public class FederatedAddressTest extends FederatedTestBase { producer.send(session1.createTextMessage("hello")); - assertNull(consumer0.receive(100)); FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address); getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration); getServer(0).getFederationManager().deploy(); - Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1); - + Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress( + SimpleString.toSimpleString(address)).getBindings().size() == 1, 2000, 100); producer.send(session1.createTextMessage("hello")); - - assertNotNull(consumer0.receive(10000)); - + assertNotNull(consumer0.receive(1000)); } }