ARTEMIS-2706 Use FrameSize to decide when to flush large messages

This commit is contained in:
Clebert Suconic 2020-04-13 20:29:57 -04:00
parent 5085fabd9a
commit d27d61f223
1 changed files with 10 additions and 7 deletions

View File

@ -836,9 +836,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
void resume() { void resume() {
connection.runNow(this::deliver); connection.runNow(this::deliver);
} }
private static final int BUFFER_LENGTH = 1024;
void deliver() { void deliver() {
int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit();
// Let the Message decide how to present the message bytes // Let the Message decide how to present the message bytes
LargeBodyReader context = message.getLargeBodyReader(); LargeBodyReader context = message.getLargeBodyReader();
try { try {
@ -850,7 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// TODO: it would be nice to use pooled buffer here, // TODO: it would be nice to use pooled buffer here,
// however I would need a version of ReadableBuffer for Netty // however I would need a version of ReadableBuffer for Netty
ByteBuffer buf = ByteBuffer.allocate(BUFFER_LENGTH); ByteBuffer buf = ByteBuffer.allocate(frameSize);
for (; position < bodySize; ) { for (; position < bodySize; ) {
if (!connection.flowControl(this::resume)) { if (!connection.flowControl(this::resume)) {
@ -860,11 +861,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
buf.clear(); buf.clear();
int size = context.readInto(buf); int size = context.readInto(buf);
sender.send(buf.array(), 0, size); sender.send(new ReadableBuffer.ByteBufferReader(buf));
connection.instantFlush();
position += size; position += size;
if (position < bodySize) {
connection.instantFlush();
}
} }
} finally { } finally {
context.close(); context.close();
@ -882,7 +885,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sender.advance(); sender.advance();
} }
connection.flush(); connection.instantFlush();
synchronized (creditsLock) { synchronized (creditsLock) {
pending.decrementAndGet(); pending.decrementAndGet();