diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 24d0bcbe18..4810519437 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -836,9 +836,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr void resume() { connection.runNow(this::deliver); } - private static final int BUFFER_LENGTH = 1024; - void deliver() { + + int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit(); + // Let the Message decide how to present the message bytes LargeBodyReader context = message.getLargeBodyReader(); try { @@ -850,7 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // TODO: it would be nice to use pooled buffer here, // however I would need a version of ReadableBuffer for Netty - ByteBuffer buf = ByteBuffer.allocate(BUFFER_LENGTH); + ByteBuffer buf = ByteBuffer.allocate(frameSize); for (; position < bodySize; ) { if (!connection.flowControl(this::resume)) { @@ -860,11 +861,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr buf.clear(); int size = context.readInto(buf); - sender.send(buf.array(), 0, size); - - connection.instantFlush(); + sender.send(new ReadableBuffer.ByteBufferReader(buf)); position += size; + + if (position < bodySize) { + connection.instantFlush(); + } } } finally { context.close(); @@ -882,7 +885,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sender.advance(); } - connection.flush(); + connection.instantFlush(); synchronized (creditsLock) { pending.decrementAndGet();