diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 2d7cd10008..a70773634f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -82,6 +82,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im final boolean acks; final boolean addQueues; final boolean deleteQueues; + final MirrorAddressFilter addressFilter; private final AMQPBrokerConnection brokerConnection; final AMQPMirrorBrokerConnectionElement replicaConfig; @@ -110,6 +111,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im this.idSupplier = protonProtocolManager.getReferenceIDSupplier(); this.addQueues = replicaConfig.isQueueCreation(); this.deleteQueues = replicaConfig.isQueueRemoval(); + this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter()); this.acks = replicaConfig.isMessageAcknowledgements(); this.brokerConnection = brokerConnection; } @@ -131,6 +133,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im if (getControllerInUse() != null && !addressInfo.isInternal()) { return; } + + if (ignoreAddress(addressInfo.getName())) { + return; + } + if (addQueues) { Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, null, addressInfo.toJSON()); route(server, message); @@ -145,6 +152,9 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) { return; } + if (ignoreAddress(addressInfo.getName())) { + return; + } if (deleteQueues) { Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON()); route(server, message); @@ -162,6 +172,12 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im } return; } + if (ignoreAddress(queueConfiguration.getAddress())) { + if (logger.isTraceEnabled()) { + logger.trace("Skipping create " + queueConfiguration + ", queue address " + queueConfiguration.getAddress() + " doesn't match filter"); + } + return; + } if (addQueues) { Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON()); route(server, message); @@ -178,6 +194,10 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im return; } + if (ignoreAddress(address)) { + return; + } + if (deleteQueues) { Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString()); route(server, message); @@ -188,12 +208,18 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId()); } + private boolean ignoreAddress(SimpleString address) { + return !addressFilter.match(address); + } + private boolean sameNode(String remoteID, String sourceID) { return (remoteID != null && sourceID != null && remoteID.equals(sourceID)); } @Override public void sendMessage(Message message, RoutingContext context, List refs) { + SimpleString address = context.getAddress(message); + if (invalidTarget(context.getMirrorSource())) { if (logger.isTraceEnabled()) { logger.trace("server " + server + " is discarding send to avoid infinite loop (reflection with the mirror)"); @@ -208,6 +234,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im return; } + if (ignoreAddress(address)) { + if (logger.isTraceEnabled()) { + logger.trace("server " + server + " is discarding send to address " + address + ", address doesn't match filter"); + } + return; + } + if (logger.isTraceEnabled()) { logger.trace(server + " send message " + message); } @@ -301,6 +334,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im return; } + if (ignoreAddress(ref.getQueue().getAddress())) { + if (logger.isTraceEnabled()) { + logger.trace(server + " rejecting postAcknowledge queue=" + ref.getQueue().getName() + ", ref=" + ref + ", queue address is excluded"); + } + return; + } + if (logger.isTraceEnabled()) { logger.trace(server + " postAcknowledge " + ref); } @@ -337,4 +377,5 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im return true; } } + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilter.java new file mode 100644 index 0000000000..ec4e710edc --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq.artemis.api.core.SimpleString; + +public class MirrorAddressFilter { + + private final SimpleString[] allowList; + + private final SimpleString[] denyList; + + public MirrorAddressFilter(String filter) { + Set allowList = new HashSet<>(); + Set denyList = new HashSet<>(); + + if (filter != null && !filter.isEmpty()) { + String[] parts = filter.split(","); + for (String part : parts) { + if (!"".equals(part) && !"!".equals(part)) { + if (part.startsWith("!")) { + denyList.add(new SimpleString(part.substring(1))); + } else { + allowList.add(new SimpleString(part)); + } + } + } + } + + this.allowList = allowList.toArray(new SimpleString[]{}); + this.denyList = denyList.toArray(new SimpleString[]{}); + } + + public boolean match(SimpleString checkAddress) { + if (denyList.length > 0) { + for (SimpleString pattern : denyList) { + if (checkAddress.startsWith(pattern)) { + return false; + } + } + } + + if (allowList.length > 0) { + for (SimpleString pattern : allowList) { + if (checkAddress.startsWith(pattern)) { + return true; + } + } + return false; + } + return true; + } +} diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilterTest.java new file mode 100644 index 0000000000..967b8cab3e --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorAddressFilterTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.junit.Assert; +import org.junit.Test; + +public class MirrorAddressFilterTest { + + @Test + public void testAddressFilter() { + Assert.assertTrue(new MirrorAddressFilter("").match(new SimpleString("any"))); + Assert.assertTrue(new MirrorAddressFilter("test").match(new SimpleString("test123"))); + Assert.assertTrue(new MirrorAddressFilter("a,b").match(new SimpleString("b"))); + Assert.assertTrue(new MirrorAddressFilter("!c").match(new SimpleString("a"))); + Assert.assertTrue(new MirrorAddressFilter("!a,!").match(new SimpleString("b123"))); + Assert.assertFalse(new MirrorAddressFilter("a,b,!ab").match(new SimpleString("ab"))); + Assert.assertFalse(new MirrorAddressFilter("!a,!b").match(new SimpleString("b123"))); + Assert.assertFalse(new MirrorAddressFilter("a,").match(new SimpleString("b"))); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java index 46a6610b50..98a130888f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java @@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme SimpleString mirrorSNF; + String addressFilter; + public SimpleString getMirrorSNF() { return mirrorSNF; } @@ -86,4 +88,14 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme this.messageAcknowledgements = messageAcknowledgements; return this; } + + public String getAddressFilter() { + return addressFilter; + } + + public AMQPMirrorBrokerConnectionElement setAddressFilter(String addressFilter) { + this.addressFilter = addressFilter; + return this; + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 413cc5475f..9e839c2f39 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -2094,8 +2094,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { boolean queueCreation = getBooleanAttribute(e2,"queue-creation", true); boolean durable = getBooleanAttribute(e2, "durable", true); boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true); + String addressFilter = getAttributeValue(e2, "address-filter"); + AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement(); - amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable); + amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter); connectionElement = amqpMirrorConnectionElement; connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR); } else { diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 33fb5a35de..ca9f8140a9 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2442,6 +2442,14 @@ + + + + This defines a filter that mirror will use to determine witch events will be forwarded toward + target server based on source address. + + + diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 1adf01baf8..409254647f 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -442,7 +442,7 @@ - + diff --git a/docs/user-manual/en/amqp-broker-connections.md b/docs/user-manual/en/amqp-broker-connections.md index 0f1704de22..667ab8ceff 100644 --- a/docs/user-manual/en/amqp-broker-connections.md +++ b/docs/user-manual/en/amqp-broker-connections.md @@ -103,6 +103,25 @@ The following optional arguments can be utilized: * `queue-removal`: Specifies whether a queue- or address-removal event is sent. The default value is `true`. * `message-acknowledgements`: Specifies whether message acknowledgements are sent. The default value is `true`. * `queue-creation`: Specifies whether a queue- or address-creation event is sent. The default value is `true`. +* `address-filter`: An optional comma-separated list of inclusion and/or exclusion filter entries used to govern which addresses (and related queues) mirroring events will be created for on this broker-connection. That is, events will only be mirrored to the target broker for addresses that match the filter. + An address is matched when it begins with an inclusion entry specified in this field, unless the address is also explicitly excluded by another entry. An exclusion entry is prefixed with `!` to denote any address beginning with that value does not match. + If no inclusion entry is specified in the list, all addresses not explicitly excluded will match. If the address-filter attribute is not specified, then all addresses (and related queues) will match and be mirrored. + + Examples: + + - 'eu' + matches all addresses starting with 'eu' + - '!eu' + matches all address except for those starting with 'eu' + - 'eu.uk,eu.de' + matches all addresses starting with either 'eu.uk' or 'eu.de' + - 'eu,!eu.uk' + matches all addresses starting with 'eu' but not those starting with 'eu.uk' + + **Note:** + + - Address exclusion will always take precedence over address inclusion. + - Address matching on mirror elements is prefix-based and does not support wild-card matching. An example of a mirror configuration is shown below: ```xml diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index 12db46f2a1..190456dca0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -473,6 +473,70 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { } } + @Test + public void testAddressFilter() throws Exception { + final String REPLICATED = "replicated"; + final String NON_REPLICATED = "nonReplicated"; + final String ADDRESS_FILTER = REPLICATED + "," + "!" + NON_REPLICATED; + final String MSG = "msg"; + + server.start(); + + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + server_2.getConfiguration().setName("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("mirror-source", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setDurable(true).setAddressFilter(ADDRESS_FILTER); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + server_2.start(); + + try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2).createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Send to non replicated address + try (MessageProducer producer = session.createProducer(session.createQueue(NON_REPLICATED))) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 2; i++) { + producer.send(session.createTextMessage("never receive")); + } + } + + // Check nothing was added to SnF queue + Assert.assertEquals(0, server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded()); + + // Send to replicated address + try (MessageProducer producer = session.createProducer(session.createQueue(REPLICATED))) { + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < 2; i++) { + producer.send(session.createTextMessage(MSG)); + } + } + + // Check some messages were sent to SnF queue + Assert.assertTrue(server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded() > 0); + } + + try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT).createConnection()) { + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try (MessageConsumer consumer = session.createConsumer(session.createQueue(REPLICATED))) { + Message message = consumer.receive(3000); + Assert.assertNotNull(message); + Assert.assertEquals(MSG, message.getBody(String.class)); + } + + try (MessageConsumer consumer = session.createConsumer(session.createQueue(NON_REPLICATED))) { + Assert.assertNull(consumer.receiveNoWait()); + } + } + + } + @Test public void testRouteSurviving() throws Exception { testRouteSurvivor(false);