diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 0add7b7ae5..ed15a569f6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -129,7 +129,7 @@ public class AMQPSessionCallback implements SessionCallback { @Override public void run() { try { - plugSender.getSender().drained(); + plugSender.reportDrained(); } finally { draining.set(false); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 8f8222b1a2..868e9c80d8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -775,4 +775,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return queue; } } + + /** + * Update link state to reflect that the previous drain attempt has completed. + */ + public void reportDrained() { + connection.lock(); + try { + sender.drained(); + } finally { + connection.unlock(); + } + + connection.flush(); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 296088b7ba..f614fa160b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -579,10 +579,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { forceDelivery(sequence, r); } }); - } else { - r.run(); + return; } } + r.run(); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorSendingForcedDelivery(e); }