From d27d61f223fe88fd01f8d98415ddadb75605d374 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 13 Apr 2020 20:29:57 -0400 Subject: [PATCH] ARTEMIS-2706 Use FrameSize to decide when to flush large messages --- .../amqp/proton/ProtonServerSenderContext.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 24d0bcbe18..4810519437 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -836,9 +836,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr void resume() { connection.runNow(this::deliver); } - private static final int BUFFER_LENGTH = 1024; - void deliver() { + + int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit(); + // Let the Message decide how to present the message bytes LargeBodyReader context = message.getLargeBodyReader(); try { @@ -850,7 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // TODO: it would be nice to use pooled buffer here, // however I would need a version of ReadableBuffer for Netty - ByteBuffer buf = ByteBuffer.allocate(BUFFER_LENGTH); + ByteBuffer buf = ByteBuffer.allocate(frameSize); for (; position < bodySize; ) { if (!connection.flowControl(this::resume)) { @@ -860,11 +861,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr buf.clear(); int size = context.readInto(buf); - sender.send(buf.array(), 0, size); - - connection.instantFlush(); + sender.send(new ReadableBuffer.ByteBufferReader(buf)); position += size; + + if (position < bodySize) { + connection.instantFlush(); + } } } finally { context.close(); @@ -882,7 +885,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sender.advance(); } - connection.flush(); + connection.instantFlush(); synchronized (creditsLock) { pending.decrementAndGet();