ARTEMIS-3449: fix some issues from using position to track an unrelated state, plus simplify some previous changes and more

This commit is contained in:
Robbie Gemmell 2021-09-09 10:37:36 +01:00
parent 9de288857d
commit 8fb5b8969f
1 changed files with 25 additions and 23 deletions

View File

@ -563,26 +563,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
final MessageReference reference; final MessageReference reference;
final AMQPLargeMessage message; final AMQPLargeMessage message;
final Delivery delivery; final Delivery delivery;
boolean initialPacketHandled;
void resume() { void resume() {
connection.runNow(this::deliver); connection.runNow(this::deliver);
} }
void deliver() { void deliver() {
// This is discounting some bytes due to Transfer payload // This is discounting some bytes due to Transfer payload
final int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0); final int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
DeliveryAnnotations deliveryAnnotationsToEncode;
message.checkReference(reference);
if (reference.getProtocolData() != null && reference.getProtocolData() instanceof DeliveryAnnotations) {
deliveryAnnotationsToEncode = (DeliveryAnnotations)reference.getProtocolData();
} else {
deliveryAnnotationsToEncode = null;
}
try { try {
final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize); final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
final NettyReadable frameView = new NettyReadable(frameBuffer); final NettyReadable frameView = new NettyReadable(frameBuffer);
@ -593,10 +583,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// materialize it so we can use its internal NIO buffer // materialize it so we can use its internal NIO buffer
frameBuffer.ensureWritable(frameSize); frameBuffer.ensureWritable(frameSize);
if (position == 0 && sender.getLocalState() != EndpointState.CLOSED && position < bodySize) { if (!initialPacketHandled && sender.getLocalState() != EndpointState.CLOSED) {
if (!deliverInitialPacket(context, deliveryAnnotationsToEncode, frameBuffer)) { if (!deliverInitialPacket(context, frameBuffer)) {
return; return;
} }
initialPacketHandled = true;
} }
for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) { for (; sender.getLocalState() != EndpointState.CLOSED && position < bodySize; ) {
@ -614,7 +606,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
position += readSize; position += readSize;
if (readSize > 0) { if (readSize > 0) {
if (position < bodySize) { if (position < bodySize) {
connection.instantFlush(); connection.instantFlush();
} }
@ -650,13 +641,24 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} }
private boolean deliverInitialPacket(final LargeBodyReader context, private boolean deliverInitialPacket(final LargeBodyReader context,
final DeliveryAnnotations deliveryAnnotationsToEncode,
final ByteBuf frameBuffer) throws Exception { final ByteBuf frameBuffer) throws Exception {
assert position == 0 && context.position() == 0; assert position == 0 && context.position() == 0 && !initialPacketHandled;
if (!connection.flowControl(this::resume)) { if (!connection.flowControl(this::resume)) {
return false; return false;
} }
frameBuffer.clear(); frameBuffer.clear();
DeliveryAnnotations deliveryAnnotationsToEncode;
message.checkReference(reference);
if (reference.getProtocolData() != null && reference.getProtocolData() instanceof DeliveryAnnotations) {
deliveryAnnotationsToEncode = (DeliveryAnnotations)reference.getProtocolData();
} else {
deliveryAnnotationsToEncode = null;
}
try { try {
replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(frameBuffer)); replaceInitialHeader(deliveryAnnotationsToEncode, context, new NettyWritable(frameBuffer));
} catch (IndexOutOfBoundsException indexOutOfBoundsException) { } catch (IndexOutOfBoundsException indexOutOfBoundsException) {
@ -670,15 +672,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context); sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context);
return true; return true;
} }
int readSize = 0;
final int writableBytes = frameBuffer.writableBytes(); final int writableBytes = frameBuffer.writableBytes();
if (writableBytes == 0) { if (writableBytes != 0) {
sender.send(new NettyReadable(frameBuffer)); final int writtenBytes = frameBuffer.writerIndex();
connection.instantFlush(); readSize = context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
return true; frameBuffer.writerIndex(writtenBytes + readSize);
} }
final int writtenBytes = frameBuffer.writerIndex();
final int readSize = context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
frameBuffer.writerIndex(writtenBytes + readSize);
sender.send(new NettyReadable(frameBuffer)); sender.send(new NettyReadable(frameBuffer));
position += readSize; position += readSize;
connection.instantFlush(); connection.instantFlush();