Issue #6728 - QUIC and HTTP/3
- Implemented support for GREASE for unidirectional and bidirectional streams. - Implemented UnknownBodyParser. - Strengthened validation of frames in control and message streams. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
2115e4bf3e
commit
06ffd014f9
|
@ -239,7 +239,9 @@ public interface Stream
|
|||
}
|
||||
|
||||
/**
|
||||
* @return the {@link ByteBuffer} containing the bytes
|
||||
* @return the {@link ByteBuffer} containing the data bytes
|
||||
*
|
||||
* @see #complete()
|
||||
*/
|
||||
public ByteBuffer getByteBuffer()
|
||||
{
|
||||
|
@ -257,7 +259,9 @@ public interface Stream
|
|||
|
||||
/**
|
||||
* <p>The method that applications <em>must</em> invoke to
|
||||
* signal that the bytes have been processed.</p>
|
||||
* signal that the data bytes have been processed.</p>
|
||||
*
|
||||
* @see #getByteBuffer()
|
||||
*/
|
||||
public void complete()
|
||||
{
|
||||
|
|
|
@ -26,11 +26,26 @@ public enum FrameType
|
|||
GOAWAY(0x7),
|
||||
MAX_PUSH_ID(0xD);
|
||||
|
||||
public static FrameType from(int type)
|
||||
public static FrameType from(long type)
|
||||
{
|
||||
return Types.types.get(type);
|
||||
}
|
||||
|
||||
public static boolean isControl(long frameType)
|
||||
{
|
||||
return frameType == CANCEL_PUSH.type() ||
|
||||
frameType == SETTINGS.type() ||
|
||||
frameType == GOAWAY.type() ||
|
||||
frameType == MAX_PUSH_ID.type();
|
||||
}
|
||||
|
||||
public static boolean isMessage(long frameType)
|
||||
{
|
||||
return frameType == DATA.type() ||
|
||||
frameType == HEADERS.type() ||
|
||||
frameType == PUSH_PROMISE.type();
|
||||
}
|
||||
|
||||
public static int maxType()
|
||||
{
|
||||
return MAX_PUSH_ID.type();
|
||||
|
@ -41,7 +56,7 @@ public enum FrameType
|
|||
private FrameType(int type)
|
||||
{
|
||||
this.type = type;
|
||||
Types.types.put(type, this);
|
||||
Types.types.put((long)type, this);
|
||||
}
|
||||
|
||||
public int type()
|
||||
|
@ -51,6 +66,6 @@ public enum FrameType
|
|||
|
||||
private static class Types
|
||||
{
|
||||
private static final Map<Integer, FrameType> types = new HashMap<>();
|
||||
private static final Map<Long, FrameType> types = new HashMap<>();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
|
|||
public class ControlStreamConnection extends AbstractConnection implements Connection.UpgradeTo
|
||||
{
|
||||
// SPEC: Control Stream Type.
|
||||
public static final int STREAM_TYPE = 0x00;
|
||||
public static final long STREAM_TYPE = 0x00;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ControlStreamConnection.class);
|
||||
|
||||
private final ByteBufferPool byteBufferPool;
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.eclipse.jetty.io.EndPoint;
|
|||
public class DecoderStreamConnection extends InstructionStreamConnection
|
||||
{
|
||||
// SPEC: QPACK Encoder Stream Type.
|
||||
public static final int STREAM_TYPE = 0x03;
|
||||
public static final long STREAM_TYPE = 0x03;
|
||||
|
||||
private final QpackEncoder encoder;
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.eclipse.jetty.io.EndPoint;
|
|||
public class EncoderStreamConnection extends InstructionStreamConnection
|
||||
{
|
||||
// SPEC: QPACK Encoder Stream Type.
|
||||
public static final int STREAM_TYPE = 0x02;
|
||||
public static final long STREAM_TYPE = 0x02;
|
||||
|
||||
private final QpackDecoder decoder;
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
|
||||
package org.eclipse.jetty.http3.internal;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
public enum ErrorCode
|
||||
{
|
||||
NO_ERROR(0x100),
|
||||
|
@ -40,6 +42,14 @@ public enum ErrorCode
|
|||
this.code = code;
|
||||
}
|
||||
|
||||
public static long randomReservedCode()
|
||||
{
|
||||
// SPEC: reserved errors have the form 0x1F * n + 0x21.
|
||||
// This constant avoids to overflow VarLenInt, which is how an error code is encoded.
|
||||
long n = ThreadLocalRandom.current().nextLong(0x210842108421084L);
|
||||
return 0x1F * n + 0x21;
|
||||
}
|
||||
|
||||
public int code()
|
||||
{
|
||||
return code;
|
||||
|
|
|
@ -111,6 +111,19 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
{
|
||||
if (parseAndFill() == MessageParser.Result.NO_FRAME)
|
||||
break;
|
||||
|
||||
// TODO: we should also exit if the connection was closed due to errors.
|
||||
// There is not yet a isClosed() primitive though.
|
||||
if (remotelyClosed)
|
||||
{
|
||||
// We have detected the end of the stream,
|
||||
// do not loop around to fill & parse again.
|
||||
// However, the last frame may have
|
||||
// caused a write that we need to flush.
|
||||
getEndPoint().getQuicSession().flush();
|
||||
break;
|
||||
}
|
||||
|
||||
if (dataMode)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
|
|
|
@ -40,10 +40,10 @@ public class InstructionFlusher extends IteratingCallback
|
|||
private final Queue<Instruction> queue = new ArrayDeque<>();
|
||||
private final ByteBufferPool.Lease lease;
|
||||
private final QuicStreamEndPoint endPoint;
|
||||
private final int streamType;
|
||||
private final long streamType;
|
||||
private boolean initialized;
|
||||
|
||||
public InstructionFlusher(QuicSession session, QuicStreamEndPoint endPoint, int streamType)
|
||||
public InstructionFlusher(QuicSession session, QuicStreamEndPoint endPoint, long streamType)
|
||||
{
|
||||
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
|
||||
this.endPoint = endPoint;
|
||||
|
|
|
@ -23,7 +23,9 @@ import org.eclipse.jetty.http3.qpack.QpackEncoder;
|
|||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
|
||||
import org.eclipse.jetty.quic.common.StreamType;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -39,7 +41,7 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
|
|||
private boolean useInputDirectByteBuffers = true;
|
||||
private ByteBuffer buffer;
|
||||
|
||||
public UnidirectionalStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder, QpackDecoder decoder, ParserListener listener)
|
||||
public UnidirectionalStreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder, QpackDecoder decoder, ParserListener listener)
|
||||
{
|
||||
super(endPoint, executor);
|
||||
this.byteBufferPool = byteBufferPool;
|
||||
|
@ -48,6 +50,12 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
|
|||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QuicStreamEndPoint getEndPoint()
|
||||
{
|
||||
return (QuicStreamEndPoint)super.getEndPoint();
|
||||
}
|
||||
|
||||
public boolean isUseInputDirectByteBuffers()
|
||||
{
|
||||
return useInputDirectByteBuffers;
|
||||
|
@ -89,11 +97,11 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
|
|||
{
|
||||
int filled = getEndPoint().fill(buffer);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("filled {} on {}", filled, this);
|
||||
LOG.debug("filled {} on {}: {}", filled, this, BufferUtil.toDetailString(buffer));
|
||||
|
||||
if (filled > 0)
|
||||
{
|
||||
if (parser.parseInt(buffer, this::detectAndUpgrade))
|
||||
if (parser.parseLong(buffer, this::detectAndUpgrade))
|
||||
break;
|
||||
}
|
||||
else if (filled == 0)
|
||||
|
@ -121,44 +129,49 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
|
|||
}
|
||||
}
|
||||
|
||||
private void detectAndUpgrade(int streamType)
|
||||
private void detectAndUpgrade(long streamType)
|
||||
{
|
||||
switch (streamType)
|
||||
if (streamType == ControlStreamConnection.STREAM_TYPE)
|
||||
{
|
||||
case ControlStreamConnection.STREAM_TYPE:
|
||||
ControlParser parser = new ControlParser(listener);
|
||||
ControlStreamConnection newConnection = new ControlStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
|
||||
newConnection.setInputBufferSize(getInputBufferSize());
|
||||
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("upgrading to {}", newConnection);
|
||||
getEndPoint().upgrade(newConnection);
|
||||
}
|
||||
else if (streamType == EncoderStreamConnection.STREAM_TYPE)
|
||||
{
|
||||
EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder);
|
||||
newConnection.setInputBufferSize(getInputBufferSize());
|
||||
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("upgrading to {}", newConnection);
|
||||
getEndPoint().upgrade(newConnection);
|
||||
}
|
||||
else if (streamType == DecoderStreamConnection.STREAM_TYPE)
|
||||
{
|
||||
DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder);
|
||||
newConnection.setInputBufferSize(getInputBufferSize());
|
||||
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("upgrading to {}", newConnection);
|
||||
getEndPoint().upgrade(newConnection);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (StreamType.isReserved(streamType))
|
||||
{
|
||||
ControlParser parser = new ControlParser(listener);
|
||||
ControlStreamConnection newConnection = new ControlStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
|
||||
newConnection.setInputBufferSize(getInputBufferSize());
|
||||
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("upgrading to {}", newConnection);
|
||||
getEndPoint().upgrade(newConnection);
|
||||
break;
|
||||
LOG.debug("reserved stream type {}, resetting on {}", Long.toHexString(streamType), this);
|
||||
getEndPoint().reset(ErrorCode.randomReservedCode());
|
||||
}
|
||||
case EncoderStreamConnection.STREAM_TYPE:
|
||||
else
|
||||
{
|
||||
EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder);
|
||||
newConnection.setInputBufferSize(getInputBufferSize());
|
||||
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("upgrading to {}", newConnection);
|
||||
getEndPoint().upgrade(newConnection);
|
||||
break;
|
||||
}
|
||||
case DecoderStreamConnection.STREAM_TYPE:
|
||||
{
|
||||
DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder);
|
||||
newConnection.setInputBufferSize(getInputBufferSize());
|
||||
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("upgrading to {}", newConnection);
|
||||
getEndPoint().upgrade(newConnection);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException("unexpected stream type " + Integer.toHexString(streamType));
|
||||
LOG.debug("unsupported stream type {}, resetting on {}", Long.toHexString(streamType), this);
|
||||
getEndPoint().reset(ErrorCode.STREAM_CREATION_ERROR.code());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,15 +75,24 @@ public class ControlParser
|
|||
case BODY:
|
||||
{
|
||||
BodyParser bodyParser = null;
|
||||
int frameType = headerParser.getFrameType();
|
||||
long frameType = headerParser.getFrameType();
|
||||
if (frameType >= 0 && frameType < bodyParsers.length)
|
||||
bodyParser = bodyParsers[frameType];
|
||||
bodyParser = bodyParsers[(int)frameType];
|
||||
|
||||
if (bodyParser == null)
|
||||
{
|
||||
// TODO: enforce only control frames, but ignore unknown.
|
||||
if (FrameType.isMessage(frameType))
|
||||
{
|
||||
// SPEC: message frames on the control stream are invalid.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("invalid message frame type {} on control stream", Long.toHexString(frameType));
|
||||
sessionFailure(buffer, ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_type");
|
||||
return;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ignoring unknown frame type {}", Integer.toHexString(frameType));
|
||||
LOG.debug("ignoring unknown frame type {}", Long.toHexString(frameType));
|
||||
|
||||
BodyParser.Result result = unknownBodyParser.parse(buffer);
|
||||
if (result == BodyParser.Result.NO_FRAME)
|
||||
return;
|
||||
|
@ -123,12 +132,11 @@ public class ControlParser
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("parse failed", x);
|
||||
buffer.clear();
|
||||
connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
|
||||
sessionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
|
||||
}
|
||||
}
|
||||
|
||||
private void connectionFailure(ByteBuffer buffer, int error, String reason)
|
||||
private void sessionFailure(ByteBuffer buffer, int error, String reason)
|
||||
{
|
||||
unknownBodyParser.sessionFailure(buffer, error, reason);
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ public class HeaderParser
|
|||
// TODO: RateControl?
|
||||
private final VarLenInt varLenInt = new VarLenInt();
|
||||
private State state = State.TYPE;
|
||||
private int type;
|
||||
private long type;
|
||||
private long length;
|
||||
|
||||
public void reset()
|
||||
|
@ -55,7 +55,7 @@ public class HeaderParser
|
|||
{
|
||||
case TYPE:
|
||||
{
|
||||
if (varLenInt.parseInt(buffer, v -> type = v))
|
||||
if (varLenInt.parseLong(buffer, v -> type = v))
|
||||
{
|
||||
state = State.LENGTH;
|
||||
break;
|
||||
|
@ -80,7 +80,7 @@ public class HeaderParser
|
|||
return false;
|
||||
}
|
||||
|
||||
public int getFrameType()
|
||||
public long getFrameType()
|
||||
{
|
||||
return type;
|
||||
}
|
||||
|
|
|
@ -102,18 +102,29 @@ public class MessageParser
|
|||
case BODY:
|
||||
{
|
||||
BodyParser bodyParser = null;
|
||||
int frameType = headerParser.getFrameType();
|
||||
long frameType = headerParser.getFrameType();
|
||||
if (frameType >= 0 && frameType < bodyParsers.length)
|
||||
bodyParser = bodyParsers[frameType];
|
||||
bodyParser = bodyParsers[(int)frameType];
|
||||
|
||||
if (bodyParser == null)
|
||||
{
|
||||
// Unknown frame types must be ignored.
|
||||
if (FrameType.isControl(frameType))
|
||||
{
|
||||
// SPEC: control frames on a message stream are invalid.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("invalid control frame type {} on message stream", Long.toHexString(frameType));
|
||||
sessionFailure(buffer, ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_type");
|
||||
return Result.NO_FRAME;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ignoring unknown frame type {}", Long.toHexString(frameType));
|
||||
|
||||
BodyParser.Result result = unknownBodyParser.parse(buffer);
|
||||
if (result == BodyParser.Result.NO_FRAME)
|
||||
return Result.NO_FRAME;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("parsed unknown frame body for type {}", Integer.toHexString(frameType));
|
||||
LOG.debug("parsed unknown frame body for type {}", Long.toHexString(frameType));
|
||||
if (result == BodyParser.Result.WHOLE_FRAME)
|
||||
reset();
|
||||
break;
|
||||
|
@ -126,7 +137,6 @@ public class MessageParser
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
|
||||
reset();
|
||||
return Result.FRAME;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -137,8 +147,8 @@ public class MessageParser
|
|||
LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
|
||||
if (result == BodyParser.Result.WHOLE_FRAME)
|
||||
reset();
|
||||
return Result.FRAME;
|
||||
}
|
||||
return Result.FRAME;
|
||||
}
|
||||
}
|
||||
default:
|
||||
|
@ -152,13 +162,12 @@ public class MessageParser
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("parse failed", x);
|
||||
buffer.clear();
|
||||
connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
|
||||
sessionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
|
||||
return Result.NO_FRAME;
|
||||
}
|
||||
}
|
||||
|
||||
private void connectionFailure(ByteBuffer buffer, int error, String reason)
|
||||
private void sessionFailure(ByteBuffer buffer, int error, String reason)
|
||||
{
|
||||
unknownBodyParser.sessionFailure(buffer, error, reason);
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@ import java.nio.ByteBuffer;
|
|||
|
||||
public class UnknownBodyParser extends BodyParser
|
||||
{
|
||||
private long length = -1;
|
||||
|
||||
public UnknownBodyParser(HeaderParser headerParser, ParserListener listener)
|
||||
{
|
||||
super(headerParser, listener);
|
||||
|
@ -25,6 +27,20 @@ public class UnknownBodyParser extends BodyParser
|
|||
@Override
|
||||
public Result parse(ByteBuffer buffer)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
if (length < 0)
|
||||
length = getBodyLength();
|
||||
int remaining = buffer.remaining();
|
||||
if (remaining >= length)
|
||||
{
|
||||
buffer.position(buffer.position() + (int)length);
|
||||
length = -1;
|
||||
return Result.WHOLE_FRAME;
|
||||
}
|
||||
else
|
||||
{
|
||||
buffer.position(buffer.limit());
|
||||
length -= remaining;
|
||||
return Result.NO_FRAME;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http3.tests;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.client.HTTP3Client;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.util.HostPort;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@Disabled
|
||||
public class ExternalServerTest
|
||||
{
|
||||
@Test
|
||||
@Tag("external")
|
||||
public void testExternalServer() throws Exception
|
||||
{
|
||||
HTTP3Client client = new HTTP3Client();
|
||||
client.start();
|
||||
try
|
||||
{
|
||||
// HostPort hostPort = new HostPort("nghttp2.org:4433");
|
||||
HostPort hostPort = new HostPort("quic.tech:8443");
|
||||
// HostPort hostPort = new HostPort("h2o.examp1e.net:443");
|
||||
// HostPort hostPort = new HostPort("test.privateoctopus.com:4433");
|
||||
Session.Client session = client.connect(new InetSocketAddress(hostPort.getHost(), hostPort.getPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
|
||||
CountDownLatch requestLatch = new CountDownLatch(1);
|
||||
HttpURI uri = HttpURI.from(String.format("https://%s/", hostPort));
|
||||
MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
session.newRequest(new HeadersFrame(request, true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
System.err.println("RESPONSE HEADER = " + frame);
|
||||
if (frame.isLast())
|
||||
{
|
||||
requestLatch.countDown();
|
||||
return;
|
||||
}
|
||||
stream.demand();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDataAvailable(Stream stream)
|
||||
{
|
||||
Stream.Data data = stream.readData();
|
||||
System.err.println("RESPONSE DATA = " + data);
|
||||
if (data != null)
|
||||
{
|
||||
data.complete();
|
||||
if (data.isLast())
|
||||
{
|
||||
requestLatch.countDown();
|
||||
return;
|
||||
}
|
||||
}
|
||||
stream.demand();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrailer(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
System.err.println("RESPONSE TRAILER = " + frame);
|
||||
requestLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
finally
|
||||
{
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -58,7 +58,7 @@ public class HTTP3UnexpectedFrameTest extends AbstractHTTP3ClientServerTest
|
|||
})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
|
||||
((HTTP3Session)session).writeFrame(0, new DataFrame(ByteBuffer.allocate(128), true), Callback.NOOP);
|
||||
((HTTP3Session)session).writeFrame(0, new DataFrame(ByteBuffer.allocate(128), false), Callback.NOOP);
|
||||
|
||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||
|
|
|
@ -174,12 +174,12 @@ public abstract class QuicSession
|
|||
|
||||
public void shutdownInput(long streamId) throws IOException
|
||||
{
|
||||
quicheConnection.shutdownStream(streamId, false);
|
||||
quicheConnection.shutdownStream(streamId, false, 0);
|
||||
}
|
||||
|
||||
public void shutdownOutput(long streamId) throws IOException
|
||||
{
|
||||
quicheConnection.shutdownStream(streamId, true);
|
||||
quicheConnection.shutdownStream(streamId, true, 0);
|
||||
}
|
||||
|
||||
public void onClose(long streamId)
|
||||
|
@ -187,6 +187,19 @@ public abstract class QuicSession
|
|||
endpoints.remove(streamId);
|
||||
}
|
||||
|
||||
public void resetStream(long streamId, long error)
|
||||
{
|
||||
try
|
||||
{
|
||||
quicheConnection.resetStream(streamId, error);
|
||||
}
|
||||
catch (IOException x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("could not reset stream #{} with error {}", streamId, error, x);
|
||||
}
|
||||
}
|
||||
|
||||
public SocketAddress getLocalAddress()
|
||||
{
|
||||
return connection.getEndPoint().getLocalSocketAddress();
|
||||
|
@ -370,10 +383,10 @@ public abstract class QuicSession
|
|||
public void onTimeoutExpired()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("quiche timeout callback called cid={}", quicheConnectionId);
|
||||
LOG.debug("quiche timeout expired {}", QuicSession.this);
|
||||
quicheConnection.onTimeout();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("re-iterating quiche after timeout cid={}", quicheConnectionId);
|
||||
LOG.debug("re-iterating after quiche timeout {}", QuicSession.this);
|
||||
// Do not use the timer thread to iterate.
|
||||
dispatch(() -> iterate());
|
||||
}
|
||||
|
@ -395,10 +408,10 @@ public abstract class QuicSession
|
|||
int pos = BufferUtil.flipToFill(cipherBuffer);
|
||||
int drained = quicheConnection.drainCipherBytes(cipherBuffer);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("drained {} byte(s) of cipher text from {}", drained, this);
|
||||
LOG.debug("drained {} byte(s) of cipher bytes from {}", drained, QuicSession.this);
|
||||
long nextTimeoutInMs = quicheConnection.nextTimeout();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("next quiche timeout: {} ms", nextTimeoutInMs);
|
||||
LOG.debug("next quiche timeout: {} ms on {}", nextTimeoutInMs, QuicSession.this);
|
||||
if (nextTimeoutInMs < 0)
|
||||
timeout.cancel();
|
||||
else
|
||||
|
@ -408,13 +421,13 @@ public abstract class QuicSession
|
|||
boolean connectionClosed = quicheConnection.isConnectionClosed();
|
||||
Action action = connectionClosed ? Action.SUCCEEDED : Action.IDLE;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("connection closed={}, action={}", connectionClosed, action);
|
||||
LOG.debug("connection closed={}, action={} on {}", connectionClosed, action, QuicSession.this);
|
||||
return action;
|
||||
}
|
||||
BufferUtil.flipToFlush(cipherBuffer, pos);
|
||||
connection.write(this, remoteAddress, cipherBuffer);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("wrote cipher text for {}", remoteAddress);
|
||||
LOG.debug("writing cipher bytes for {} on {}", remoteAddress, QuicSession.this);
|
||||
connection.write(this, remoteAddress, cipherBuffer);
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
|
@ -422,7 +435,7 @@ public abstract class QuicSession
|
|||
public void succeeded()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("cipher text writing succeeded");
|
||||
LOG.debug("written cipher bytes on {}", QuicSession.this);
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
super.succeeded();
|
||||
}
|
||||
|
@ -437,7 +450,7 @@ public abstract class QuicSession
|
|||
protected void onCompleteSuccess()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("quiche connection is in closed state");
|
||||
LOG.debug("connection closed {}", QuicSession.this);
|
||||
QuicSession.this.close();
|
||||
}
|
||||
|
||||
|
@ -445,7 +458,7 @@ public abstract class QuicSession
|
|||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("cipher text writing failed, closing session", cause);
|
||||
LOG.debug("failed to write cipher bytes, closing session on {}", QuicSession.this, cause);
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
QuicSession.this.close(cause);
|
||||
}
|
||||
|
|
|
@ -123,6 +123,11 @@ public class QuicStreamEndPoint extends AbstractEndPoint
|
|||
session.onClose(streamId);
|
||||
}
|
||||
|
||||
public void reset(long error)
|
||||
{
|
||||
session.resetStream(streamId, error);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int fill(ByteBuffer buffer) throws IOException
|
||||
{
|
||||
|
|
|
@ -39,6 +39,12 @@ public enum StreamType
|
|||
return (streamId & 0b01) == 0b00;
|
||||
}
|
||||
|
||||
public static boolean isReserved(long streamType)
|
||||
{
|
||||
// SPEC: reserved stream types follow the formula: 0x1F * N + 0x21.
|
||||
return (streamType - 0x21) % 0x1F == 0;
|
||||
}
|
||||
|
||||
private final int type;
|
||||
|
||||
private StreamType(int type)
|
||||
|
|
|
@ -552,20 +552,25 @@ public class QuicheConnection
|
|||
}
|
||||
}
|
||||
|
||||
public void shutdownStream(long streamId, boolean writeSide) throws IOException
|
||||
public void shutdownStream(long streamId, boolean writeSide, long error) throws IOException
|
||||
{
|
||||
try (AutoLock ignore = lock.lock())
|
||||
{
|
||||
if (quicheConn == null)
|
||||
throw new IOException("Quiche connection was released");
|
||||
int direction = writeSide ? LibQuiche.quiche_shutdown.QUICHE_SHUTDOWN_WRITE : LibQuiche.quiche_shutdown.QUICHE_SHUTDOWN_READ;
|
||||
int rc = LibQuiche.INSTANCE.quiche_conn_stream_shutdown(quicheConn, new uint64_t(streamId), direction, new uint64_t(0));
|
||||
int rc = LibQuiche.INSTANCE.quiche_conn_stream_shutdown(quicheConn, new uint64_t(streamId), direction, new uint64_t(error));
|
||||
if (rc == 0 || rc == LibQuiche.quiche_error.QUICHE_ERR_DONE)
|
||||
return;
|
||||
throw new IOException("failed to shutdown stream " + streamId + ": " + LibQuiche.quiche_error.errToString(rc));
|
||||
}
|
||||
}
|
||||
|
||||
public void resetStream(long streamId, long error) throws IOException
|
||||
{
|
||||
shutdownStream(streamId, true, error);
|
||||
}
|
||||
|
||||
public void feedFinForStream(long streamId) throws IOException
|
||||
{
|
||||
try (AutoLock ignore = lock.lock())
|
||||
|
|
Loading…
Reference in New Issue