From 8709b1e5d304d320d9ed31fba1ffcc6f89557927 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 27 Aug 2021 13:36:50 -0400 Subject: [PATCH] ARTEMIS-3448 Expiry (maybe DLQ copy) would break LargeMessage Delivery This commit will fix MessagesExpiredPagingTest.testSendReceiveAMQPLarge --- .../amqp/proton/ProtonServerSenderContext.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 6412af589a..b46bcc3c69 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -593,7 +593,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr ByteBuffer buf = ByteBuffer.allocate(frameSize); - for (; position < bodySize; ) { + for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) { if (!connection.flowControl(this::resume)) { context.close(); return; @@ -754,11 +754,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } private void finishLargeMessage() { + lmUsageDown(); pendingLargeMessage = null; hasLarge = false; brokerConsumer.promptDelivery(); } + // will check for large message and set usageDown + private void lmUsageDown() { + AMQPLargeMessage lm = null; + if (pendingLargeMessage != null) { + lm = pendingLargeMessage.message; + } + if (lm != null) { + lm.usageDown(); + } + } + private void deliverLarge(MessageReference messageReference, AMQPLargeMessage message) { // we only need a tag if we are going to settle later @@ -769,6 +781,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr delivery.setMessageFormat((int) message.getMessageFormat()); delivery.setContext(messageReference); + message.usageUp(); pendingLargeMessage = new LargeMessageDeliveryContext(messageReference, message, delivery); pendingLargeMessage.deliver();