ARTEMIS-3759 Add mirror controller address filter support

Allow replication only certain addresses with mirror controller.
The configuration is similar to cluster address configuration.

Co-authored-by: Robbie Gemmell <robbie@apache.org>
This commit is contained in:
iliya 2022-04-26 19:36:42 +03:00 committed by clebertsuconic
parent dc5143502d
commit 0b321ab8ff
9 changed files with 254 additions and 2 deletions

View File

@ -82,6 +82,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
final boolean acks; final boolean acks;
final boolean addQueues; final boolean addQueues;
final boolean deleteQueues; final boolean deleteQueues;
final MirrorAddressFilter addressFilter;
private final AMQPBrokerConnection brokerConnection; private final AMQPBrokerConnection brokerConnection;
final AMQPMirrorBrokerConnectionElement replicaConfig; final AMQPMirrorBrokerConnectionElement replicaConfig;
@ -110,6 +111,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
this.idSupplier = protonProtocolManager.getReferenceIDSupplier(); this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
this.addQueues = replicaConfig.isQueueCreation(); this.addQueues = replicaConfig.isQueueCreation();
this.deleteQueues = replicaConfig.isQueueRemoval(); this.deleteQueues = replicaConfig.isQueueRemoval();
this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter());
this.acks = replicaConfig.isMessageAcknowledgements(); this.acks = replicaConfig.isMessageAcknowledgements();
this.brokerConnection = brokerConnection; this.brokerConnection = brokerConnection;
} }
@ -131,6 +133,11 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
if (getControllerInUse() != null && !addressInfo.isInternal()) { if (getControllerInUse() != null && !addressInfo.isInternal()) {
return; return;
} }
if (ignoreAddress(addressInfo.getName())) {
return;
}
if (addQueues) { if (addQueues) {
Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, null, addressInfo.toJSON()); Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, null, addressInfo.toJSON());
route(server, message); route(server, message);
@ -145,6 +152,9 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) { if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
return; return;
} }
if (ignoreAddress(addressInfo.getName())) {
return;
}
if (deleteQueues) { if (deleteQueues) {
Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON()); Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, null, addressInfo.toJSON());
route(server, message); route(server, message);
@ -162,6 +172,12 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
} }
return; return;
} }
if (ignoreAddress(queueConfiguration.getAddress())) {
if (logger.isTraceEnabled()) {
logger.trace("Skipping create " + queueConfiguration + ", queue address " + queueConfiguration.getAddress() + " doesn't match filter");
}
return;
}
if (addQueues) { if (addQueues) {
Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON()); Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, null, queueConfiguration.toJSON());
route(server, message); route(server, message);
@ -178,6 +194,10 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return; return;
} }
if (ignoreAddress(address)) {
return;
}
if (deleteQueues) { if (deleteQueues) {
Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString()); Message message = createMessage(address, queue, DELETE_QUEUE, null, queue.toString());
route(server, message); route(server, message);
@ -188,12 +208,18 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId()); return controller != null && sameNode(getRemoteMirrorId(), controller.getRemoteMirrorId());
} }
private boolean ignoreAddress(SimpleString address) {
return !addressFilter.match(address);
}
private boolean sameNode(String remoteID, String sourceID) { private boolean sameNode(String remoteID, String sourceID) {
return (remoteID != null && sourceID != null && remoteID.equals(sourceID)); return (remoteID != null && sourceID != null && remoteID.equals(sourceID));
} }
@Override @Override
public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) { public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) {
SimpleString address = context.getAddress(message);
if (invalidTarget(context.getMirrorSource())) { if (invalidTarget(context.getMirrorSource())) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to avoid infinite loop (reflection with the mirror)"); logger.trace("server " + server + " is discarding send to avoid infinite loop (reflection with the mirror)");
@ -208,6 +234,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return; return;
} }
if (ignoreAddress(address)) {
if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to address " + address + ", address doesn't match filter");
}
return;
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(server + " send message " + message); logger.trace(server + " send message " + message);
} }
@ -301,6 +334,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return; return;
} }
if (ignoreAddress(ref.getQueue().getAddress())) {
if (logger.isTraceEnabled()) {
logger.trace(server + " rejecting postAcknowledge queue=" + ref.getQueue().getName() + ", ref=" + ref + ", queue address is excluded");
}
return;
}
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(server + " postAcknowledge " + ref); logger.trace(server + " postAcknowledge " + ref);
} }
@ -337,4 +377,5 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return true; return true;
} }
} }
} }

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
public class MirrorAddressFilter {
private final SimpleString[] allowList;
private final SimpleString[] denyList;
public MirrorAddressFilter(String filter) {
Set<SimpleString> allowList = new HashSet<>();
Set<SimpleString> denyList = new HashSet<>();
if (filter != null && !filter.isEmpty()) {
String[] parts = filter.split(",");
for (String part : parts) {
if (!"".equals(part) && !"!".equals(part)) {
if (part.startsWith("!")) {
denyList.add(new SimpleString(part.substring(1)));
} else {
allowList.add(new SimpleString(part));
}
}
}
}
this.allowList = allowList.toArray(new SimpleString[]{});
this.denyList = denyList.toArray(new SimpleString[]{});
}
public boolean match(SimpleString checkAddress) {
if (denyList.length > 0) {
for (SimpleString pattern : denyList) {
if (checkAddress.startsWith(pattern)) {
return false;
}
}
}
if (allowList.length > 0) {
for (SimpleString pattern : allowList) {
if (checkAddress.startsWith(pattern)) {
return true;
}
}
return false;
}
return true;
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.junit.Assert;
import org.junit.Test;
public class MirrorAddressFilterTest {
@Test
public void testAddressFilter() {
Assert.assertTrue(new MirrorAddressFilter("").match(new SimpleString("any")));
Assert.assertTrue(new MirrorAddressFilter("test").match(new SimpleString("test123")));
Assert.assertTrue(new MirrorAddressFilter("a,b").match(new SimpleString("b")));
Assert.assertTrue(new MirrorAddressFilter("!c").match(new SimpleString("a")));
Assert.assertTrue(new MirrorAddressFilter("!a,!").match(new SimpleString("b123")));
Assert.assertFalse(new MirrorAddressFilter("a,b,!ab").match(new SimpleString("ab")));
Assert.assertFalse(new MirrorAddressFilter("!a,!b").match(new SimpleString("b123")));
Assert.assertFalse(new MirrorAddressFilter("a,").match(new SimpleString("b")));
}
}

View File

@ -30,6 +30,8 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme
SimpleString mirrorSNF; SimpleString mirrorSNF;
String addressFilter;
public SimpleString getMirrorSNF() { public SimpleString getMirrorSNF() {
return mirrorSNF; return mirrorSNF;
} }
@ -86,4 +88,14 @@ public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionEleme
this.messageAcknowledgements = messageAcknowledgements; this.messageAcknowledgements = messageAcknowledgements;
return this; return this;
} }
public String getAddressFilter() {
return addressFilter;
}
public AMQPMirrorBrokerConnectionElement setAddressFilter(String addressFilter) {
this.addressFilter = addressFilter;
return this;
}
} }

View File

@ -2094,8 +2094,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
boolean queueCreation = getBooleanAttribute(e2,"queue-creation", true); boolean queueCreation = getBooleanAttribute(e2,"queue-creation", true);
boolean durable = getBooleanAttribute(e2, "durable", true); boolean durable = getBooleanAttribute(e2, "durable", true);
boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true); boolean queueRemoval = getBooleanAttribute(e2, "queue-removal", true);
String addressFilter = getAttributeValue(e2, "address-filter");
AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement(); AMQPMirrorBrokerConnectionElement amqpMirrorConnectionElement = new AMQPMirrorBrokerConnectionElement();
amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable); amqpMirrorConnectionElement.setMessageAcknowledgements(messageAcks).setQueueCreation(queueCreation).setQueueRemoval(queueRemoval).setDurable(durable).setAddressFilter(addressFilter);
connectionElement = amqpMirrorConnectionElement; connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR); connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
} else { } else {

View File

@ -2442,6 +2442,14 @@
</xsd:documentation> </xsd:documentation>
</xsd:annotation> </xsd:annotation>
</xsd:attribute> </xsd:attribute>
<xsd:attribute name="address-filter" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
This defines a filter that mirror will use to determine witch events will be forwarded toward
target server based on source address.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType> </xsd:complexType>

View File

@ -442,7 +442,7 @@
<receiver address-match="TEST-RECEIVER" /> <receiver address-match="TEST-RECEIVER" />
<peer address-match="TEST-PEER"/> <peer address-match="TEST-PEER"/>
<receiver queue-name="TEST-WITH-QUEUE-NAME"/> <receiver queue-name="TEST-WITH-QUEUE-NAME"/>
<mirror message-acknowledgements="false" queue-creation="false" durable="false" queue-removal="false"/> <mirror message-acknowledgements="false" queue-creation="false" durable="false" queue-removal="false" address-filter="TEST-QUEUE,!IGNORE-QUEUE"/>
</amqp-connection> </amqp-connection>
<amqp-connection uri="tcp://test2:222" name="test2"> <amqp-connection uri="tcp://test2:222" name="test2">
<mirror durable="false"/> <mirror durable="false"/>

View File

@ -103,6 +103,25 @@ The following optional arguments can be utilized:
* `queue-removal`: Specifies whether a queue- or address-removal event is sent. The default value is `true`. * `queue-removal`: Specifies whether a queue- or address-removal event is sent. The default value is `true`.
* `message-acknowledgements`: Specifies whether message acknowledgements are sent. The default value is `true`. * `message-acknowledgements`: Specifies whether message acknowledgements are sent. The default value is `true`.
* `queue-creation`: Specifies whether a queue- or address-creation event is sent. The default value is `true`. * `queue-creation`: Specifies whether a queue- or address-creation event is sent. The default value is `true`.
* `address-filter`: An optional comma-separated list of inclusion and/or exclusion filter entries used to govern which addresses (and related queues) mirroring events will be created for on this broker-connection. That is, events will only be mirrored to the target broker for addresses that match the filter.
An address is matched when it begins with an inclusion entry specified in this field, unless the address is also explicitly excluded by another entry. An exclusion entry is prefixed with `!` to denote any address beginning with that value does not match.
If no inclusion entry is specified in the list, all addresses not explicitly excluded will match. If the address-filter attribute is not specified, then all addresses (and related queues) will match and be mirrored.
Examples:
- 'eu'
matches all addresses starting with 'eu'
- '!eu'
matches all address except for those starting with 'eu'
- 'eu.uk,eu.de'
matches all addresses starting with either 'eu.uk' or 'eu.de'
- 'eu,!eu.uk'
matches all addresses starting with 'eu' but not those starting with 'eu.uk'
**Note:**
- Address exclusion will always take precedence over address inclusion.
- Address matching on mirror elements is prefix-based and does not support wild-card matching.
An example of a mirror configuration is shown below: An example of a mirror configuration is shown below:
```xml ```xml

View File

@ -473,6 +473,70 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
} }
} }
@Test
public void testAddressFilter() throws Exception {
final String REPLICATED = "replicated";
final String NON_REPLICATED = "nonReplicated";
final String ADDRESS_FILTER = REPLICATED + "," + "!" + NON_REPLICATED;
final String MSG = "msg";
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("server_2");
server_2.getConfiguration().setName("server_2");
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("mirror-source", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setDurable(true).setAddressFilter(ADDRESS_FILTER);
amqpConnection.addElement(replica);
server_2.getConfiguration().addAMQPConnection(amqpConnection);
server_2.start();
try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2).createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Send to non replicated address
try (MessageProducer producer = session.createProducer(session.createQueue(NON_REPLICATED))) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 2; i++) {
producer.send(session.createTextMessage("never receive"));
}
}
// Check nothing was added to SnF queue
Assert.assertEquals(0, server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded());
// Send to replicated address
try (MessageProducer producer = session.createProducer(session.createQueue(REPLICATED))) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 2; i++) {
producer.send(session.createTextMessage(MSG));
}
}
// Check some messages were sent to SnF queue
Assert.assertTrue(server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded() > 0);
}
try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT).createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try (MessageConsumer consumer = session.createConsumer(session.createQueue(REPLICATED))) {
Message message = consumer.receive(3000);
Assert.assertNotNull(message);
Assert.assertEquals(MSG, message.getBody(String.class));
}
try (MessageConsumer consumer = session.createConsumer(session.createQueue(NON_REPLICATED))) {
Assert.assertNull(consumer.receiveNoWait());
}
}
}
@Test @Test
public void testRouteSurviving() throws Exception { public void testRouteSurviving() throws Exception {
testRouteSurvivor(false); testRouteSurvivor(false);