WIP: testing another placement for filtering
This commit is contained in:
parent
46d8165608
commit
1984a0d3c7
|
@ -758,6 +758,12 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean filterRef(MessageReference ref, ServerConsumer consumer) {
|
||||
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
|
||||
return plugSender.filterRef(ref);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendLargeMessage(MessageReference ref,
|
||||
ServerConsumer consumer,
|
||||
|
|
|
@ -469,7 +469,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
queue.getName().toString(),
|
||||
mirrorControllerSource::setLink,
|
||||
(r) -> AMQPMirrorControllerSource.validateProtocolData(protonProtocolManager.getReferenceIDSupplier(), r, getMirrorSNF(replica)),
|
||||
(r) -> mirrorControllerSource.filterMessage(r),
|
||||
(r) -> mirrorControllerSource.shouldFilterRef(r),
|
||||
server.getNodeID().toString(),
|
||||
desiredCapabilities,
|
||||
null,
|
||||
|
@ -776,7 +776,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
String targetName,
|
||||
java.util.function.Consumer<Sender> senderConsumer,
|
||||
java.util.function.Consumer<? super MessageReference> beforeDeliver,
|
||||
java.util.function.Predicate<? super MessageReference> beforeDeliverFiltering,
|
||||
java.util.function.Predicate<? super MessageReference> shouldFilterRef,
|
||||
String brokerID,
|
||||
Symbol[] desiredCapabilities,
|
||||
Symbol[] targetCapabilities,
|
||||
|
@ -837,7 +837,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
|
||||
// Using attachments to set up a Runnable that will be executed inside AMQPBrokerConnection::remoteLinkOpened
|
||||
sender.attachments().set(AMQP_LINK_INITIALIZER_KEY, Runnable.class, () -> {
|
||||
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setBeforeDeliveryFiltering(beforeDeliverFiltering);
|
||||
ProtonServerSenderContext senderContext = new ProtonServerSenderContext(protonRemotingConnection.getAmqpConnection(), sender, sessionContext, sessionContext.getSessionSPI(), outgoingInitializer).setBeforeDelivery(beforeDeliver).setShouldFilterRef(shouldFilterRef);
|
||||
try {
|
||||
if (!cancelled.get()) {
|
||||
if (futureTimeout != null) {
|
||||
|
|
|
@ -506,7 +506,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
|
|||
* @return true if the INTERNAL_RECEIVER_ID_FILTER annotation of the message is set to a different value
|
||||
* than the remoteMirrorID, false otherwise.
|
||||
*/
|
||||
public boolean filterMessage(MessageReference ref) {
|
||||
public boolean shouldFilterRef(MessageReference ref) {
|
||||
Object filterID = ref.getMessage().getAnnotation(RECEIVER_ID_FILTER);
|
||||
if (filterID != null) {
|
||||
String remoteMirrorId = getRemoteMirrorId();
|
||||
|
@ -514,6 +514,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
|
|||
if (remoteMirrorId.equals(filterID)) {
|
||||
return false;
|
||||
} else {
|
||||
logger.trace("filtering message {} as remote mirror ID {} diverges from the wanted receiver {}", ref, remoteMirrorId, filterID);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -607,7 +608,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
|
|||
noForwardSource = String.valueOf(ref.getMessage().getBrokerProperty(NO_FORWARD_SOURCE));
|
||||
if (remoteMirrorId != null && !remoteMirrorId.equals(noForwardSource)) {
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId);
|
||||
logger.trace("Due to the noForward policy in place, no Ack for the ref={} should reach the remote mirror ID", ref, remoteMirrorId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -89,7 +89,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
private int credits = 0;
|
||||
private AtomicInteger pending = new AtomicInteger(0);
|
||||
private java.util.function.Consumer<? super MessageReference> beforeDelivery;
|
||||
private java.util.function.Predicate<? super MessageReference> beforeDeliveryFiltering;
|
||||
private java.util.function.Predicate<? super MessageReference> shouldFilterRef;
|
||||
|
||||
protected volatile Runnable afterDelivery;
|
||||
protected volatile MessageWriter messageWriter = SenderController.REJECTING_MESSAGE_WRITER;
|
||||
|
@ -121,8 +121,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
return this;
|
||||
}
|
||||
|
||||
public ProtonServerSenderContext setBeforeDeliveryFiltering(java.util.function.Predicate<? super MessageReference> beforeDeliveryFiltering) {
|
||||
this.beforeDeliveryFiltering = beforeDeliveryFiltering;
|
||||
public ProtonServerSenderContext setShouldFilterRef(java.util.function.Predicate<? super MessageReference> shouldFilterRef) {
|
||||
this.shouldFilterRef = shouldFilterRef;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -447,6 +447,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
return handled;
|
||||
}
|
||||
|
||||
public boolean filterRef(MessageReference ref) {
|
||||
if (shouldFilterRef != null) {
|
||||
return shouldFilterRef.test(ref);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private final class ConnectionFlushIOCallback implements IOCallback {
|
||||
|
||||
@Override
|
||||
|
@ -481,12 +488,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
|||
beforeDelivery.accept(messageReference);
|
||||
}
|
||||
|
||||
if (beforeDeliveryFiltering != null) {
|
||||
if (beforeDeliveryFiltering.test(messageReference)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (creditsLock) {
|
||||
if (sender.getLocalState() == EndpointState.CLOSED) {
|
||||
return 0;
|
||||
|
|
|
@ -428,6 +428,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
return HandleStatus.NO_MATCH;
|
||||
}
|
||||
if (callback != null && callback.filterRef(ref, ServerConsumerImpl.this)) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.trace("Reference {} is not allowed to be consumed by {} due to message filtering.", ref, this);
|
||||
}
|
||||
return HandleStatus.NO_MATCH;
|
||||
}
|
||||
|
||||
synchronized (lock) {
|
||||
// If the consumer is stopped then we don't accept the message, it
|
||||
|
|
|
@ -98,4 +98,8 @@ public interface SessionCallback {
|
|||
default Transaction getCurrentTransaction() {
|
||||
return null;
|
||||
}
|
||||
|
||||
default boolean filterRef(MessageReference ref, ServerConsumer consumer) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue