ARTEMIS-3448 Expiry (maybe DLQ copy) would break LargeMessage Delivery

This commit will fix MessagesExpiredPagingTest.testSendReceiveAMQPLarge
This commit is contained in:
Clebert Suconic 2021-08-27 13:36:50 -04:00
parent b8ed0f21c8
commit 8709b1e5d3
1 changed files with 14 additions and 1 deletions

View File

@ -593,7 +593,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
ByteBuffer buf = ByteBuffer.allocate(frameSize);
for (; position < bodySize; ) {
for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
context.close();
return;
@ -754,11 +754,23 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
private void finishLargeMessage() {
lmUsageDown();
pendingLargeMessage = null;
hasLarge = false;
brokerConsumer.promptDelivery();
}
// will check for large message and set usageDown
private void lmUsageDown() {
AMQPLargeMessage lm = null;
if (pendingLargeMessage != null) {
lm = pendingLargeMessage.message;
}
if (lm != null) {
lm.usageDown();
}
}
private void deliverLarge(MessageReference messageReference, AMQPLargeMessage message) {
// we only need a tag if we are going to settle later
@ -769,6 +781,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);
message.usageUp();
pendingLargeMessage = new LargeMessageDeliveryContext(messageReference, message, delivery);
pendingLargeMessage.deliver();