diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index b93a1280bd..17f42748cd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -758,6 +758,12 @@ public class AMQPSessionCallback implements SessionCallback { } + @Override + public boolean filterRef(MessageReference ref, ServerConsumer consumer) { + ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); + return plugSender.filterRef(ref); + } + @Override public int sendLargeMessage(MessageReference ref, ServerConsumer consumer, diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index 24f8b0a97e..147abf37c0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -469,7 +469,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, queue.getName().toString(), mirrorControllerSource::setLink, (r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)), - (r) -> mirrorControllerSource.filterMessage(r), + (r) -> mirrorControllerSource.shouldFilterRef(r), server.getNodeID().toString(), desiredCapabilities, null, @@ -776,7 +776,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, String targetName, java.util.function.Consumer senderConsumer, java.util.function.Consumer beforeDeliver, - java.util.function.Predicate beforeDeliverFiltering, + java.util.function.Predicate shouldFilterRef, String brokerID, Symbol[] desiredCapabilities, Symbol[] targetCapabilities, @@ -837,7 +837,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, // Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> { - ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering); + ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setShouldFilterRef(shouldFilterRef); try { if (!cancelled.get()) { if (futureTimeout != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index b1c01a9409..dae23699a7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -506,7 +506,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im * @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value * than the remoteMirrorID, false otherwise. */ - public boolean filterMessage(MessageReference ref) { + public boolean shouldFilterRef(MessageReference ref) { Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER); if (filterID != null) { String remoteMirrorId = getRemoteMirrorId(); @@ -514,6 +514,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im if (remoteMirrorId.equals(filterID)) { return false; } else { + logger.trace("filtering message {} as remote mirror ID {} diverges from the wanted receiver {}", ref, remoteMirrorId, filterID); return true; } } @@ -607,7 +608,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController im noForwardSource = String.valueOf(ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE)); if (remoteMirrorId != null && !remoteMirrorId.equals(noForwardSource)) { if (logger.isInfoEnabled()) { - logger.info("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId); + logger.trace("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId); } return; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index b0a13c08d6..767c67cf6e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -89,7 +89,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private int credits = 0; private AtomicInteger pending = new AtomicInteger(0); private java.util.function.Consumer beforeDelivery; - private java.util.function.Predicate beforeDeliveryFiltering; + private java.util.function.Predicate shouldFilterRef; protected volatile Runnable afterDelivery; protected volatile MessageWriter messageWriter = SenderController.REJECTING_MESSAGE_WRITER; @@ -121,8 +121,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return this; } - public ProtonServerSenderContext setBeforeDeliveryFiltering(java.util.function.Predicate beforeDeliveryFiltering) { - this.beforeDeliveryFiltering = beforeDeliveryFiltering; + public ProtonServerSenderContext setShouldFilterRef(java.util.function.Predicate shouldFilterRef) { + this.shouldFilterRef = shouldFilterRef; return this; } @@ -447,6 +447,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return handled; } + public boolean filterRef(MessageReference ref) { + if (shouldFilterRef != null) { + return shouldFilterRef.test(ref); + } + return false; + } + private final class ConnectionFlushIOCallback implements IOCallback { @Override @@ -481,12 +488,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr beforeDelivery.accept(messageReference); } - if (beforeDeliveryFiltering != null) { - if (beforeDeliveryFiltering.test(messageReference)) { - return 0; - } - } - synchronized (creditsLock) { if (sender.getLocalState() == EndpointState.CLOSED) { return 0; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index bab579a5c0..a470bc635f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -428,6 +428,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return HandleStatus.NO_MATCH; } + if (callback != null && callback.filterRef(ref, ServerConsumerImpl.this)) { + if (logger.isDebugEnabled()) { + logger.trace("Reference {} is not allowed to be consumed by {} due to message filtering.", ref, this); + } + return HandleStatus.NO_MATCH; + } synchronized (lock) { // If the consumer is stopped then we don't accept the message, it diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index e20f01277f..7b45d03140 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -98,4 +98,8 @@ public interface SessionCallback { default Transaction getCurrentTransaction() { return null; } + + default boolean filterRef(MessageReference ref, ServerConsumer consumer) { + return false; + } }