This commit is contained in:
Clebert Suconic 2020-04-14 14:54:31 -04:00
commit 17d659052d
1 changed files with 10 additions and 7 deletions

View File

@ -836,9 +836,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
void resume() {
connection.runNow(this::deliver);
}
private static final int BUFFER_LENGTH = 1024;
void deliver() {
int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit();
// Let the Message decide how to present the message bytes
LargeBodyReader context = message.getLargeBodyReader();
try {
@ -850,7 +851,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// TODO: it would be nice to use pooled buffer here,
// however I would need a version of ReadableBuffer for Netty
ByteBuffer buf = ByteBuffer.allocate(BUFFER_LENGTH);
ByteBuffer buf = ByteBuffer.allocate(frameSize);
for (; position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
@ -860,11 +861,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
buf.clear();
int size = context.readInto(buf);
sender.send(buf.array(), 0, size);
connection.instantFlush();
sender.send(new ReadableBuffer.ByteBufferReader(buf));
position += size;
if (position < bodySize) {
connection.instantFlush();
}
}
} finally {
context.close();
@ -882,7 +885,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sender.advance();
}
connection.flush();
connection.instantFlush();
synchronized (creditsLock) {
pending.decrementAndGet();