WIP: reworking H3 channel impl + workaround quiche bug by using CUBIC congestion control
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
4c66c02176
commit
336f39c73e
|
@ -132,64 +132,18 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
|
||||
public Runnable onDataAvailable()
|
||||
{
|
||||
Stream.Data data = stream.readData();
|
||||
if (data == null)
|
||||
{
|
||||
stream.demand();
|
||||
return null;
|
||||
}
|
||||
|
||||
ByteBuffer buffer = data.getByteBuffer();
|
||||
int length = buffer.remaining();
|
||||
HttpInput.Content content = new HttpInput.Content(buffer)
|
||||
{
|
||||
@Override
|
||||
public boolean isEof()
|
||||
{
|
||||
return data.isLast();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
data.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
data.complete();
|
||||
}
|
||||
};
|
||||
|
||||
this.content = content;
|
||||
boolean handle = onContent(content);
|
||||
|
||||
boolean isLast = data.isLast();
|
||||
if (isLast)
|
||||
{
|
||||
boolean handleContent = onContentComplete();
|
||||
// This will generate EOF -> must happen before onContentProducible.
|
||||
boolean handleRequest = onRequestComplete();
|
||||
handle |= handleContent | handleRequest;
|
||||
}
|
||||
|
||||
boolean woken = getRequest().getHttpInput().onContentProducible();
|
||||
handle |= woken;
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("HTTP3 Request #{}/{}: {} bytes of {} content, woken: {}, handle: {}",
|
||||
LOG.debug("HTTP3 Request #{}/{} woken: {}",
|
||||
stream.getId(),
|
||||
Integer.toHexString(stream.getSession().hashCode()),
|
||||
length,
|
||||
isLast ? "last" : "some",
|
||||
woken,
|
||||
handle);
|
||||
woken);
|
||||
}
|
||||
|
||||
boolean wasDelayed = delayedUntilContent;
|
||||
delayedUntilContent = false;
|
||||
return handle || wasDelayed ? this : null;
|
||||
return wasDelayed ? this : null;
|
||||
}
|
||||
|
||||
public Runnable onTrailer(HeadersFrame frame)
|
||||
|
@ -266,31 +220,57 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
if (content != null)
|
||||
return true;
|
||||
|
||||
MessageParser.Result result = connection.parseAndFill();
|
||||
if (result == MessageParser.Result.FRAME)
|
||||
{
|
||||
DataFrame dataFrame = connection.pollContent();
|
||||
content = new HttpInput.Content(dataFrame.getByteBuffer())
|
||||
{
|
||||
@Override
|
||||
public boolean isEof()
|
||||
{
|
||||
return dataFrame.isLast();
|
||||
}
|
||||
};
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
stream.demand();
|
||||
return false;
|
||||
}
|
||||
stream.demand();
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpInput.Content produceContent()
|
||||
{
|
||||
HttpInput.Content result = content;
|
||||
if (content != null)
|
||||
{
|
||||
HttpInput.Content result = content;
|
||||
if (!content.isSpecial())
|
||||
content = null;
|
||||
return result;
|
||||
}
|
||||
|
||||
Stream.Data data = stream.readData();
|
||||
if (data == null)
|
||||
return null;
|
||||
|
||||
content = new HttpInput.Content(data.getByteBuffer())
|
||||
{
|
||||
@Override
|
||||
public boolean isEof()
|
||||
{
|
||||
return data.isLast();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
data.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
data.complete();
|
||||
}
|
||||
};
|
||||
boolean handle = onContent(content);
|
||||
|
||||
boolean isLast = data.isLast();
|
||||
if (isLast)
|
||||
{
|
||||
boolean handleContent = onContentComplete();
|
||||
// This will generate EOF -> must happen before onContentProducible.
|
||||
boolean handleRequest = onRequestComplete();
|
||||
handle |= handleContent | handleRequest;
|
||||
}
|
||||
|
||||
HttpInput.Content result = this.content;
|
||||
if (result != null && !result.isSpecial())
|
||||
content = result.isEof() ? new HttpInput.EofContent() : null;
|
||||
return result;
|
||||
|
|
|
@ -137,16 +137,16 @@ public class HandlerClientServerTest extends AbstractClientServerTest
|
|||
stream.demand();
|
||||
}
|
||||
})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
.get(555, TimeUnit.SECONDS);
|
||||
|
||||
byte[] bytes = new byte[16 * 1024 * 1024];
|
||||
new Random().nextBytes(bytes);
|
||||
stream.data(new DataFrame(ByteBuffer.wrap(bytes, 0, bytes.length / 2), false))
|
||||
.thenCompose(s -> s.data(new DataFrame(ByteBuffer.wrap(bytes, bytes.length / 2, bytes.length), true)))
|
||||
.thenCompose(s -> s.data(new DataFrame(ByteBuffer.wrap(bytes, bytes.length / 2, bytes.length / 2), true)))
|
||||
.get(555, TimeUnit.SECONDS);
|
||||
|
||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientResponseLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverLatch.await(555, TimeUnit.SECONDS));
|
||||
assertTrue(clientResponseLatch.await(555, TimeUnit.SECONDS));
|
||||
|
||||
int sum = clientReceivedBuffers.stream().mapToInt(Buffer::remaining).sum();
|
||||
assertThat(sum, is(bytes.length));
|
||||
|
|
|
@ -90,7 +90,7 @@ public class ClientQuicConnection extends QuicConnection
|
|||
quicheConfig.setInitialMaxStreamDataUni(10_000_000L);
|
||||
quicheConfig.setInitialMaxStreamsUni(100L);
|
||||
quicheConfig.setInitialMaxStreamsBidi(100L);
|
||||
quicheConfig.setCongestionControl(QuicheConfig.CongestionControl.RENO);
|
||||
quicheConfig.setCongestionControl(QuicheConfig.CongestionControl.CUBIC);
|
||||
|
||||
InetSocketAddress remoteAddress = (InetSocketAddress)context.get(ClientConnector.REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ public class LowLevelQuicheTest
|
|||
clientQuicheConfig.setInitialMaxStreamDataUni(10_000_000L);
|
||||
clientQuicheConfig.setInitialMaxStreamsUni(100L);
|
||||
clientQuicheConfig.setInitialMaxStreamsBidi(100L);
|
||||
clientQuicheConfig.setCongestionControl(QuicheConfig.CongestionControl.RENO);
|
||||
clientQuicheConfig.setCongestionControl(QuicheConfig.CongestionControl.CUBIC);
|
||||
|
||||
SSLKeyPair serverKeyPair = new SSLKeyPair(Paths.get(Objects.requireNonNull(getClass().getResource("/keystore.p12")).toURI()).toFile(), "PKCS12", "storepwd".toCharArray(), "mykey", "storepwd".toCharArray());
|
||||
File[] pemFiles = serverKeyPair.export(new File(System.getProperty("java.io.tmpdir")));
|
||||
|
@ -79,7 +79,7 @@ public class LowLevelQuicheTest
|
|||
serverQuicheConfig.setInitialMaxStreamDataUni(10_000_000L);
|
||||
serverQuicheConfig.setInitialMaxStreamsUni(100L);
|
||||
serverQuicheConfig.setInitialMaxStreamsBidi(100L);
|
||||
serverQuicheConfig.setCongestionControl(QuicheConfig.CongestionControl.RENO);
|
||||
serverQuicheConfig.setCongestionControl(QuicheConfig.CongestionControl.CUBIC);
|
||||
|
||||
tokenMinter = new TestTokenMinter();
|
||||
tokenValidator = new TestTokenValidator();
|
||||
|
|
|
@ -153,7 +153,7 @@ public class QuicServerConnector extends AbstractNetworkConnector
|
|||
quicheConfig.setInitialMaxStreamDataUni(10000000L);
|
||||
quicheConfig.setInitialMaxStreamsUni(100L);
|
||||
quicheConfig.setInitialMaxStreamsBidi(100L);
|
||||
quicheConfig.setCongestionControl(QuicheConfig.CongestionControl.RENO);
|
||||
quicheConfig.setCongestionControl(QuicheConfig.CongestionControl.CUBIC);
|
||||
List<String> protocols = getProtocols();
|
||||
// This is only needed for Quiche example clients.
|
||||
protocols.add(0, "http/0.9");
|
||||
|
|
Loading…
Reference in New Issue