Issue #6728 - QUIC and HTTP/3

- Fixed 100 Continue handling.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-28 21:27:50 +02:00
parent dc5ffe1a8e
commit 44132b966d
9 changed files with 177 additions and 47 deletions

View File

@ -18,6 +18,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
@ -232,7 +234,13 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
public void onResponse(HeadersFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
MetaData.Response response = (MetaData.Response)frame.getMetaData();
boolean valid;
if (response.getStatus() == HttpStatus.CONTINUE_100)
valid = validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.CONTINUE);
else
valid = validateAndUpdate(EnumSet.of(FrameState.INITIAL, FrameState.CONTINUE), FrameState.HEADER);
if (valid)
{
notIdle();
notifyResponse(frame);
@ -437,7 +445,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
private enum FrameState
{
INITIAL, HEADER, DATA, TRAILER, FAILED
INITIAL, CONTINUE, HEADER, DATA, TRAILER, FAILED
}
private enum CloseState

View File

@ -18,6 +18,7 @@ import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
@ -44,7 +45,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
private RetainableByteBuffer buffer;
private boolean dataMode;
private boolean applicationMode;
private boolean parserDataMode;
private boolean dataDemand;
private boolean dataStalled;
private DataFrame dataFrame;
@ -76,6 +78,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
public void setApplicationMode(boolean mode)
{
this.applicationMode = mode;
}
@Override
public void onOpen()
{
@ -94,8 +101,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("processing dataMode={} on {}", dataMode, this);
if (dataMode)
LOG.debug("processing parserDataMode={} on {}", parserDataMode, this);
if (parserDataMode)
processDataFrames();
else
processNonDataFrames();
@ -104,7 +111,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private void processDataFrames()
{
processDataDemand();
if (!dataMode)
if (!parserDataMode)
{
if (buffer.hasRemaining())
processNonDataFrames();
@ -134,14 +141,26 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
break;
}
if (dataMode)
if (parserDataMode)
{
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=true on {}", this);
if (buffer.hasRemaining())
{
processDataFrames();
}
else
fillInterested();
{
if (applicationMode)
{
if (LOG.isDebugEnabled())
LOG.debug("skipping fill interest on {}", this);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("setting fill interest on {}", this);
fillInterested();
}
}
break;
}
}
@ -185,9 +204,9 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
case MODE_SWITCH:
{
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=false on {}", this);
LOG.debug("switching to parserDataMode=false on {}", this);
dataLast = true;
dataMode = false;
parserDataMode = false;
parser.setDataMode(false);
return null;
}
@ -294,7 +313,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
try
{
if (LOG.isDebugEnabled())
LOG.debug("parse+fill interest={} on {} with buffer {}", setFillInterest, this, buffer);
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, buffer);
if (buffer == null)
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
@ -376,7 +395,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
@Override
public String toConnectionString()
{
return String.format("%s[demand=%b,stalled=%b,dataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), dataMode);
return String.format("%s[demand=%b,stalled=%b,parserDataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), parserDataMode);
}
private class MessageListener extends ParserListener.Wrapper
@ -390,11 +409,30 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public void onHeaders(long streamId, HeadersFrame frame)
{
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
if (metaData.isRequest())
{
// Expect DATA frames now.
dataMode = true;
parserDataMode = true;
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to parserDataMode=true for request {} on {}", metaData, this);
}
else if (metaData.isResponse())
{
MetaData.Response response = (MetaData.Response)metaData;
if (response.getStatus() != HttpStatus.CONTINUE_100)
{
// Expect DATA frames now.
parserDataMode = true;
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to parserDataMode=true for response {} on {}", metaData, this);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("staying in parserDataMode=false for response {} on {}", metaData, this);
}
}
else
{

View File

@ -13,11 +13,13 @@
package org.eclipse.jetty.http3.server.internal;
import java.io.IOException;
import java.util.function.Consumer;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
@ -67,6 +69,32 @@ public class HttpChannelOverHTTP3 extends HttpChannel
getRequest().getHttpInput().consumeAll();
}
@Override
public boolean isExpecting100Continue()
{
return expect100Continue;
}
@Override
public void continue100(int available) throws IOException
{
if (isExpecting100Continue())
{
expect100Continue = false;
// is content missing?
if (available == 0)
{
if (getResponse().isCommitted())
throw new IOException("Committed before 100 Continues");
boolean committed = sendResponse(HttpGenerator.CONTINUE_100_INFO, null, false);
if (!committed)
throw new IOException("Concurrent commit while trying to send 100-Continue");
}
}
}
public Runnable onRequest(HeadersFrame frame)
{
try
@ -95,17 +123,21 @@ public class HttpChannelOverHTTP3 extends HttpChannel
delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
!endStream && !expect100Continue && !connect;
// Delay the demand of DATA frames for CONNECT with :protocol
// or for normal requests expecting 100 continue.
if (connect)
{
// Delay the demand of DATA frames for CONNECT with :protocol,
// since we want the other protocol to trigger content demand.
if (request.getProtocol() == null)
stream.demand();
}
else
{
// When the dispatch to the application is delayed, then
// demand for content, so when it arrives we can dispatch.
if (delayedUntilContent)
stream.demand();
else
connection.setApplicationMode(true);
}
if (LOG.isDebugEnabled())
@ -133,6 +165,18 @@ public class HttpChannelOverHTTP3 extends HttpChannel
}
}
@Override
protected void commit(MetaData.Response info)
{
super.commit(info);
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 commit response #{}/{}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getHttpVersion(), info.getStatus(), info.getReason(),
System.lineSeparator(), info.getFields());
}
}
public Runnable onDataAvailable()
{
boolean woken = getRequest().getHttpInput().onContentProducible();
@ -146,6 +190,10 @@ public class HttpChannelOverHTTP3 extends HttpChannel
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
if (wasDelayed)
connection.setApplicationMode(true);
return wasDelayed || woken ? this : null;
}
@ -169,6 +217,10 @@ public class HttpChannelOverHTTP3 extends HttpChannel
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
if (wasDelayed)
connection.setApplicationMode(true);
return wasDelayed || handle ? this : null;
}
@ -182,9 +234,12 @@ public class HttpChannelOverHTTP3 extends HttpChannel
return false;
}
boolean delayed = delayedUntilContent;
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
if (wasDelayed)
connection.setApplicationMode(true);
boolean reset = getState().isIdle();
if (reset)
consumeInput();
@ -194,7 +249,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
failure.addSuppressed(new Throwable("idle timeout"));
boolean needed = getRequest().getHttpInput().onContentProducible();
if (needed || delayed)
if (needed || wasDelayed)
{
consumer.accept(this::handleWithContext);
reset = false;
@ -308,26 +363,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
private HttpInput.Content newContent(Stream.Data data)
{
return new HttpInput.Content(data.getByteBuffer())
{
@Override
public boolean isEof()
{
return data.isLast();
}
@Override
public void succeeded()
{
data.complete();
}
@Override
public void failed(Throwable x)
{
data.complete();
}
};
return new DataContent(data);
}
@Override
@ -361,4 +397,33 @@ public class HttpChannelOverHTTP3 extends HttpChannel
return false;
}
}
private static class DataContent extends HttpInput.Content
{
private final Stream.Data data;
public DataContent(Stream.Data data)
{
super(data.getByteBuffer());
this.data = data;
}
@Override
public boolean isEof()
{
return data.isLast();
}
@Override
public void succeeded()
{
data.complete();
}
@Override
public void failed(Throwable x)
{
data.complete();
}
}
}

View File

@ -417,4 +417,6 @@ public class ClientServerTest extends AbstractClientServerTest
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
// TODO: write a test calling readData() from onRequest() (not from onDataAvailable()).
}

View File

@ -206,7 +206,7 @@ public abstract class QuicConnection extends AbstractConnection
{
boolean interested = isFillInterested();
if (LOG.isDebugEnabled())
LOG.debug("receiveAndProcess() interested={}", interested);
LOG.debug("receiveAndProcess() fillInterested={}", interested);
if (interested)
return null;

View File

@ -223,7 +223,7 @@ public class QuicStreamEndPoint extends AbstractEndPoint
public void onWritable()
{
if (LOG.isDebugEnabled())
LOG.debug("stream {} is writable", streamId);
LOG.debug("stream #{} is writable", streamId);
getWriteFlusher().completeWrite();
}
@ -247,7 +247,7 @@ public class QuicStreamEndPoint extends AbstractEndPoint
boolean interested = isFillInterested();
if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable, processing: {}", streamId, interested);
LOG.debug("stream #{} is readable, processing: {}", streamId, interested);
if (interested)
getFillInterest().fillable();
return interested;
@ -256,6 +256,8 @@ public class QuicStreamEndPoint extends AbstractEndPoint
@Override
public void fillInterested(Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("setting fill interest on {}", this);
super.fillInterested(callback);
// TODO: see above
@ -269,6 +271,8 @@ public class QuicStreamEndPoint extends AbstractEndPoint
@Override
public boolean tryFillInterested(Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("try setting fill interest on {}", this);
boolean result = super.tryFillInterested(callback);
getQuicSession().getProtocolSession().produce();
return result;

View File

@ -331,6 +331,12 @@ public class QuicheConnection
return quicheConnection;
}
public void enableQlog(String filename, String title, String desc) throws IOException
{
if (!LibQuiche.INSTANCE.quiche_conn_set_qlog_path(quicheConn, filename, title, desc))
throw new IOException("unable to set qlog path to " + filename);
}
public List<Long> readableStreamIds()
{
return iterableStreamIds(false);

View File

@ -374,6 +374,10 @@ public interface LibQuiche extends Library
// Returns true if the given protocol version is supported.
boolean quiche_version_is_supported(uint32_t version);
// Enables qlog to the specified file path. Returns true on success.
boolean quiche_conn_set_qlog_path(quiche_conn conn, String path,
String log_title, String log_desc);
// Writes a version negotiation packet.
ssize_t quiche_negotiate_version(byte[] scid, size_t scid_len,
byte[] dcid, size_t dcid_len,

View File

@ -1,7 +1,10 @@
git clone --recursive https://github.com/cloudflare/quiche
cd quiche
git checkout -b tag-0.10.0 tags/0.10.0
cargo build --features ffi
version=0.10.0
# checkout and build quiche
git clone --recursive https://github.com/cloudflare/quiche ${version}
cd ${version}
git checkout -b tag-${version} tags/${version}
cargo build --features ffi,qlog
ls ./target/debug/libquiche.so
# create universal binary on mac: