From 8fb5b8969f98b0994b9563cb7f5441e58253d89f Mon Sep 17 00:00:00 2001 From: Robbie Gemmell Date: Thu, 9 Sep 2021 10:37:36 +0100 Subject: [PATCH] ARTEMIS-3449: fix some issues from using position to track an unrelated state, plus simplify some previous changes and more --- .../proton/ProtonServerSenderContext.java | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 32585d7604..da09c6cf33 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -563,26 +563,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr final MessageReference reference; final AMQPLargeMessage message; final Delivery delivery; + boolean initialPacketHandled; void resume() { connection.runNow(this::deliver); } void deliver() { - // This is discounting some bytes due to Transfer payload final int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0); - DeliveryAnnotations deliveryAnnotationsToEncode; - - message.checkReference(reference); - - if (reference.getProtocolData() != null && reference.getProtocolData() instanceof DeliveryAnnotations) { - deliveryAnnotationsToEncode = (DeliveryAnnotations)reference.getProtocolData(); - } else { - deliveryAnnotationsToEncode = null; - } - try { final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize); final NettyReadable frameView = new NettyReadable(frameBuffer); @@ -593,10 +583,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // materialize it so we can use its internal NIO buffer frameBuffer.ensureWritable(frameSize); - if (position == 0 && sender.getLocalState() != EndpointState.CLOSED && position < bodySize) { - if (!deliverInitialPacket(context, deliveryAnnotationsToEncode, frameBuffer)) { + if (!initialPacketHandled && sender.getLocalState() != EndpointState.CLOSED) { + if (!deliverInitialPacket(context, frameBuffer)) { return; } + + initialPacketHandled = true; } for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) { @@ -614,7 +606,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr position += readSize; if (readSize > 0) { - if (position < bodySize) { connection.instantFlush(); } @@ -650,13 +641,24 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } private boolean deliverInitialPacket(final LargeBodyReader context, - final DeliveryAnnotations deliveryAnnotationsToEncode, final ByteBuf frameBuffer) throws Exception { - assert position == 0 && context.position() == 0; + assert position == 0 && context.position() == 0 && !initialPacketHandled; + if (!connection.flowControl(this::resume)) { return false; } + frameBuffer.clear(); + + DeliveryAnnotations deliveryAnnotationsToEncode; + message.checkReference(reference); + + if (reference.getProtocolData() != null && reference.getProtocolData() instanceof DeliveryAnnotations) { + deliveryAnnotationsToEncode = (DeliveryAnnotations)reference.getProtocolData(); + } else { + deliveryAnnotationsToEncode = null; + } + try { replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(frameBuffer)); } catch (IndexOutOfBoundsException indexOutOfBoundsException) { @@ -670,15 +672,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context); return true; } + + int readSize = 0; final int writableBytes = frameBuffer.writableBytes(); - if (writableBytes == 0) { - sender.send(new NettyReadable(frameBuffer)); - connection.instantFlush(); - return true; + if (writableBytes != 0) { + final int writtenBytes = frameBuffer.writerIndex(); + readSize = context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes)); + frameBuffer.writerIndex(writtenBytes + readSize); } - final int writtenBytes = frameBuffer.writerIndex(); - final int readSize = context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes)); - frameBuffer.writerIndex(writtenBytes + readSize); + sender.send(new NettyReadable(frameBuffer)); position += readSize; connection.instantFlush();