ARTEMIS-2613: Add support for DivertBindings for federated addresses

This will allow federated addresses to create remote consumers based on
the existing of divert bindings and matching queue bindings
This commit is contained in:
Christopher L. Shannon (cshannon) 2020-01-30 08:53:32 -05:00
parent 3c83587746
commit 3966e47338
17 changed files with 867 additions and 62 deletions

View File

@ -34,6 +34,7 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy<Fe
private Long autoDeleteMessageCount;
private int maxHops;
private String transformerRef;
private boolean enableDivertBindings;
@Override
public String getName() {
@ -109,6 +110,15 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy<Fe
return this;
}
public Boolean isEnableDivertBindings() {
return enableDivertBindings;
}
public FederationAddressPolicyConfiguration setEnableDivertBindings(Boolean enableDivertBindings) {
this.enableDivertBindings = enableDivertBindings;
return this;
}
@Override
public void encode(ActiveMQBuffer buffer) {
Preconditions.checkArgument(name != null, "name can not be null");
@ -118,9 +128,9 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy<Fe
buffer.writeNullableLong(autoDeleteMessageCount);
buffer.writeInt(maxHops);
buffer.writeNullableString(transformerRef);
encodeMatchers(buffer, includes);
encodeMatchers(buffer, excludes);
buffer.writeBoolean(enableDivertBindings);
}
@Override
@ -131,12 +141,14 @@ public class FederationAddressPolicyConfiguration implements FederationPolicy<Fe
autoDeleteMessageCount = buffer.readNullableLong();
maxHops = buffer.readInt();
transformerRef = buffer.readNullableString();
includes = new HashSet<>();
excludes = new HashSet<>();
decodeMatchers(buffer, includes);
decodeMatchers(buffer, excludes);
if (buffer.readableBytes() > 0) {
enableDivertBindings = buffer.readBoolean();
}
}
private void encodeMatchers(final ActiveMQBuffer buffer, final Set<Matcher> matchers) {

View File

@ -2091,6 +2091,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
} else if (item.getNodeName().equals("transformer-ref")) {
String transformerRef = item.getNodeValue();
config.setTransformerRef(transformerRef);
} else if (item.getNodeName().equals("enable-divert-bindings")) {
boolean enableDivertBindings = Boolean.parseBoolean(item.getNodeValue());
config.setEnableDivertBindings(enableDivertBindings);
}
}

View File

@ -1659,6 +1659,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void federationPluginExecutionError(@Cause Throwable e, String pluginMethod);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222287, value = "Error looking up bindings for address {}.",
format = Message.Format.MESSAGE_FORMAT)
void federationBindingsLookupError(@Cause Throwable e, SimpleString address);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);

View File

@ -31,4 +31,6 @@ public interface Divert extends Bindable {
SimpleString getRoutingName();
Transformer getTransformer();
SimpleString getForwardAddress();
}

View File

@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.server.federation.address;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -34,8 +36,9 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
@ -43,11 +46,13 @@ import org.apache.activemq.artemis.core.server.federation.FederatedAbstract;
import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey;
import org.apache.activemq.artemis.core.server.federation.Federation;
import org.apache.activemq.artemis.core.server.federation.FederationUpstream;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.Match;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.jboss.logging.Logger;
/**
* Federated Address, replicate messages from the remote brokers address to itself.
@ -59,9 +64,8 @@ import org.jboss.logging.Logger;
*
*
*/
public class FederatedAddress extends FederatedAbstract implements ActiveMQServerQueuePlugin, Serializable {
public class FederatedAddress extends FederatedAbstract implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin, Serializable {
private static final Logger logger = Logger.getLogger(FederatedAddress.class);
public static final String FEDERATED_QUEUE_PREFIX = "federated";
public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops");
@ -69,8 +73,8 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
private final SimpleString filterString;
private final Set<Matcher> includes;
private final Set<Matcher> excludes;
private final FederationAddressPolicyConfiguration config;
private final Map<DivertBinding, Set<SimpleString>> matchingDiverts = new HashMap<>();
public FederatedAddress(Federation federation, FederationAddressPolicyConfiguration config, ActiveMQServer server, FederationUpstream upstream) {
super(federation, server, upstream);
@ -102,25 +106,16 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
}
@Override
public void start() {
super.start();
server.getPostOffice()
.getAllBindings()
.values()
.stream()
.filter(b -> b instanceof QueueBinding)
.map(b -> ((QueueBinding) b).getQueue())
.forEach(this::conditionalCreateRemoteConsumer);
}
/**
* After a queue has been created
*
* @param queue The newly created queue
*/
@Override
public synchronized void afterCreateQueue(Queue queue) {
conditionalCreateRemoteConsumer(queue);
public synchronized void start() {
if (!isStarted()) {
super.start();
server.getPostOffice()
.getAllBindings()
.values()
.stream()
.filter(b -> b instanceof QueueBinding || b instanceof DivertBinding)
.forEach(this::afterAddBinding);
}
}
private void conditionalCreateRemoteConsumer(Queue queue) {
@ -141,6 +136,145 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
createRemoteConsumer(queue);
}
@Override
public void afterAddAddress(AddressInfo addressInfo, boolean reload) {
if (match(addressInfo)) {
try {
//Diverts can be added without the source address existing yet so
//if a new address is added we need to see if there are matching divert bindings
server.getPostOffice()
.getDirectBindings(addressInfo.getName())
.getBindings().stream().filter(binding -> binding instanceof DivertBinding)
.forEach(this::afterAddBinding);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, addressInfo.getName());
}
}
}
@Override
public void afterAddBinding(Binding binding) {
if (binding instanceof QueueBinding) {
conditionalCreateRemoteConsumer(((QueueBinding) binding).getQueue());
if (config.isEnableDivertBindings()) {
synchronized (this) {
for (Map.Entry<DivertBinding, Set<SimpleString>> entry : matchingDiverts.entrySet()) {
//for each divert check the new QueueBinding to see if the divert matches and is not already tracking
if (!entry.getValue().contains(((QueueBinding) binding).getQueue().getName())) {
//conditionalCreateRemoteConsumer will check if the queue is a target of the divert before adding
conditionalCreateRemoteConsumer(entry.getKey(), entry.getValue(), (QueueBinding) binding);
}
}
}
}
} else if (config.isEnableDivertBindings() && binding instanceof DivertBinding) {
final DivertBinding divertBinding = (DivertBinding) binding;
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
synchronized (this) {
if (match(addressInfo) && matchingDiverts.get(divertBinding) == null) {
final Set<SimpleString> matchingQueues = new HashSet<>();
matchingDiverts.put(divertBinding, matchingQueues);
//find existing matching queue bindings for the divert to create consumers for
final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress();
try {
//create demand for each matching queue binding that isn't already tracked by the divert
//conditionalCreateRemoteConsumer will check if the queue is a target of the divert before adding
server.getPostOffice().getBindingsForAddress(forwardAddress).getBindings()
.stream().filter(b -> b instanceof QueueBinding).map(b -> (QueueBinding) b)
.forEach(queueBinding -> conditionalCreateRemoteConsumer(divertBinding, matchingQueues, queueBinding));
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, forwardAddress);
}
}
}
}
}
private void conditionalCreateRemoteConsumer(DivertBinding divertBinding, Set<SimpleString> matchingQueues, QueueBinding queueBinding) {
if (server.hasBrokerFederationPlugins()) {
final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
try {
server.callBrokerFederationPlugins(plugin -> {
conditionalCreate.set(conditionalCreate.get() && plugin.federatedAddressConditionalCreateDivertConsumer(divertBinding, queueBinding));
});
} catch (ActiveMQException t) {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedAddressConditionalCreateDivertConsumer");
throw new IllegalStateException(t.getMessage(), t.getCause());
}
if (!conditionalCreate.get()) {
return;
}
}
createRemoteConsumer(divertBinding, matchingQueues, queueBinding);
}
private void createRemoteConsumer(DivertBinding divertBinding, final Set<SimpleString> matchingQueues, QueueBinding queueBinding) {
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress());
//If the divert address matches and if the new queueBinding matches the forwarding address of the divert
//then create a remote consumer if not already being tracked by the divert
if (match(addressInfo) && queueBinding.getAddress().equals(divertBinding.getDivert().getForwardAddress())
&& matchingQueues.add(queueBinding.getQueue().getName())) {
FederatedConsumerKey key = getKey(addressInfo);
Transformer transformer = getTransformer(config.getTransformerRef());
Transformer addHop = FederatedAddress::addHop;
createRemoteConsumer(key, mergeTransformers(addHop, transformer), clientSession -> createRemoteQueue(clientSession, key));
}
}
@Override
public void beforeRemoveBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) {
final Binding binding = server.getPostOffice().getBinding(uniqueName);
if (binding instanceof QueueBinding) {
final Queue queue = ((QueueBinding) binding).getQueue();
//Remove any direct queue demand
removeRemoteConsumer(getKey(queue));
if (config.isEnableDivertBindings()) {
//See if there is any matching diverts that match this queue binding and remove demand now that
//the queue is going away
synchronized (this) {
matchingDiverts.entrySet().forEach(entry -> {
if (entry.getKey().getDivert().getForwardAddress().equals(queue.getAddress())) {
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
//check if the queue has been tracked by this divert and if so remove the consumer
if (entry.getValue().remove(queue)) {
removeRemoteConsumer(getKey(addressInfo));
}
}
});
}
}
} else if (config.isEnableDivertBindings() && binding instanceof DivertBinding) {
final DivertBinding divertBinding = (DivertBinding) binding;
final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress();
//Check if we have added this divert binding as a matching binding
//If we have then we need to look for any still existing queue bindings that map to this divert
//and remove consumers if they haven't already been removed
synchronized (this) {
final Set<SimpleString> matchingQueues;
if ((matchingQueues = matchingDiverts.remove(binding)) != null) {
try {
final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());
if (addressInfo != null) {
//remove queue binding demand if tracked by the divert
server.getPostOffice().getBindingsForAddress(forwardAddress)
.getBindings().stream().filter(b -> b instanceof QueueBinding && matchingQueues.remove(((QueueBinding) b).getQueue().getName()))
.forEach(queueBinding -> removeRemoteConsumer(getKey(addressInfo)));
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.federationBindingsLookupError(e, forwardAddress);
}
}
}
}
}
public FederationAddressPolicyConfiguration getConfig() {
return config;
}
@ -170,12 +304,20 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
}
private boolean match(Queue queue) {
return match(queue.getAddress(), queue.getRoutingType());
}
private boolean match(AddressInfo addressInfo) {
return addressInfo != null ? match(addressInfo.getName(), addressInfo.getRoutingType()) : false;
}
private boolean match(SimpleString address, RoutingType routingType) {
//Currently only supporting Multicast currently.
if (RoutingType.ANYCAST.equals(queue.getRoutingType())) {
if (RoutingType.ANYCAST.equals(routingType)) {
return false;
}
for (Matcher exclude : excludes) {
if (exclude.test(queue)) {
if (exclude.test(address.toString())) {
return false;
}
}
@ -183,7 +325,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
return true;
} else {
for (Matcher include : includes) {
if (include.test(queue)) {
if (include.test(address.toString())) {
return true;
}
}
@ -208,24 +350,15 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
}
}
/**
* Before an address is removed
*
* @param queue The queue that will be removed
*/
@Override
public synchronized void beforeDestroyQueue(Queue queue, final SecurityAuth session, boolean checkConsumerCount,
boolean removeConsumers, boolean autoDeleteAddress) {
FederatedConsumerKey key = getKey(queue);
removeRemoteConsumer(key);
}
private FederatedConsumerKey getKey(Queue queue) {
return new FederatedAddressConsumerKey(federation.getName(), upstream.getName(), queue.getAddress(), queue.getRoutingType(), queueNameFormat, filterString);
}
public static class Matcher {
private FederatedConsumerKey getKey(AddressInfo address) {
return new FederatedAddressConsumerKey(federation.getName(), upstream.getName(), address.getName(), address.getRoutingType(), queueNameFormat, filterString);
}
public static class Matcher {
Predicate<String> addressPredicate;
Matcher(FederationAddressPolicyConfiguration.Matcher config, WildcardConfiguration wildcardConfiguration) {
@ -234,10 +367,8 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
}
}
public boolean test(Queue queue) {
return addressPredicate == null || addressPredicate.test(queue.getAddress().toString());
public boolean test(String address) {
return addressPredicate == null || addressPredicate.test(address);
}
}
}

View File

@ -163,6 +163,11 @@ public class DivertImpl implements Divert {
return transformer;
}
@Override
public SimpleString getForwardAddress() {
return forwardAddress;
}
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/

View File

@ -19,6 +19,8 @@ package org.apache.activemq.artemis.core.server.plugin;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.federation.FederatedConsumerKey;
@ -121,6 +123,10 @@ public interface ActiveMQServerFederationPlugin extends ActiveMQServerBasePlugin
return true;
}
default boolean federatedAddressConditionalCreateDivertConsumer(DivertBinding divertBinding, QueueBinding queueBinding) throws ActiveMQException {
return true;
}
/**
* Conditionally create a federated queue consumer for a federated queue. This allows custom
* logic to be inserted to decide when to create federated queue consumers

View File

@ -1824,6 +1824,7 @@
<xsd:attribute name="auto-delete-message-count" type="xsd:long" use="optional" />
<xsd:attribute name="max-hops" type="xsd:int" use="optional" />
<xsd:attribute name="name" type="xsd:ID" use="required" />
<xsd:attribute name="enable-divert-bindings" type="xsd:boolean" use="optional" />
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>

View File

@ -1696,6 +1696,7 @@
<xsd:attribute name="auto-delete-message-count" type="xsd:long" use="optional" />
<xsd:attribute name="max-hops" type="xsd:int" use="optional" />
<xsd:attribute name="name" type="xsd:ID" use="required" />
<xsd:attribute name="enable-divert-bindings" type="xsd:boolean" use="optional" />
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>

View File

@ -66,6 +66,18 @@ the tree can extend to any depth, and can be extended to without needing to re-c
In this case messages published to the master address can be received by any consumer connected to any broker in the tree.
### Divert Binding Support
Divert binding support can be added as part of the address policy configuration. This will allow the federation to respond
to divert bindings to create demand. For example, let's say there is one address called "test.federation.source" that is
included as a match for the federated address and another address called "test.federation.target" that is not included. Normally
when a queue is created on "test.federation.target" this would not cause a federated consumer to be created because the address
is not part of the included matches. However, if we create a divert binding such that "test.federation.source" is the source address
and "test.federation.target" is the forwarded address then demand will now be created. The source address still must be multicast
but the target address can be multicast or anycast.
An example use case for this might be a divert that redirects JMS topics (multicast addresses) to a JMS queue (anycast addresses) to
allow for load balancing of the messages on a topic for legacy consumers not supporting JMS 2.0 and shared subscriptions.
## Configuring Address Federation
@ -135,6 +147,9 @@ and the delay and message count params have been met. This is useful if you want
- `transformer-ref`. The ref name for a transformer (see transformer config) that you may wish to configure to transform the message on federation transfer.
- `enable-divert-bindings`. Setting to true will enable divert bindings to be listened for demand. If there is a divert binding with an address that matches the included
addresses for the stream, any queue bindings that match the forward address of the divert will create demand. Default is false
**note** `address-policy`'s and `queue-policy`'s are able to be defined in the same federation, and be linked to the same upstream.

View File

@ -0,0 +1,173 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.federation</groupId>
<artifactId>broker-federation</artifactId>
<version>2.12.0-SNAPSHOT</version>
</parent>
<artifactId>federated-address-divert</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis Federated Address Divert Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server0</instance>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server1</instance>
<configuration>${basedir}/target/classes/activemq/server1</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>start0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server0</location>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
<name>eu-west-1</name>
</configuration>
</execution>
<execution>
<id>start1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server1</location>
<testURI>tcp://localhost:61617</testURI>
<args>
<param>run</param>
</args>
<name>eu-east-1</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.FederatedAddressDivertExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server0</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
<execution>
<id>stop1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server1</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.federation</groupId>
<artifactId>federated-address-divert</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>com.vladsch.flexmark</groupId>
<artifactId>markdown-page-generator-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,18 @@
# Federated Address Divert Example
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start and create the broker manually.
This example demonstrates a core multicast address deployed on two different brokers. The two brokers are configured to form a federated address mesh.
In the example we name the brokers eu-west and eu-east.
The following is then carried out:
1. create a divert binding with a source address of exampleTopic and target address of divertExampleTopic on eu-west
1. create a consumer on the topic divertExampleTopic on eu-west and create a producer on the topic exampleTopic on eu-east.
2. send some messages via the producer on eu-east, and we verify the eu-west consumer receives the messages because of the divert binding demand
For more information on ActiveMQ Artemis Federation please see the federation section of the user manual.

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
/**
* A simple example that demonstrates multicast address replication between remote servers,
* using Address Federation feature and diverts.
*/
public class FederatedAddressDivertExample {
public static void main(final String[] args) throws Exception {
Connection connectionEUWest = null;
Connection connectionEUEast = null;
try {
// Step 1. Instantiate the Topic (multicast) for the producers
Topic topic = ActiveMQJMSClient.createTopic("exampleTopic");
//Create a topic for the consumers
Topic topic2 = ActiveMQJMSClient.createTopic("divertExampleTopic");
// Step 2. Instantiate connection towards server EU West
ConnectionFactory cfEUWest = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Step 3. Instantiate connection towards server EU East
ConnectionFactory cfEUEast = new ActiveMQConnectionFactory("tcp://localhost:61617");
// Step 5. We create a JMS Connection connectionEUWest which is a connection to server EU West
connectionEUWest = cfEUWest.createConnection();
// Step 6. We create a JMS Connection connectionEUEast which is a connection to server EU East
connectionEUEast = cfEUEast.createConnection();
// Step 8. We create a JMS Session on server EU West
Session sessionEUWest = connectionEUWest.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 9. We create a JMS Session on server EU East
Session sessionEUEast = connectionEUEast.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 11. We start the connections to ensure delivery occurs on them
connectionEUWest.start();
connectionEUEast.start();
// Step 12. We create a JMS MessageProducer object on each server
MessageProducer producerEUEast = sessionEUEast.createProducer(topic);
// Step 13. We create JMS MessageConsumer objects on each server - Messages will be diverted to this topic
MessageConsumer consumerEUWest = sessionEUWest.createSharedDurableConsumer(topic2, "exampleSubscription");
// Step 14. Let a little time for everything to start and form.
Thread.sleep(5000);
// Step 13. We send some messages to server EU West
final int numMessages = 10;
// Step 15. Repeat same test one last time, this time sending on EU East
for (int i = 0; i < numMessages; i++) {
TextMessage message = sessionEUEast.createTextMessage("This is text sent from EU East, message " + i);
producerEUEast.send(message);
System.out.println("EU East :: Sent message: " + message.getText());
}
// Step 14. We now consume those messages on *all* servers .
// We note that every consumer, receives a message even so on seperate servers
for (int i = 0; i < numMessages; i++) {
TextMessage messageEUWest = (TextMessage) consumerEUWest.receive(5000);
System.out.println("EU West :: Got message: " + messageEUWest.getText());
}
} finally {
// Step 16. Be sure to close our resources!
if (connectionEUWest != null) {
connectionEUWest.stop();
connectionEUWest.close();
}
if (connectionEUEast != null) {
connectionEUEast.stop();
connectionEUEast.close();
}
}
}
}

View File

@ -0,0 +1,111 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>eu-west-1-master</name>
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
<connector name="eu-west-1-connector">tcp://localhost:61616</connector>
<connector name="eu-east-1-connector">tcp://localhost:61617</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<!-- Federation -->
<federations>
<federation name="eu-west-1-federation">
<upstream name="eu-east-1-upstream">
<circuit-breaker-timeout>1000</circuit-breaker-timeout>
<static-connectors>
<connector-ref>eu-east-1-connector</connector-ref>
</static-connectors>
<policy ref="policySetA"/>
</upstream>
<policy-set name="policySetA">
<policy ref="address-federation" />
</policy-set>
<!-- Enable the use of divert bindings -->
<address-policy name="address-federation" enable-divert-bindings="true">
<include address-match="exampleTopic" />
</address-policy>
</federation>
</federations>
<!-- Divert configuration -->
<diverts>
<divert name="federation-divert">
<routing-name>federation-divert</routing-name>
<address>exampleTopic</address>
<forwarding-address>divertExampleTopic</forwarding-address>
<exclusive>true</exclusive>
</divert>
</diverts>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="exampleTopic">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
<permission roles="guest" type="createNonDurableQueue"/>
<permission roles="guest" type="deleteNonDurableQueue"/>
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
<security-setting match="divertExampleTopic">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
<permission roles="guest" type="createNonDurableQueue"/>
<permission roles="guest" type="deleteNonDurableQueue"/>
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
</security-settings>
<addresses>
<address name="exampleTopic">
<multicast />
</address>
<address name="divertExampleTopic">
<multicast>
<queue name="exampleSubscription"/>
</multicast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<name>eu-east-1-master</name>
<bindings-directory>target/server1/data/messaging/bindings</bindings-directory>
<journal-directory>target/server1/data/messaging/journal</journal-directory>
<large-messages-directory>target/server1/data/messaging/largemessages</large-messages-directory>
<paging-directory>target/server1/data/messaging/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61617</connector>
<connector name="eu-west-1-connector">tcp://localhost:61616</connector>
<connector name="eu-east-1-connector">tcp://localhost:61617</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61617</acceptor>
</acceptors>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="exampleTopic">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
<permission roles="guest" type="createNonDurableQueue"/>
<permission roles="guest" type="deleteNonDurableQueue"/>
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
</security-settings>
<addresses>
<address name="exampleTopic">
<multicast />
</address>
</addresses>
</core>
</configuration>

View File

@ -53,6 +53,7 @@ under the License.
<module>federated-address</module>
<module>federated-address-downstream</module>
<module>federated-address-downstream-upstream</module>
<module>federated-address-divert</module>
</modules>
</profile>
<profile>
@ -64,6 +65,7 @@ under the License.
<module>federated-address</module>
<module>federated-address-downstream</module>
<module>federated-address-downstream-upstream</module>
<module>federated-address-divert</module>
</modules>
</profile>
</profiles>

View File

@ -22,11 +22,18 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
@ -170,6 +177,139 @@ public class FederatedAddressTest extends FederatedTestBase {
verifyTransformer(address);
}
/**
* Test diverts for downstream configurations
*/
//Test creating address first followed by divert
//Test creating divert before consumer
@Test
public void testDownstreamFederatedAddressDivertAddressAndDivertFirst() throws Exception {
testFederatedAddressDivert(true,true, true);
}
//Test creating divert first followed by address
//Test creating divert before consumer
@Test
public void testDownstreamFederatedAddressDivertAddressSecondDivertFirst() throws Exception {
testFederatedAddressDivert(true,false, true);
}
//Test creating address first followed by divert
//Test creating consumer before divert
@Test
public void testDownstreamFederatedAddressDivertAddressFirstDivertSecond() throws Exception {
testFederatedAddressDivert(true,true, false);
}
//Test creating divert first followed by address
//Test creating consumer before divert
@Test
public void testDownstreamFederatedAddressDivertAddressAndDivertSecond() throws Exception {
testFederatedAddressDivert(true,false, false);
}
/**
* Test diverts for upstream configurations
*/
//Test creating address first followed by divert
//Test creating divert before consumer
@Test
public void testUpstreamFederatedAddressDivertAddressAndDivertFirst() throws Exception {
testFederatedAddressDivert(false,true, true);
}
//Test creating divert first followed by address
//Test creating divert before consumer
@Test
public void testUpstreamFederatedAddressDivertAddressSecondDivertFirst() throws Exception {
testFederatedAddressDivert(false,false, true);
}
//Test creating address first followed by divert
//Test creating consumer before divert
@Test
public void testUpstreamFederatedAddressDivertAddressFirstDivertSecond() throws Exception {
testFederatedAddressDivert(false,true, false);
}
//Test creating divert first followed by address
//Test creating consumer before divert
@Test
public void testUpstreamFederatedAddressDivertAddressAndDivertSecond() throws Exception {
testFederatedAddressDivert(false,false, false);
}
protected void testFederatedAddressDivert(boolean downstream, boolean addressFirst, boolean divertBeforeConsumer) throws Exception {
String address = getName();
String address2 = "fedOneWayDivertTest";
if (addressFirst) {
getServer(0).addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.MULTICAST));
}
final FederationConfiguration federationConfiguration;
final int deployServer;
if (downstream) {
federationConfiguration = FederatedTestUtil.createAddressDownstreamFederationConfiguration(
"server0", address, getServer(1).getConfiguration().getTransportConfigurations("server1")[0]);
deployServer = 1;
} else {
federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
deployServer = 0;
}
FederationAddressPolicyConfiguration policy = (FederationAddressPolicyConfiguration) federationConfiguration.getFederationPolicyMap().get("AddressPolicy" + address);
//enable listening for divert bindings
policy.setEnableDivertBindings(true);
getServer(deployServer).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(deployServer).getFederationManager().deploy();
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection1.start();
connection0.start();
Session session1 = connection1.createSession();
Topic topic1 = session1.createTopic(address);
MessageProducer producer1 = session1.createProducer(topic1);
if (divertBeforeConsumer) {
getServer(0).deployDivert(new DivertConfiguration().setName(address + ":" + address2)
.setAddress(address).setExclusive(true).setForwardingAddress(address2)
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST));
}
Session session0 = connection0.createSession();
Queue queue0 = session0.createQueue(address2);
MessageConsumer consumer0 = session0.createConsumer(queue0);
if (!addressFirst) {
getServer(0).addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.MULTICAST));
}
if (!divertBeforeConsumer) {
getServer(0).deployDivert(new DivertConfiguration().setName(address + ":" + address2)
.setAddress(address).setExclusive(true).setForwardingAddress(address2)
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST));
}
assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1,
1000, 100));
final QueueBinding remoteQueueBinding = (QueueBinding) getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address))
.getBindings().iterator().next();
assertEquals(1, remoteQueueBinding.getQueue().getConsumerCount());
producer1.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(1000));
//Test consumer is cleaned up after divert destroyed
getServer(0).destroyDivert(SimpleString.toSimpleString(address + ":" + address2));
// getServer(0).destroyQueue(SimpleString.toSimpleString(address2));
assertTrue(Wait.waitFor(() -> remoteQueueBinding.getQueue().getConsumerCount() == 0, 2000, 100));
}
}
private void testFederatedAddressReplication(String address) throws Exception {
ConnectionFactory cf1 = getCF(1);
@ -188,37 +328,38 @@ public class FederatedAddressTest extends FederatedTestBase {
Topic topic0 = session0.createTopic(address);
MessageConsumer consumer0 = session0.createConsumer(topic0);
assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1));
assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(
SimpleString.toSimpleString(address)).getBindings().size() == 1, 2000, 100));
producer.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(10000));
assertNotNull(consumer0.receive(1000));
producer.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(10000));
assertNotNull(consumer0.receive(1000));
MessageConsumer consumer1 = session1.createConsumer(topic1);
producer.send(session1.createTextMessage("hello"));
assertNotNull(consumer1.receive(10000));
assertNotNull(consumer0.receive(10000));
assertNotNull(consumer1.receive(1000));
assertNotNull(consumer0.receive(1000));
consumer1.close();
//Groups
producer.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(10000));
assertNotNull(consumer0.receive(1000));
producer.send(createTextMessage(session1, "groupA"));
assertNotNull(consumer0.receive(10000));
assertNotNull(consumer0.receive(1000));
consumer1 = session1.createConsumer(topic1);
producer.send(createTextMessage(session1, "groupA"));
assertNotNull(consumer1.receive(10000));
assertNotNull(consumer0.receive(10000));
assertNotNull(consumer1.receive(1000));
assertNotNull(consumer0.receive(1000));
}
@ -246,20 +387,17 @@ public class FederatedAddressTest extends FederatedTestBase {
producer.send(session1.createTextMessage("hello"));
assertNull(consumer0.receive(100));
FederationConfiguration federationConfiguration = FederatedTestUtil.createAddressUpstreamFederationConfiguration("server1", address);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(address)).getBindings().size() == 1);
Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(
SimpleString.toSimpleString(address)).getBindings().size() == 1, 2000, 100);
producer.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(10000));
assertNotNull(consumer0.receive(1000));
}
}