From 06ffd014f940ab4f9da95f96fcb868ab38a6c406 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Wed, 29 Sep 2021 22:02:27 +0200
Subject: [PATCH] Issue #6728 - QUIC and HTTP/3
- Implemented support for GREASE for unidirectional and bidirectional streams.
- Implemented UnknownBodyParser.
- Strengthened validation of frames in control and message streams.
Signed-off-by: Simone Bordet
---
.../org/eclipse/jetty/http3/api/Stream.java | 8 +-
.../eclipse/jetty/http3/frames/FrameType.java | 21 +++-
.../internal/ControlStreamConnection.java | 2 +-
.../internal/DecoderStreamConnection.java | 2 +-
.../internal/EncoderStreamConnection.java | 2 +-
.../jetty/http3/internal/ErrorCode.java | 10 ++
.../http3/internal/HTTP3StreamConnection.java | 13 +++
.../http3/internal/InstructionFlusher.java | 4 +-
.../UnidirectionalStreamConnection.java | 83 ++++++++------
.../http3/internal/parser/ControlParser.java | 22 ++--
.../http3/internal/parser/HeaderParser.java | 6 +-
.../http3/internal/parser/MessageParser.java | 27 +++--
.../internal/parser/UnknownBodyParser.java | 18 ++-
.../jetty/http3/tests/ExternalServerTest.java | 103 ++++++++++++++++++
.../http3/tests/HTTP3UnexpectedFrameTest.java | 2 +-
.../jetty/quic/common/QuicSession.java | 37 +++++--
.../jetty/quic/common/QuicStreamEndPoint.java | 5 +
.../eclipse/jetty/quic/common/StreamType.java | 6 +
.../jetty/quic/quiche/QuicheConnection.java | 9 +-
19 files changed, 300 insertions(+), 80 deletions(-)
create mode 100644 jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ExternalServerTest.java
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java
index 1727eaec4e1..3d17ab0cfc0 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java
@@ -239,7 +239,9 @@ public interface Stream
}
/**
- * @return the {@link ByteBuffer} containing the bytes
+ * @return the {@link ByteBuffer} containing the data bytes
+ *
+ * @see #complete()
*/
public ByteBuffer getByteBuffer()
{
@@ -257,7 +259,9 @@ public interface Stream
/**
* The method that applications must invoke to
- * signal that the bytes have been processed.
+ * signal that the data bytes have been processed.
+ *
+ * @see #getByteBuffer()
*/
public void complete()
{
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/FrameType.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/FrameType.java
index 24ce40ea8bf..cfddf94e1ca 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/FrameType.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/frames/FrameType.java
@@ -26,11 +26,26 @@ public enum FrameType
GOAWAY(0x7),
MAX_PUSH_ID(0xD);
- public static FrameType from(int type)
+ public static FrameType from(long type)
{
return Types.types.get(type);
}
+ public static boolean isControl(long frameType)
+ {
+ return frameType == CANCEL_PUSH.type() ||
+ frameType == SETTINGS.type() ||
+ frameType == GOAWAY.type() ||
+ frameType == MAX_PUSH_ID.type();
+ }
+
+ public static boolean isMessage(long frameType)
+ {
+ return frameType == DATA.type() ||
+ frameType == HEADERS.type() ||
+ frameType == PUSH_PROMISE.type();
+ }
+
public static int maxType()
{
return MAX_PUSH_ID.type();
@@ -41,7 +56,7 @@ public enum FrameType
private FrameType(int type)
{
this.type = type;
- Types.types.put(type, this);
+ Types.types.put((long)type, this);
}
public int type()
@@ -51,6 +66,6 @@ public enum FrameType
private static class Types
{
- private static final Map types = new HashMap<>();
+ private static final Map types = new HashMap<>();
}
}
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlStreamConnection.java
index df8dd17f7ed..ae55144fdcc 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlStreamConnection.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ControlStreamConnection.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
public class ControlStreamConnection extends AbstractConnection implements Connection.UpgradeTo
{
// SPEC: Control Stream Type.
- public static final int STREAM_TYPE = 0x00;
+ public static final long STREAM_TYPE = 0x00;
private static final Logger LOG = LoggerFactory.getLogger(ControlStreamConnection.class);
private final ByteBufferPool byteBufferPool;
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java
index 2675008e9a6..09617eb4fd4 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java
@@ -24,7 +24,7 @@ import org.eclipse.jetty.io.EndPoint;
public class DecoderStreamConnection extends InstructionStreamConnection
{
// SPEC: QPACK Encoder Stream Type.
- public static final int STREAM_TYPE = 0x03;
+ public static final long STREAM_TYPE = 0x03;
private final QpackEncoder encoder;
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java
index 34b1c0adf61..ca8ddc7d327 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java
@@ -24,7 +24,7 @@ import org.eclipse.jetty.io.EndPoint;
public class EncoderStreamConnection extends InstructionStreamConnection
{
// SPEC: QPACK Encoder Stream Type.
- public static final int STREAM_TYPE = 0x02;
+ public static final long STREAM_TYPE = 0x02;
private final QpackDecoder decoder;
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ErrorCode.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ErrorCode.java
index 9148b6a3078..a4f441b9b8e 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ErrorCode.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/ErrorCode.java
@@ -13,6 +13,8 @@
package org.eclipse.jetty.http3.internal;
+import java.util.concurrent.ThreadLocalRandom;
+
public enum ErrorCode
{
NO_ERROR(0x100),
@@ -40,6 +42,14 @@ public enum ErrorCode
this.code = code;
}
+ public static long randomReservedCode()
+ {
+ // SPEC: reserved errors have the form 0x1F * n + 0x21.
+ // This constant avoids to overflow VarLenInt, which is how an error code is encoded.
+ long n = ThreadLocalRandom.current().nextLong(0x210842108421084L);
+ return 0x1F * n + 0x21;
+ }
+
public int code()
{
return code;
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java
index 02e76200c39..9ea348f63b7 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java
@@ -111,6 +111,19 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{
if (parseAndFill() == MessageParser.Result.NO_FRAME)
break;
+
+ // TODO: we should also exit if the connection was closed due to errors.
+ // There is not yet a isClosed() primitive though.
+ if (remotelyClosed)
+ {
+ // We have detected the end of the stream,
+ // do not loop around to fill & parse again.
+ // However, the last frame may have
+ // caused a write that we need to flush.
+ getEndPoint().getQuicSession().flush();
+ break;
+ }
+
if (dataMode)
{
if (LOG.isDebugEnabled())
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionFlusher.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionFlusher.java
index c14924b7545..24dda201668 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionFlusher.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionFlusher.java
@@ -40,10 +40,10 @@ public class InstructionFlusher extends IteratingCallback
private final Queue queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final QuicStreamEndPoint endPoint;
- private final int streamType;
+ private final long streamType;
private boolean initialized;
- public InstructionFlusher(QuicSession session, QuicStreamEndPoint endPoint, int streamType)
+ public InstructionFlusher(QuicSession session, QuicStreamEndPoint endPoint, long streamType)
{
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
this.endPoint = endPoint;
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java
index 61ca2cd9476..108a4c8fb60 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java
@@ -23,7 +23,9 @@ import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
-import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
+import org.eclipse.jetty.quic.common.StreamType;
+import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +41,7 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
- public UnidirectionalStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder, QpackDecoder decoder, ParserListener listener)
+ public UnidirectionalStreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder, QpackDecoder decoder, ParserListener listener)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
@@ -48,6 +50,12 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
this.listener = listener;
}
+ @Override
+ public QuicStreamEndPoint getEndPoint()
+ {
+ return (QuicStreamEndPoint)super.getEndPoint();
+ }
+
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
@@ -89,11 +97,11 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
{
int filled = getEndPoint().fill(buffer);
if (LOG.isDebugEnabled())
- LOG.debug("filled {} on {}", filled, this);
+ LOG.debug("filled {} on {}: {}", filled, this, BufferUtil.toDetailString(buffer));
if (filled > 0)
{
- if (parser.parseInt(buffer, this::detectAndUpgrade))
+ if (parser.parseLong(buffer, this::detectAndUpgrade))
break;
}
else if (filled == 0)
@@ -121,44 +129,49 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
}
}
- private void detectAndUpgrade(int streamType)
+ private void detectAndUpgrade(long streamType)
{
- switch (streamType)
+ if (streamType == ControlStreamConnection.STREAM_TYPE)
{
- case ControlStreamConnection.STREAM_TYPE:
+ ControlParser parser = new ControlParser(listener);
+ ControlStreamConnection newConnection = new ControlStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
+ newConnection.setInputBufferSize(getInputBufferSize());
+ newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
+ if (LOG.isDebugEnabled())
+ LOG.debug("upgrading to {}", newConnection);
+ getEndPoint().upgrade(newConnection);
+ }
+ else if (streamType == EncoderStreamConnection.STREAM_TYPE)
+ {
+ EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder);
+ newConnection.setInputBufferSize(getInputBufferSize());
+ newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
+ if (LOG.isDebugEnabled())
+ LOG.debug("upgrading to {}", newConnection);
+ getEndPoint().upgrade(newConnection);
+ }
+ else if (streamType == DecoderStreamConnection.STREAM_TYPE)
+ {
+ DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder);
+ newConnection.setInputBufferSize(getInputBufferSize());
+ newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
+ if (LOG.isDebugEnabled())
+ LOG.debug("upgrading to {}", newConnection);
+ getEndPoint().upgrade(newConnection);
+ }
+ else
+ {
+ if (StreamType.isReserved(streamType))
{
- ControlParser parser = new ControlParser(listener);
- ControlStreamConnection newConnection = new ControlStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
- newConnection.setInputBufferSize(getInputBufferSize());
- newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
- LOG.debug("upgrading to {}", newConnection);
- getEndPoint().upgrade(newConnection);
- break;
+ LOG.debug("reserved stream type {}, resetting on {}", Long.toHexString(streamType), this);
+ getEndPoint().reset(ErrorCode.randomReservedCode());
}
- case EncoderStreamConnection.STREAM_TYPE:
+ else
{
- EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder);
- newConnection.setInputBufferSize(getInputBufferSize());
- newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
- LOG.debug("upgrading to {}", newConnection);
- getEndPoint().upgrade(newConnection);
- break;
- }
- case DecoderStreamConnection.STREAM_TYPE:
- {
- DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder);
- newConnection.setInputBufferSize(getInputBufferSize());
- newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
- if (LOG.isDebugEnabled())
- LOG.debug("upgrading to {}", newConnection);
- getEndPoint().upgrade(newConnection);
- break;
- }
- default:
- {
- throw new IllegalStateException("unexpected stream type " + Integer.toHexString(streamType));
+ LOG.debug("unsupported stream type {}, resetting on {}", Long.toHexString(streamType), this);
+ getEndPoint().reset(ErrorCode.STREAM_CREATION_ERROR.code());
}
}
}
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ControlParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ControlParser.java
index 2cf5f4a3365..abac2a9dd38 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ControlParser.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ControlParser.java
@@ -75,15 +75,24 @@ public class ControlParser
case BODY:
{
BodyParser bodyParser = null;
- int frameType = headerParser.getFrameType();
+ long frameType = headerParser.getFrameType();
if (frameType >= 0 && frameType < bodyParsers.length)
- bodyParser = bodyParsers[frameType];
+ bodyParser = bodyParsers[(int)frameType];
if (bodyParser == null)
{
- // TODO: enforce only control frames, but ignore unknown.
+ if (FrameType.isMessage(frameType))
+ {
+ // SPEC: message frames on the control stream are invalid.
+ if (LOG.isDebugEnabled())
+ LOG.debug("invalid message frame type {} on control stream", Long.toHexString(frameType));
+ sessionFailure(buffer, ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_type");
+ return;
+ }
+
if (LOG.isDebugEnabled())
- LOG.debug("ignoring unknown frame type {}", Integer.toHexString(frameType));
+ LOG.debug("ignoring unknown frame type {}", Long.toHexString(frameType));
+
BodyParser.Result result = unknownBodyParser.parse(buffer);
if (result == BodyParser.Result.NO_FRAME)
return;
@@ -123,12 +132,11 @@ public class ControlParser
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
- buffer.clear();
- connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
+ sessionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
}
}
- private void connectionFailure(ByteBuffer buffer, int error, String reason)
+ private void sessionFailure(ByteBuffer buffer, int error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeaderParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeaderParser.java
index 9f3c18678f3..c2567460feb 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeaderParser.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeaderParser.java
@@ -27,7 +27,7 @@ public class HeaderParser
// TODO: RateControl?
private final VarLenInt varLenInt = new VarLenInt();
private State state = State.TYPE;
- private int type;
+ private long type;
private long length;
public void reset()
@@ -55,7 +55,7 @@ public class HeaderParser
{
case TYPE:
{
- if (varLenInt.parseInt(buffer, v -> type = v))
+ if (varLenInt.parseLong(buffer, v -> type = v))
{
state = State.LENGTH;
break;
@@ -80,7 +80,7 @@ public class HeaderParser
return false;
}
- public int getFrameType()
+ public long getFrameType()
{
return type;
}
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java
index 26462f73473..5a45b72c523 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java
@@ -102,18 +102,29 @@ public class MessageParser
case BODY:
{
BodyParser bodyParser = null;
- int frameType = headerParser.getFrameType();
+ long frameType = headerParser.getFrameType();
if (frameType >= 0 && frameType < bodyParsers.length)
- bodyParser = bodyParsers[frameType];
+ bodyParser = bodyParsers[(int)frameType];
if (bodyParser == null)
{
- // Unknown frame types must be ignored.
+ if (FrameType.isControl(frameType))
+ {
+ // SPEC: control frames on a message stream are invalid.
+ if (LOG.isDebugEnabled())
+ LOG.debug("invalid control frame type {} on message stream", Long.toHexString(frameType));
+ sessionFailure(buffer, ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_type");
+ return Result.NO_FRAME;
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("ignoring unknown frame type {}", Long.toHexString(frameType));
+
BodyParser.Result result = unknownBodyParser.parse(buffer);
if (result == BodyParser.Result.NO_FRAME)
return Result.NO_FRAME;
if (LOG.isDebugEnabled())
- LOG.debug("parsed unknown frame body for type {}", Integer.toHexString(frameType));
+ LOG.debug("parsed unknown frame body for type {}", Long.toHexString(frameType));
if (result == BodyParser.Result.WHOLE_FRAME)
reset();
break;
@@ -126,7 +137,6 @@ public class MessageParser
if (LOG.isDebugEnabled())
LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
reset();
- return Result.FRAME;
}
else
{
@@ -137,8 +147,8 @@ public class MessageParser
LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
if (result == BodyParser.Result.WHOLE_FRAME)
reset();
- return Result.FRAME;
}
+ return Result.FRAME;
}
}
default:
@@ -152,13 +162,12 @@ public class MessageParser
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
- buffer.clear();
- connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
+ sessionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
return Result.NO_FRAME;
}
}
- private void connectionFailure(ByteBuffer buffer, int error, String reason)
+ private void sessionFailure(ByteBuffer buffer, int error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}
diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/UnknownBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/UnknownBodyParser.java
index 6609c964d43..c8d1538ee80 100644
--- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/UnknownBodyParser.java
+++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/UnknownBodyParser.java
@@ -17,6 +17,8 @@ import java.nio.ByteBuffer;
public class UnknownBodyParser extends BodyParser
{
+ private long length = -1;
+
public UnknownBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(headerParser, listener);
@@ -25,6 +27,20 @@ public class UnknownBodyParser extends BodyParser
@Override
public Result parse(ByteBuffer buffer)
{
- throw new UnsupportedOperationException();
+ if (length < 0)
+ length = getBodyLength();
+ int remaining = buffer.remaining();
+ if (remaining >= length)
+ {
+ buffer.position(buffer.position() + (int)length);
+ length = -1;
+ return Result.WHOLE_FRAME;
+ }
+ else
+ {
+ buffer.position(buffer.limit());
+ length -= remaining;
+ return Result.NO_FRAME;
+ }
}
}
diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ExternalServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ExternalServerTest.java
new file mode 100644
index 00000000000..c8bcaaeadcb
--- /dev/null
+++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ExternalServerTest.java
@@ -0,0 +1,103 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.http3.tests;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpURI;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http3.api.Session;
+import org.eclipse.jetty.http3.api.Stream;
+import org.eclipse.jetty.http3.client.HTTP3Client;
+import org.eclipse.jetty.http3.frames.HeadersFrame;
+import org.eclipse.jetty.util.HostPort;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Disabled
+public class ExternalServerTest
+{
+ @Test
+ @Tag("external")
+ public void testExternalServer() throws Exception
+ {
+ HTTP3Client client = new HTTP3Client();
+ client.start();
+ try
+ {
+// HostPort hostPort = new HostPort("nghttp2.org:4433");
+ HostPort hostPort = new HostPort("quic.tech:8443");
+// HostPort hostPort = new HostPort("h2o.examp1e.net:443");
+// HostPort hostPort = new HostPort("test.privateoctopus.com:4433");
+ Session.Client session = client.connect(new InetSocketAddress(hostPort.getHost(), hostPort.getPort()), new Session.Client.Listener() {})
+ .get(5, TimeUnit.SECONDS);
+
+ CountDownLatch requestLatch = new CountDownLatch(1);
+ HttpURI uri = HttpURI.from(String.format("https://%s/", hostPort));
+ MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
+ session.newRequest(new HeadersFrame(request, true), new Stream.Listener()
+ {
+ @Override
+ public void onResponse(Stream stream, HeadersFrame frame)
+ {
+ System.err.println("RESPONSE HEADER = " + frame);
+ if (frame.isLast())
+ {
+ requestLatch.countDown();
+ return;
+ }
+ stream.demand();
+ }
+
+ @Override
+ public void onDataAvailable(Stream stream)
+ {
+ Stream.Data data = stream.readData();
+ System.err.println("RESPONSE DATA = " + data);
+ if (data != null)
+ {
+ data.complete();
+ if (data.isLast())
+ {
+ requestLatch.countDown();
+ return;
+ }
+ }
+ stream.demand();
+ }
+
+ @Override
+ public void onTrailer(Stream stream, HeadersFrame frame)
+ {
+ System.err.println("RESPONSE TRAILER = " + frame);
+ requestLatch.countDown();
+ }
+ });
+
+ assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
+ }
+ finally
+ {
+ client.stop();
+ }
+ }
+}
diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3UnexpectedFrameTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3UnexpectedFrameTest.java
index 9e0f2da5efb..b3c084348bb 100644
--- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3UnexpectedFrameTest.java
+++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3UnexpectedFrameTest.java
@@ -58,7 +58,7 @@ public class HTTP3UnexpectedFrameTest extends AbstractHTTP3ClientServerTest
})
.get(5, TimeUnit.SECONDS);
- ((HTTP3Session)session).writeFrame(0, new DataFrame(ByteBuffer.allocate(128), true), Callback.NOOP);
+ ((HTTP3Session)session).writeFrame(0, new DataFrame(ByteBuffer.allocate(128), false), Callback.NOOP);
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java
index 38ab2797ae1..e14169420e8 100644
--- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java
+++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java
@@ -174,12 +174,12 @@ public abstract class QuicSession
public void shutdownInput(long streamId) throws IOException
{
- quicheConnection.shutdownStream(streamId, false);
+ quicheConnection.shutdownStream(streamId, false, 0);
}
public void shutdownOutput(long streamId) throws IOException
{
- quicheConnection.shutdownStream(streamId, true);
+ quicheConnection.shutdownStream(streamId, true, 0);
}
public void onClose(long streamId)
@@ -187,6 +187,19 @@ public abstract class QuicSession
endpoints.remove(streamId);
}
+ public void resetStream(long streamId, long error)
+ {
+ try
+ {
+ quicheConnection.resetStream(streamId, error);
+ }
+ catch (IOException x)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("could not reset stream #{} with error {}", streamId, error, x);
+ }
+ }
+
public SocketAddress getLocalAddress()
{
return connection.getEndPoint().getLocalSocketAddress();
@@ -370,10 +383,10 @@ public abstract class QuicSession
public void onTimeoutExpired()
{
if (LOG.isDebugEnabled())
- LOG.debug("quiche timeout callback called cid={}", quicheConnectionId);
+ LOG.debug("quiche timeout expired {}", QuicSession.this);
quicheConnection.onTimeout();
if (LOG.isDebugEnabled())
- LOG.debug("re-iterating quiche after timeout cid={}", quicheConnectionId);
+ LOG.debug("re-iterating after quiche timeout {}", QuicSession.this);
// Do not use the timer thread to iterate.
dispatch(() -> iterate());
}
@@ -395,10 +408,10 @@ public abstract class QuicSession
int pos = BufferUtil.flipToFill(cipherBuffer);
int drained = quicheConnection.drainCipherBytes(cipherBuffer);
if (LOG.isDebugEnabled())
- LOG.debug("drained {} byte(s) of cipher text from {}", drained, this);
+ LOG.debug("drained {} byte(s) of cipher bytes from {}", drained, QuicSession.this);
long nextTimeoutInMs = quicheConnection.nextTimeout();
if (LOG.isDebugEnabled())
- LOG.debug("next quiche timeout: {} ms", nextTimeoutInMs);
+ LOG.debug("next quiche timeout: {} ms on {}", nextTimeoutInMs, QuicSession.this);
if (nextTimeoutInMs < 0)
timeout.cancel();
else
@@ -408,13 +421,13 @@ public abstract class QuicSession
boolean connectionClosed = quicheConnection.isConnectionClosed();
Action action = connectionClosed ? Action.SUCCEEDED : Action.IDLE;
if (LOG.isDebugEnabled())
- LOG.debug("connection closed={}, action={}", connectionClosed, action);
+ LOG.debug("connection closed={}, action={} on {}", connectionClosed, action, QuicSession.this);
return action;
}
BufferUtil.flipToFlush(cipherBuffer, pos);
- connection.write(this, remoteAddress, cipherBuffer);
if (LOG.isDebugEnabled())
- LOG.debug("wrote cipher text for {}", remoteAddress);
+ LOG.debug("writing cipher bytes for {} on {}", remoteAddress, QuicSession.this);
+ connection.write(this, remoteAddress, cipherBuffer);
return Action.SCHEDULED;
}
@@ -422,7 +435,7 @@ public abstract class QuicSession
public void succeeded()
{
if (LOG.isDebugEnabled())
- LOG.debug("cipher text writing succeeded");
+ LOG.debug("written cipher bytes on {}", QuicSession.this);
byteBufferPool.release(cipherBuffer);
super.succeeded();
}
@@ -437,7 +450,7 @@ public abstract class QuicSession
protected void onCompleteSuccess()
{
if (LOG.isDebugEnabled())
- LOG.debug("quiche connection is in closed state");
+ LOG.debug("connection closed {}", QuicSession.this);
QuicSession.this.close();
}
@@ -445,7 +458,7 @@ public abstract class QuicSession
protected void onCompleteFailure(Throwable cause)
{
if (LOG.isDebugEnabled())
- LOG.debug("cipher text writing failed, closing session", cause);
+ LOG.debug("failed to write cipher bytes, closing session on {}", QuicSession.this, cause);
byteBufferPool.release(cipherBuffer);
QuicSession.this.close(cause);
}
diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java
index 610f0b6b188..e2ce0b886e1 100644
--- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java
+++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java
@@ -123,6 +123,11 @@ public class QuicStreamEndPoint extends AbstractEndPoint
session.onClose(streamId);
}
+ public void reset(long error)
+ {
+ session.resetStream(streamId, error);
+ }
+
@Override
public int fill(ByteBuffer buffer) throws IOException
{
diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/StreamType.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/StreamType.java
index 2e9906018dc..0d5620b0fe7 100644
--- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/StreamType.java
+++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/StreamType.java
@@ -39,6 +39,12 @@ public enum StreamType
return (streamId & 0b01) == 0b00;
}
+ public static boolean isReserved(long streamType)
+ {
+ // SPEC: reserved stream types follow the formula: 0x1F * N + 0x21.
+ return (streamType - 0x21) % 0x1F == 0;
+ }
+
private final int type;
private StreamType(int type)
diff --git a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java
index 0fa88f82345..abf8d9da443 100644
--- a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java
+++ b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java
@@ -552,20 +552,25 @@ public class QuicheConnection
}
}
- public void shutdownStream(long streamId, boolean writeSide) throws IOException
+ public void shutdownStream(long streamId, boolean writeSide, long error) throws IOException
{
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IOException("Quiche connection was released");
int direction = writeSide ? LibQuiche.quiche_shutdown.QUICHE_SHUTDOWN_WRITE : LibQuiche.quiche_shutdown.QUICHE_SHUTDOWN_READ;
- int rc = LibQuiche.INSTANCE.quiche_conn_stream_shutdown(quicheConn, new uint64_t(streamId), direction, new uint64_t(0));
+ int rc = LibQuiche.INSTANCE.quiche_conn_stream_shutdown(quicheConn, new uint64_t(streamId), direction, new uint64_t(error));
if (rc == 0 || rc == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return;
throw new IOException("failed to shutdown stream " + streamId + ": " + LibQuiche.quiche_error.errToString(rc));
}
}
+ public void resetStream(long streamId, long error) throws IOException
+ {
+ shutdownStream(streamId, true, error);
+ }
+
public void feedFinForStream(long streamId) throws IOException
{
try (AutoLock ignore = lock.lock())