Fixes #10679 - Review HTTP/2 rate control. (#10682)

* Bumped the rate control rate from 50 events/s to 128.
* Added rate control for all CONTINUATION frames.
* Added rate control for invalid PUSH_PROMISE frames.
* Added rate control for RST_STREAM frames.
* Added rate control for all SETTINGS frames.
* Fixed growth of header block accumulation buffer.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-10-09 15:19:00 +02:00 committed by GitHub
parent e6c57e2338
commit 7bd6c5211d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 179 additions and 49 deletions

View File

@ -76,16 +76,28 @@ public class ContinuationBodyParser extends BodyParser
int remaining = buffer.remaining(); int remaining = buffer.remaining();
if (remaining < length) if (remaining < length)
{ {
headerBlockFragments.storeFragment(buffer, remaining, false); ContinuationFrame frame = new ContinuationFrame(getStreamId(), false);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate");
if (!headerBlockFragments.storeFragment(buffer, remaining, false))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_continuation_stream");
length -= remaining; length -= remaining;
break; break;
} }
else else
{ {
boolean last = hasFlag(Flags.END_HEADERS); boolean endHeaders = hasFlag(Flags.END_HEADERS);
headerBlockFragments.storeFragment(buffer, length, last); ContinuationFrame frame = new ContinuationFrame(getStreamId(), endHeaders);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate");
if (!headerBlockFragments.storeFragment(buffer, length, endHeaders))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_continuation_stream");
reset(); reset();
if (last) if (endHeaders)
return onHeaders(buffer); return onHeaders(buffer);
return true; return true;
} }
@ -104,17 +116,21 @@ public class ContinuationBodyParser extends BodyParser
RetainableByteBuffer headerBlock = headerBlockFragments.complete(); RetainableByteBuffer headerBlock = headerBlockFragments.complete();
MetaData metaData = headerBlockParser.parse(headerBlock.getByteBuffer(), headerBlock.remaining()); MetaData metaData = headerBlockParser.parse(headerBlock.getByteBuffer(), headerBlock.remaining());
headerBlock.release(); headerBlock.release();
if (metaData == null) HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, headerBlockFragments.getPriorityFrame(), headerBlockFragments.isEndStream());
return true; headerBlockFragments.reset();
if (metaData == HeaderBlockParser.SESSION_FAILURE) if (metaData == HeaderBlockParser.SESSION_FAILURE)
return false; return false;
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, headerBlockFragments.getPriorityFrame(), headerBlockFragments.isEndStream());
if (metaData == HeaderBlockParser.STREAM_FAILURE) if (metaData != HeaderBlockParser.STREAM_FAILURE)
{
notifyHeaders(frame);
}
else
{ {
if (!rateControlOnEvent(frame)) if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate"); return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
} }
notifyHeaders(frame);
return true; return true;
} }

View File

@ -23,22 +23,34 @@ import org.eclipse.jetty.util.BufferUtil;
public class HeaderBlockFragments public class HeaderBlockFragments
{ {
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final int maxCapacity;
private PriorityFrame priorityFrame; private PriorityFrame priorityFrame;
private boolean endStream;
private int streamId; private int streamId;
private boolean endStream;
private RetainableByteBuffer storage; private RetainableByteBuffer storage;
public HeaderBlockFragments(ByteBufferPool bufferPool) public HeaderBlockFragments(ByteBufferPool bufferPool, int maxCapacity)
{ {
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.maxCapacity = maxCapacity;
} }
public void storeFragment(ByteBuffer fragment, int length, boolean last) void reset()
{
priorityFrame = null;
streamId = 0;
endStream = false;
storage = null;
}
public boolean storeFragment(ByteBuffer fragment, int length, boolean last)
{ {
if (storage == null) if (storage == null)
{ {
int space = last ? length : length * 2; if (length > maxCapacity)
storage = bufferPool.acquire(space, fragment.isDirect()); return false;
int capacity = last ? length : length * 2;
storage = bufferPool.acquire(capacity, fragment.isDirect());
BufferUtil.flipToFill(storage.getByteBuffer()); BufferUtil.flipToFill(storage.getByteBuffer());
} }
@ -46,6 +58,8 @@ public class HeaderBlockFragments
if (storage.remaining() < length) if (storage.remaining() < length)
{ {
ByteBuffer byteBuffer = storage.getByteBuffer(); ByteBuffer byteBuffer = storage.getByteBuffer();
if (byteBuffer.position() + length > maxCapacity)
return false;
int space = last ? length : length * 2; int space = last ? length : length * 2;
int capacity = byteBuffer.position() + space; int capacity = byteBuffer.position() + space;
RetainableByteBuffer newStorage = bufferPool.acquire(capacity, storage.isDirect()); RetainableByteBuffer newStorage = bufferPool.acquire(capacity, storage.isDirect());
@ -61,6 +75,7 @@ public class HeaderBlockFragments
fragment.limit(fragment.position() + length); fragment.limit(fragment.position() + length);
storage.getByteBuffer().put(fragment); storage.getByteBuffer().put(fragment);
fragment.limit(limit); fragment.limit(limit);
return true;
} }
public PriorityFrame getPriorityFrame() public PriorityFrame getPriorityFrame()
@ -85,10 +100,8 @@ public class HeaderBlockFragments
public RetainableByteBuffer complete() public RetainableByteBuffer complete()
{ {
RetainableByteBuffer result = storage; storage.getByteBuffer().flip();
storage = null; return storage;
result.getByteBuffer().flip();
return result;
} }
public int getStreamId() public int getStreamId()

View File

@ -74,11 +74,18 @@ public class HeadersBodyParser extends BodyParser
onHeaders(frame); onHeaders(frame);
} }
else else
{
if (headerBlockFragments.getStreamId() != 0)
{
connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
}
else
{ {
headerBlockFragments.setStreamId(getStreamId()); headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream()); headerBlockFragments.setEndStream(isEndStream());
} }
} }
}
@Override @Override
public boolean parse(ByteBuffer buffer) public boolean parse(ByteBuffer buffer)
@ -171,6 +178,18 @@ public class HeadersBodyParser extends BodyParser
break; break;
} }
case HEADERS: case HEADERS:
{
if (!hasFlag(Flags.END_HEADERS))
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (hasFlag(Flags.PRIORITY))
headerBlockFragments.setPriorityFrame(new PriorityFrame(getStreamId(), parentStreamId, weight, exclusive));
}
state = State.HEADER_BLOCK;
break;
}
case HEADER_BLOCK:
{ {
if (hasFlag(Flags.END_HEADERS)) if (hasFlag(Flags.END_HEADERS))
{ {
@ -195,7 +214,7 @@ public class HeadersBodyParser extends BodyParser
{ {
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, null, isEndStream()); HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, null, isEndStream());
if (!rateControlOnEvent(frame)) if (!rateControlOnEvent(frame))
connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate"); return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
} }
} }
} }
@ -204,16 +223,14 @@ public class HeadersBodyParser extends BodyParser
int remaining = buffer.remaining(); int remaining = buffer.remaining();
if (remaining < length) if (remaining < length)
{ {
headerBlockFragments.storeFragment(buffer, remaining, false); if (!headerBlockFragments.storeFragment(buffer, remaining, false))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
length -= remaining; length -= remaining;
} }
else else
{ {
headerBlockFragments.setStreamId(getStreamId()); if (!headerBlockFragments.storeFragment(buffer, length, false))
headerBlockFragments.setEndStream(isEndStream()); return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
if (hasFlag(Flags.PRIORITY))
headerBlockFragments.setPriorityFrame(new PriorityFrame(getStreamId(), parentStreamId, weight, exclusive));
headerBlockFragments.storeFragment(buffer, length, false);
state = State.PADDING; state = State.PADDING;
loop = paddingLength == 0; loop = paddingLength == 0;
} }
@ -257,6 +274,6 @@ public class HeadersBodyParser extends BodyParser
private enum State private enum State
{ {
PREPARE, PADDING_LENGTH, EXCLUSIVE, PARENT_STREAM_ID, PARENT_STREAM_ID_BYTES, WEIGHT, HEADERS, PADDING PREPARE, PADDING_LENGTH, EXCLUSIVE, PARENT_STREAM_ID, PARENT_STREAM_ID_BYTES, WEIGHT, HEADERS, HEADER_BLOCK, PADDING
} }
} }

View File

@ -74,7 +74,7 @@ public class Parser
this.listener = listener; this.listener = listener;
unknownBodyParser = new UnknownBodyParser(headerParser, listener); unknownBodyParser = new UnknownBodyParser(headerParser, listener);
HeaderBlockParser headerBlockParser = new HeaderBlockParser(headerParser, bufferPool, hpackDecoder, unknownBodyParser); HeaderBlockParser headerBlockParser = new HeaderBlockParser(headerParser, bufferPool, hpackDecoder, unknownBodyParser);
HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments(bufferPool); HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments(bufferPool, hpackDecoder.getMaxHeaderListSize());
bodyParsers[FrameType.DATA.getType()] = new DataBodyParser(headerParser, listener); bodyParsers[FrameType.DATA.getType()] = new DataBodyParser(headerParser, listener);
bodyParsers[FrameType.HEADERS.getType()] = new HeadersBodyParser(headerParser, listener, headerBlockParser, headerBlockFragments); bodyParsers[FrameType.HEADERS.getType()] = new HeadersBodyParser(headerParser, listener, headerBlockParser, headerBlockFragments);
bodyParsers[FrameType.PRIORITY.getType()] = new PriorityBodyParser(headerParser, listener); bodyParsers[FrameType.PRIORITY.getType()] = new PriorityBodyParser(headerParser, listener);

View File

@ -18,6 +18,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags; import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame;
public class PushPromiseBodyParser extends BodyParser public class PushPromiseBodyParser extends BodyParser
@ -65,13 +66,9 @@ public class PushPromiseBodyParser extends BodyParser
length = getBodyLength(); length = getBodyLength();
if (isPadding()) if (isPadding())
{
state = State.PADDING_LENGTH; state = State.PADDING_LENGTH;
}
else else
{
state = State.STREAM_ID; state = State.STREAM_ID;
}
break; break;
} }
case PADDING_LENGTH: case PADDING_LENGTH:
@ -131,8 +128,16 @@ public class PushPromiseBodyParser extends BodyParser
state = State.PADDING; state = State.PADDING;
loop = paddingLength == 0; loop = paddingLength == 0;
if (metaData != HeaderBlockParser.STREAM_FAILURE) if (metaData != HeaderBlockParser.STREAM_FAILURE)
{
onPushPromise(streamId, metaData); onPushPromise(streamId, metaData);
} }
else
{
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, null, isEndStream());
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
}
}
break; break;
} }
case PADDING: case PADDING:

View File

@ -58,7 +58,7 @@ public class ResetBodyParser extends BodyParser
{ {
if (buffer.remaining() >= 4) if (buffer.remaining() >= 4)
{ {
return onReset(buffer.getInt()); return onReset(buffer, buffer.getInt());
} }
else else
{ {
@ -73,7 +73,7 @@ public class ResetBodyParser extends BodyParser
--cursor; --cursor;
error += currByte << (8 * cursor); error += currByte << (8 * cursor);
if (cursor == 0) if (cursor == 0)
return onReset(error); return onReset(buffer, error);
break; break;
} }
default: default:
@ -85,9 +85,11 @@ public class ResetBodyParser extends BodyParser
return false; return false;
} }
private boolean onReset(int error) private boolean onReset(ByteBuffer buffer, int error)
{ {
ResetFrame frame = new ResetFrame(getStreamId(), error); ResetFrame frame = new ResetFrame(getStreamId(), error);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_rst_stream_frame_rate");
reset(); reset();
notifyReset(frame); notifyReset(frame);
return true; return true;

View File

@ -73,10 +73,7 @@ public class SettingsBodyParser extends BodyParser
return; return;
boolean isReply = hasFlag(Flags.ACK); boolean isReply = hasFlag(Flags.ACK);
SettingsFrame frame = new SettingsFrame(Collections.emptyMap(), isReply); SettingsFrame frame = new SettingsFrame(Collections.emptyMap(), isReply);
if (!isReply && !rateControlOnEvent(frame)) onSettings(buffer, frame);
connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_settings_frame_rate");
else
onSettings(frame);
} }
private boolean validateFrame(ByteBuffer buffer, int streamId, int bodyLength) private boolean validateFrame(ByteBuffer buffer, int streamId, int bodyLength)
@ -219,11 +216,13 @@ public class SettingsBodyParser extends BodyParser
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size"); return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");
SettingsFrame frame = new SettingsFrame(settings, hasFlag(Flags.ACK)); SettingsFrame frame = new SettingsFrame(settings, hasFlag(Flags.ACK));
return onSettings(frame); return onSettings(buffer, frame);
} }
private boolean onSettings(SettingsFrame frame) private boolean onSettings(ByteBuffer buffer, SettingsFrame frame)
{ {
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_settings_frame_rate");
reset(); reset();
notifySettings(frame); notifySettings(frame);
return true; return true;

View File

@ -35,7 +35,6 @@ public class UnknownBodyParser extends BodyParser
boolean parsed = cursor == 0; boolean parsed = cursor == 0;
if (parsed && !rateControlOnEvent(new UnknownFrame(getFrameType()))) if (parsed && !rateControlOnEvent(new UnknownFrame(getFrameType())))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_unknown_frame_rate"); return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_unknown_frame_rate");
return parsed; return parsed;
} }

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http2.frames;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.http.HostPortHttpField; import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
@ -32,6 +33,8 @@ import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -41,7 +44,7 @@ public class ContinuationParseTest
@Test @Test
public void testParseOneByteAtATime() throws Exception public void testParseOneByteAtATime() throws Exception
{ {
ByteBufferPool bufferPool = new ArrayByteBufferPool(); ArrayByteBufferPool.Tracking bufferPool = new ArrayByteBufferPool.Tracking();
HeadersGenerator generator = new HeadersGenerator(new HeaderGenerator(bufferPool), new HpackEncoder()); HeadersGenerator generator = new HeadersGenerator(new HeaderGenerator(bufferPool), new HpackEncoder());
final List<HeadersFrame> frames = new ArrayList<>(); final List<HeadersFrame> frames = new ArrayList<>();
@ -137,6 +140,7 @@ public class ContinuationParseTest
parser.parse(ByteBuffer.wrap(new byte[]{buffer.get()})); parser.parse(ByteBuffer.wrap(new byte[]{buffer.get()}));
} }
} }
accumulator.release();
assertEquals(1, frames.size()); assertEquals(1, frames.size());
HeadersFrame frame = frames.get(0); HeadersFrame frame = frames.get(0);
@ -152,6 +156,56 @@ public class ContinuationParseTest
} }
PriorityFrame priority = frame.getPriority(); PriorityFrame priority = frame.getPriority();
assertNull(priority); assertNull(priority);
assertEquals(0, bufferPool.getLeaks().size(), bufferPool.dumpLeaks());
} }
} }
@Test
public void testLargeHeadersBlock() throws Exception
{
// Use a ByteBufferPool with a small factor, so that the accumulation buffer is not too large.
ByteBufferPool bufferPool = new ArrayByteBufferPool(0, 128, -1);
// A small max headers size, used for both accumulation and decoding.
int maxHeadersSize = 512;
Parser parser = new Parser(bufferPool, maxHeadersSize);
// Specify headers block size to generate CONTINUATION frames.
int maxHeadersBlockFragment = 128;
HeadersGenerator generator = new HeadersGenerator(new HeaderGenerator(bufferPool), new HpackEncoder(), maxHeadersBlockFragment);
int streamId = 13;
HttpFields fields = HttpFields.build()
.put("Accept", "text/html")
// Large header that generates a large headers block.
.put("User-Agent", "Jetty".repeat(256));
MetaData.Request metaData = new MetaData.Request("GET", HttpScheme.HTTP.asString(), new HostPortHttpField("localhost:8080"), "/path", HttpVersion.HTTP_2, fields, -1);
ByteBufferPool.Accumulator accumulator = new ByteBufferPool.Accumulator();
generator.generateHeaders(accumulator, streamId, metaData, null, true);
List<ByteBuffer> byteBuffers = accumulator.getByteBuffers();
assertThat(byteBuffers.stream().mapToInt(ByteBuffer::remaining).sum(), greaterThan(maxHeadersSize));
AtomicBoolean failed = new AtomicBoolean();
parser.init(new Parser.Listener()
{
@Override
public void onConnectionFailure(int error, String reason)
{
failed.set(true);
}
});
// Set a large max headers size for decoding, to ensure
// the failure is due to accumulation, not decoding.
parser.getHpackDecoder().setMaxHeaderListSize(10 * maxHeadersSize);
for (ByteBuffer byteBuffer : byteBuffers)
{
parser.parse(byteBuffer);
if (failed.get())
break;
}
accumulator.release();
assertTrue(failed.get());
}
} }

View File

@ -96,12 +96,20 @@ public class FrameFloodTest
} }
@Test @Test
public void testSettingsFrameFlood() public void testEmptySettingsFrameFlood()
{ {
byte[] payload = new byte[0]; byte[] payload = new byte[0];
testFrameFlood(null, frameFrom(payload.length, FrameType.SETTINGS.getType(), 0, 0, payload)); testFrameFlood(null, frameFrom(payload.length, FrameType.SETTINGS.getType(), 0, 0, payload));
} }
@Test
public void testSettingsFrameFlood()
{
// | Key0 | Key1 | Value0 | Value1 | Value2 | Value3 |
byte[] payload = new byte[]{0, 8, 0, 0, 0, 1};
testFrameFlood(null, frameFrom(payload.length, FrameType.SETTINGS.getType(), 0, 0, payload));
}
@Test @Test
public void testPingFrameFlood() public void testPingFrameFlood()
{ {
@ -110,7 +118,7 @@ public class FrameFloodTest
} }
@Test @Test
public void testContinuationFrameFlood() public void testEmptyContinuationFrameFlood()
{ {
int streamId = 13; int streamId = 13;
byte[] headersPayload = new byte[0]; byte[] headersPayload = new byte[0];
@ -119,6 +127,23 @@ public class FrameFloodTest
testFrameFlood(headersBytes, frameFrom(continuationPayload.length, FrameType.CONTINUATION.getType(), 0, streamId, continuationPayload)); testFrameFlood(headersBytes, frameFrom(continuationPayload.length, FrameType.CONTINUATION.getType(), 0, streamId, continuationPayload));
} }
@Test
public void testContinuationFrameFlood()
{
int streamId = 13;
byte[] headersPayload = new byte[0];
byte[] headersBytes = frameFrom(headersPayload.length, FrameType.HEADERS.getType(), 0, streamId, headersPayload);
byte[] continuationPayload = new byte[1];
testFrameFlood(headersBytes, frameFrom(continuationPayload.length, FrameType.CONTINUATION.getType(), 0, streamId, continuationPayload));
}
@Test
public void testResetStreamFrameFlood()
{
byte[] payload = {0, 0, 0, 0};
testFrameFlood(null, frameFrom(payload.length, FrameType.RST_STREAM.getType(), 0, 13, payload));
}
@Test @Test
public void testUnknownFrameFlood() public void testUnknownFrameFlood()
{ {

View File

@ -73,7 +73,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxFrameSize = Frame.DEFAULT_MAX_SIZE; private int maxFrameSize = Frame.DEFAULT_MAX_SIZE;
private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS; private int maxSettingsKeys = SettingsFrame.DEFAULT_MAX_KEYS;
private boolean connectProtocolEnabled = true; private boolean connectProtocolEnabled = true;
private RateControl.Factory rateControlFactory = new WindowRateControl.Factory(50); private RateControl.Factory rateControlFactory = new WindowRateControl.Factory(128);
private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F);
private long streamIdleTimeout; private long streamIdleTimeout;
private boolean useInputDirectByteBuffers; private boolean useInputDirectByteBuffers;