diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index ed15a569f6..98845cd1b0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -94,7 +94,7 @@ public class AMQPSessionCallback implements SessionCallback { private AMQPSessionContext protonSession; - private final Executor closeExecutor; + private final Executor sessionExecutor; private final AtomicBoolean draining = new AtomicBoolean(false); @@ -109,7 +109,7 @@ public class AMQPSessionCallback implements SessionCallback { this.storageManager = manager.getServer().getStorageManager(); this.connection = connection; this.transportConnection = transportConnection; - this.closeExecutor = executor; + this.sessionExecutor = executor; this.operationContext = operationContext; } @@ -164,6 +164,11 @@ public class AMQPSessionCallback implements SessionCallback { } + @Override + public boolean supportsDirectDelivery() { + return false; + } + public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception { this.protonSession = protonSession; diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index bd07251187..9b6670e423 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -129,6 +129,13 @@ public class AMQSession implements SessionCallback { } + + @Override + public boolean supportsDirectDelivery() { + return false; + } + + @Override public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { if (consumer.getProtocolData() != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java index 50c0b01d5f..6df48890da 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java @@ -19,9 +19,18 @@ package org.apache.activemq.artemis.core.server; import java.util.List; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; public interface Consumer { + /** + * + * @see SessionCallback#supportsDirectDelivery() + */ + default boolean supportsDirectDelivery() { + return true; + } + /** * There was a change on semantic during 2.3 here.
* We now first accept the message, and the actual deliver is done as part of diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index f922c3a7c5..9269eb34f2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -227,6 +227,8 @@ public class QueueImpl implements Queue { private volatile boolean directDeliver = true; + private volatile boolean supportsDirectDeliver = true; + private AddressSettingsRepositoryListener addressSettingsRepositoryListener; private final ExpiryScanner expiryScanner = new ExpiryScanner(); @@ -623,7 +625,7 @@ public class QueueImpl implements Queue { // The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the // directDeliver flag to be re-computed resulting in direct delivery if the queue is empty // We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue - if (!directDeliver && + if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) { lastDirectDeliveryCheck = System.currentTimeMillis(); @@ -636,13 +638,13 @@ public class QueueImpl implements Queue { // deliveries if (flushExecutor() && flushDeliveriesInTransit()) { // Go into direct delivery mode - directDeliver = true; + directDeliver = supportsDirectDeliver; } } } } - if (direct && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) { + if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) { return; } @@ -796,6 +798,10 @@ public class QueueImpl implements Queue { consumersChanged = true; + if (!consumer.supportsDirectDelivery()) { + this.supportsDirectDeliver = false; + } + cancelRedistributor(); consumerList.add(new ConsumerHolder(consumer)); @@ -828,6 +834,8 @@ public class QueueImpl implements Queue { } } + this.supportsDirectDeliver = checkConsumerDirectDeliver(); + if (pos > 0 && pos >= consumerList.size()) { pos = consumerList.size() - 1; } @@ -864,6 +872,16 @@ public class QueueImpl implements Queue { } } + private boolean checkConsumerDirectDeliver() { + boolean supports = true; + for (ConsumerHolder consumerCheck: consumerList) { + if (!consumerCheck.consumer.supportsDirectDelivery()) { + supports = false; + } + } + return supports; + } + @Override public synchronized void addRedistributor(final long delay) { clearRedistributorFuture(); @@ -2620,6 +2638,12 @@ public class QueueImpl implements Queue { */ private boolean deliverDirect(final MessageReference ref) { synchronized (this) { + if (!supportsDirectDeliver) { + // this should never happen, but who knows? + // if someone ever change add and removeConsumer, + // this would protect any eventual bug + return false; + } if (paused || consumerList.isEmpty()) { return false; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index f614fa160b..a68516368a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -336,6 +336,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return refs; } + /** i + * + * @see SessionCallback#supportsDirectDelivery() + */ + @Override + public boolean supportsDirectDelivery() { + return callback.supportsDirectDelivery(); + } + + @Override public HandleStatus handle(final MessageReference ref) throws Exception { if (callback != null && !callback.hasCredits(this) || availableCredits != null && availableCredits.get() <= 0) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index 799e8b0ee7..edfb5dc46e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -24,6 +24,17 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; public interface SessionCallback { + /** A requirement to do direct delivery is: + * no extra locking required at the protocol layer. + * which cannot be guaranteed at AMQP as proton will need the locking. + * So, disable this on AMQP or any other protocol requiring extra lock. + * @return + */ + default boolean supportsDirectDelivery() { + return true; + } + + /** * This one gives a chance for Proton to have its own flow control. */