This closes #1410
This commit is contained in:
commit
f138bc5284
|
@ -94,7 +94,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
private AMQPSessionContext protonSession;
|
||||
|
||||
private final Executor closeExecutor;
|
||||
private final Executor sessionExecutor;
|
||||
|
||||
private final AtomicBoolean draining = new AtomicBoolean(false);
|
||||
|
||||
|
@ -109,7 +109,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
this.storageManager = manager.getServer().getStorageManager();
|
||||
this.connection = connection;
|
||||
this.transportConnection = transportConnection;
|
||||
this.closeExecutor = executor;
|
||||
this.sessionExecutor = executor;
|
||||
this.operationContext = operationContext;
|
||||
}
|
||||
|
||||
|
@ -164,6 +164,11 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsDirectDelivery() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception {
|
||||
|
||||
this.protonSession = protonSession;
|
||||
|
|
|
@ -129,6 +129,13 @@ public class AMQSession implements SessionCallback {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean supportsDirectDelivery() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
|
||||
if (consumer.getProtocolData() != null) {
|
||||
|
|
|
@ -19,9 +19,18 @@ package org.apache.activemq.artemis.core.server;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
|
||||
public interface Consumer {
|
||||
|
||||
/**
|
||||
*
|
||||
* @see SessionCallback#supportsDirectDelivery()
|
||||
*/
|
||||
default boolean supportsDirectDelivery() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* There was a change on semantic during 2.3 here.<br>
|
||||
* We now first accept the message, and the actual deliver is done as part of
|
||||
|
|
|
@ -227,6 +227,8 @@ public class QueueImpl implements Queue {
|
|||
|
||||
private volatile boolean directDeliver = true;
|
||||
|
||||
private volatile boolean supportsDirectDeliver = true;
|
||||
|
||||
private AddressSettingsRepositoryListener addressSettingsRepositoryListener;
|
||||
|
||||
private final ExpiryScanner expiryScanner = new ExpiryScanner();
|
||||
|
@ -623,7 +625,7 @@ public class QueueImpl implements Queue {
|
|||
// The checkDirect flag is periodically set to true, if the delivery is specified as direct then this causes the
|
||||
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
|
||||
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
|
||||
if (!directDeliver &&
|
||||
if (supportsDirectDeliver && !directDeliver &&
|
||||
direct &&
|
||||
System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) {
|
||||
lastDirectDeliveryCheck = System.currentTimeMillis();
|
||||
|
@ -636,13 +638,13 @@ public class QueueImpl implements Queue {
|
|||
// deliveries
|
||||
if (flushExecutor() && flushDeliveriesInTransit()) {
|
||||
// Go into direct delivery mode
|
||||
directDeliver = true;
|
||||
directDeliver = supportsDirectDeliver;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (direct && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
|
||||
if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -796,6 +798,10 @@ public class QueueImpl implements Queue {
|
|||
|
||||
consumersChanged = true;
|
||||
|
||||
if (!consumer.supportsDirectDelivery()) {
|
||||
this.supportsDirectDeliver = false;
|
||||
}
|
||||
|
||||
cancelRedistributor();
|
||||
|
||||
consumerList.add(new ConsumerHolder(consumer));
|
||||
|
@ -828,6 +834,8 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
}
|
||||
|
||||
this.supportsDirectDeliver = checkConsumerDirectDeliver();
|
||||
|
||||
if (pos > 0 && pos >= consumerList.size()) {
|
||||
pos = consumerList.size() - 1;
|
||||
}
|
||||
|
@ -864,6 +872,16 @@ public class QueueImpl implements Queue {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean checkConsumerDirectDeliver() {
|
||||
boolean supports = true;
|
||||
for (ConsumerHolder consumerCheck: consumerList) {
|
||||
if (!consumerCheck.consumer.supportsDirectDelivery()) {
|
||||
supports = false;
|
||||
}
|
||||
}
|
||||
return supports;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void addRedistributor(final long delay) {
|
||||
clearRedistributorFuture();
|
||||
|
@ -2620,6 +2638,12 @@ public class QueueImpl implements Queue {
|
|||
*/
|
||||
private boolean deliverDirect(final MessageReference ref) {
|
||||
synchronized (this) {
|
||||
if (!supportsDirectDeliver) {
|
||||
// this should never happen, but who knows?
|
||||
// if someone ever change add and removeConsumer,
|
||||
// this would protect any eventual bug
|
||||
return false;
|
||||
}
|
||||
if (paused || consumerList.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -336,6 +336,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
return refs;
|
||||
}
|
||||
|
||||
/** i
|
||||
*
|
||||
* @see SessionCallback#supportsDirectDelivery()
|
||||
*/
|
||||
@Override
|
||||
public boolean supportsDirectDelivery() {
|
||||
return callback.supportsDirectDelivery();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public HandleStatus handle(final MessageReference ref) throws Exception {
|
||||
if (callback != null && !callback.hasCredits(this) || availableCredits != null && availableCredits.get() <= 0) {
|
||||
|
|
|
@ -24,6 +24,17 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
|||
|
||||
public interface SessionCallback {
|
||||
|
||||
/** A requirement to do direct delivery is:
|
||||
* no extra locking required at the protocol layer.
|
||||
* which cannot be guaranteed at AMQP as proton will need the locking.
|
||||
* So, disable this on AMQP or any other protocol requiring extra lock.
|
||||
* @return
|
||||
*/
|
||||
default boolean supportsDirectDelivery() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This one gives a chance for Proton to have its own flow control.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue