Issue #6728 - QUIC and HTTP/3

- Changed the parsers to emit during parsing, rather than returning the event.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-16 23:07:26 +02:00
parent b42bfa214a
commit fcdabeb933
27 changed files with 309 additions and 345 deletions

View File

@ -20,7 +20,7 @@ module org.eclipse.jetty.http3.client
requires transitive org.eclipse.jetty.http3.qpack;
requires transitive org.eclipse.jetty.io;
requires transitive org.eclipse.jetty.quic.common;
requires org.eclipse.jetty.quic.client;
requires transitive org.eclipse.jetty.quic.client;
requires transitive org.eclipse.jetty.util;
requires org.slf4j;
}

View File

@ -77,7 +77,7 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
long streamId = streamEndPoint.getStreamId();
ClientHTTP3Session http3Session = (ClientHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
// TODO: Parser may be created internally, if I pass the QuicStreamEndPoint and ClientHTTP3Session.
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder());
return new HTTP3Connection(streamEndPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser, http3Session.getSessionClient());
MessageParser parser = new MessageParser(http3Session.getSessionClient(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished);
return new HTTP3Connection(streamEndPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
}
}

View File

@ -22,9 +22,9 @@ module org.eclipse.jetty.http3.common
exports org.eclipse.jetty.http3.internal;
requires transitive org.eclipse.jetty.http;
requires org.eclipse.jetty.http3.qpack;
requires transitive org.eclipse.jetty.http3.qpack;
requires org.eclipse.jetty.io;
requires org.eclipse.jetty.quic.common;
requires transitive org.eclipse.jetty.quic.common;
requires org.eclipse.jetty.util;
requires org.slf4j;
}

View File

@ -16,11 +16,7 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -37,16 +33,14 @@ public class ControlConnection extends AbstractConnection implements Connection.
private final ByteBufferPool byteBufferPool;
private final ControlParser parser;
private final ParserListener listener;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public ControlConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser, ParserListener listener)
public ControlConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.listener = listener;
}
public boolean isUseInputDirectByteBuffers()
@ -90,13 +84,7 @@ public class ControlConnection extends AbstractConnection implements Connection.
while (true)
{
// Parse first in case of bytes from the upgrade.
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame == null)
break;
notifyFrame(frame);
}
parser.parse(buffer);
// Then read from the EndPoint.
int filled = getEndPoint().fill(buffer);
@ -128,33 +116,4 @@ public class ControlConnection extends AbstractConnection implements Connection.
getEndPoint().close(x);
}
}
private void notifyFrame(Frame frame)
{
FrameType frameType = frame.getFrameType();
switch (frameType)
{
case SETTINGS:
{
notifySettings((SettingsFrame)frame);
break;
}
default:
{
throw new UnsupportedOperationException("unsupported frame type " + frameType);
}
}
}
private void notifySettings(SettingsFrame frame)
{
try
{
listener.onSettings(frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
}

View File

@ -16,12 +16,7 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
@ -34,15 +29,13 @@ public class HTTP3Connection extends AbstractConnection
private final ByteBufferPool byteBufferPool;
private final MessageParser parser;
private final ParserListener listener;
private boolean useInputDirectByteBuffers = true;
public HTTP3Connection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser, ParserListener listener)
public HTTP3Connection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
this.listener = listener;
}
public boolean isUseInputDirectByteBuffers()
@ -76,17 +69,7 @@ public class HTTP3Connection extends AbstractConnection
if (filled > 0)
{
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame == null)
break;
if (frame instanceof HeadersFrame)
frame = ((HeadersFrame)frame).withLast(getEndPoint().isInputShutdown());
else if (frame instanceof DataFrame)
frame = ((DataFrame)frame).withLast(getEndPoint().isInputShutdown());
notifyFrame(frame);
}
parser.parse(buffer);
}
else if (filled == 0)
{
@ -110,52 +93,4 @@ public class HTTP3Connection extends AbstractConnection
getEndPoint().close(x);
}
}
private void notifyFrame(Frame frame)
{
FrameType frameType = frame.getFrameType();
switch (frameType)
{
case HEADERS:
{
notifyHeaders((HeadersFrame)frame);
break;
}
case DATA:
{
notifyData((DataFrame)frame);
break;
}
default:
{
throw new UnsupportedOperationException("unsupported frame type " + frameType);
}
}
}
private void notifyHeaders(HeadersFrame frame)
{
try
{
long streamId = ((QuicStreamEndPoint)getEndPoint()).getStreamId();
listener.onHeaders(streamId, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
private void notifyData(DataFrame frame)
{
try
{
long streamId = ((QuicStreamEndPoint)getEndPoint()).getStreamId();
listener.onData(streamId, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
}

View File

@ -1,3 +1,16 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;

View File

@ -127,8 +127,8 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
{
case ControlConnection.STREAM_TYPE:
{
ControlParser parser = new ControlParser();
ControlConnection newConnection = new ControlConnection(getEndPoint(), getExecutor(), byteBufferPool, parser, listener);
ControlParser parser = new ControlParser(listener);
ControlConnection newConnection = new ControlConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())

View File

@ -16,7 +16,10 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The base parser for the frame body of HTTP/3 frames.</p>
@ -27,18 +30,20 @@ import org.eclipse.jetty.http3.frames.Frame;
*/
public abstract class BodyParser
{
private final long streamId;
private final HeaderParser headerParser;
private static final Logger LOG = LoggerFactory.getLogger(BodyParser.class);
protected BodyParser(long streamId, HeaderParser headerParser)
private final HeaderParser headerParser;
private final ParserListener listener;
protected BodyParser(HeaderParser headerParser, ParserListener listener)
{
this.streamId = streamId;
this.headerParser = headerParser;
this.listener = listener;
}
protected long getStreamId()
protected ParserListener getParserListener()
{
return streamId;
return listener;
}
protected long getBodyLength()
@ -47,18 +52,60 @@ public abstract class BodyParser
}
/**
* <p>Parses the frame body bytes in the given {@code buffer}, producing a {@link Frame}.</p>
* <p>Only the frame body bytes are consumed, therefore when this method returns, the buffer
* may contain unconsumed bytes, for example for other frames.</p>
* <p>Parses the frame body bytes in the given {@code buffer}.</p>
* <p>Only the frame body bytes are consumed, therefore when this method returns,
* the buffer may contain unconsumed bytes, for example for other frames.</p>
*
* @param buffer the buffer to parse
* @return the parsed frame if all the frame body bytes were parsed, or an error frame,
* or null if not enough frame body bytes were present in the buffer
* @return true if all the frame body bytes were parsed;
* false if not enough frame body bytes were present in the buffer
*/
public abstract Frame parse(ByteBuffer buffer) throws ParseException;
public abstract boolean parse(ByteBuffer buffer);
protected Frame emptyBody(ByteBuffer buffer) throws ParseException
protected void emptyBody(ByteBuffer buffer)
{
throw new ParseException(ErrorCode.PROTOCOL_ERROR.code(), "invalid_frame");
sessionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code(), "invalid_frame");
}
protected void sessionFailure(ByteBuffer buffer, int error, String reason)
{
BufferUtil.clear(buffer);
notifySessionFailure(error, reason);
}
protected void notifySessionFailure(int error, String reason)
{
try
{
listener.onSessionFailure(error, reason);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
protected void notifyStreamFailure(long streamId, int error, String reason)
{
try
{
listener.onStreamFailure(streamId, error, reason);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
protected void notifySettings(SettingsFrame frame)
{
try
{
listener.onSettings(frame);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
}

View File

@ -15,17 +15,15 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class CancelPushBodyParser extends BodyParser
{
public CancelPushBodyParser(HeaderParser headerParser)
public CancelPushBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser);
super(headerParser, listener);
}
@Override
public Frame parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -16,7 +16,6 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,14 +34,14 @@ public class ControlParser
private final BodyParser unknownBodyParser;
private State state = State.HEADER;
public ControlParser()
public ControlParser(ParserListener listener)
{
this.headerParser = new HeaderParser();
this.bodyParsers[FrameType.CANCEL_PUSH.type()] = new CancelPushBodyParser(headerParser);
this.bodyParsers[FrameType.SETTINGS.type()] = new SettingsBodyParser(headerParser);
this.bodyParsers[FrameType.GOAWAY.type()] = new GoAwayBodyParser(headerParser);
this.bodyParsers[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdBodyParser(headerParser);
this.unknownBodyParser = new UnknownBodyParser(headerParser);
this.bodyParsers[FrameType.CANCEL_PUSH.type()] = new CancelPushBodyParser(headerParser, listener);
this.bodyParsers[FrameType.SETTINGS.type()] = new SettingsBodyParser(headerParser, listener);
this.bodyParsers[FrameType.GOAWAY.type()] = new GoAwayBodyParser(headerParser, listener);
this.bodyParsers[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdBodyParser(headerParser, listener);
this.unknownBodyParser = new UnknownBodyParser(headerParser, listener);
}
private void reset()
@ -56,7 +55,7 @@ public class ControlParser
*
* @param buffer the buffer to parse
*/
public Frame parse(ByteBuffer buffer) throws ParseException
public void parse(ByteBuffer buffer)
{
try
{
@ -71,7 +70,7 @@ public class ControlParser
state = State.BODY;
break;
}
return null;
return;
}
case BODY:
{
@ -85,25 +84,26 @@ public class ControlParser
// TODO: enforce only control frames, but ignore unknown.
if (LOG.isDebugEnabled())
LOG.debug("ignoring unknown frame type {}", Integer.toHexString(frameType));
Frame frame = unknownBodyParser.parse(buffer);
if (frame == null)
return null;
if (!unknownBodyParser.parse(buffer))
return;
reset();
break;
}
else
{
Frame frame;
if (headerParser.getFrameLength() == 0)
frame = bodyParser.emptyBody(buffer);
{
bodyParser.emptyBody(buffer);
}
else
frame = bodyParser.parse(buffer);
{
if (!bodyParser.parse(buffer))
return;
}
if (LOG.isDebugEnabled())
LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), buffer);
if (frame != null)
reset();
return frame;
reset();
}
break;
}
default:
{
@ -112,22 +112,20 @@ public class ControlParser
}
}
}
catch (ParseException x)
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
buffer.clear();
throw x;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
buffer.clear();
throw new ParseException(ErrorCode.INTERNAL_ERROR.code(), "parser_error", true, x);
connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
}
}
private void connectionFailure(ByteBuffer buffer, int error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}
private enum State
{
HEADER, BODY

View File

@ -14,9 +14,9 @@
package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import java.util.function.BooleanSupplier;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,12 +25,16 @@ public class DataBodyParser extends BodyParser
{
private static final Logger LOG = LoggerFactory.getLogger(DataBodyParser.class);
private final long streamId;
private final BooleanSupplier isLast;
private State state = State.INIT;
private long length;
public DataBodyParser(long streamId, HeaderParser headerParser)
public DataBodyParser(HeaderParser headerParser, ParserListener listener, long streamId, BooleanSupplier isLast)
{
super(streamId, headerParser);
super(headerParser, listener);
this.streamId = streamId;
this.isLast = isLast;
}
private void reset()
@ -40,13 +44,13 @@ public class DataBodyParser extends BodyParser
}
@Override
protected Frame emptyBody(ByteBuffer buffer)
protected void emptyBody(ByteBuffer buffer)
{
return onData(BufferUtil.EMPTY_BUFFER, false);
onData(BufferUtil.EMPTY_BUFFER, false);
}
@Override
public Frame parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -72,12 +76,14 @@ public class DataBodyParser extends BodyParser
if (length == 0)
{
reset();
return onData(slice, false);
onData(slice, false);
return true;
}
else
{
// We got partial data, simulate a smaller frame, and stay in DATA state.
return onData(slice, true);
onData(slice, true);
break;
}
}
default:
@ -86,15 +92,28 @@ public class DataBodyParser extends BodyParser
}
}
}
return null;
return false;
}
private DataFrame onData(ByteBuffer buffer, boolean fragment)
private void onData(ByteBuffer buffer, boolean fragment)
{
DataFrame frame = new DataFrame(buffer, true);
DataFrame frame = new DataFrame(buffer, isLast.getAsBoolean());
if (LOG.isDebugEnabled())
LOG.debug("notifying synthetic={} {}#{}", fragment, frame, getStreamId());
return frame;
LOG.debug("notifying synthetic={} {}#{}", fragment, frame, streamId);
notifyData(frame);
}
private void notifyData(DataFrame frame)
{
ParserListener listener = getParserListener();
try
{
listener.onData(streamId, frame);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
private enum State

View File

@ -15,17 +15,15 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class GoAwayBodyParser extends BodyParser
{
public GoAwayBodyParser(HeaderParser headerParser)
public GoAwayBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser);
super(headerParser, listener);
}
@Override
public Frame parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -16,10 +16,10 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackException;
@ -32,14 +32,17 @@ public class HeadersBodyParser extends BodyParser
private static final Logger LOG = LoggerFactory.getLogger(HeadersBodyParser.class);
private final List<ByteBuffer> byteBuffers = new ArrayList<>();
private final long streamId;
private final BooleanSupplier isLast;
private final QpackDecoder decoder;
private State state = State.INIT;
private long length;
private Frame frame;
public HeadersBodyParser(long streamId, HeaderParser headerParser, QpackDecoder decoder)
public HeadersBodyParser(HeaderParser headerParser, ParserListener listener, QpackDecoder decoder, long streamId, BooleanSupplier isLast)
{
super(streamId, headerParser);
super(headerParser, listener);
this.streamId = streamId;
this.isLast = isLast;
this.decoder = decoder;
}
@ -47,11 +50,10 @@ public class HeadersBodyParser extends BodyParser
{
state = State.INIT;
length = 0;
frame = null;
}
@Override
public Frame parse(ByteBuffer buffer) throws ParseException
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -72,7 +74,7 @@ public class HeadersBodyParser extends BodyParser
length -= remaining;
ByteBuffer copy = BufferUtil.copy(buffer);
byteBuffers.add(copy);
return null;
return false;
}
else
{
@ -106,45 +108,54 @@ public class HeadersBodyParser extends BodyParser
}
}
}
return null;
return false;
}
private Frame decode(ByteBuffer encoded) throws ParseException
private boolean decode(ByteBuffer encoded)
{
try
{
// TODO: do a proper reset when the lambda is notified asynchronously.
if (decoder.decode(getStreamId(), encoded, (streamId, metaData) -> this.frame = onHeaders(metaData)))
{
Frame frame = this.frame;
reset();
return frame;
}
return null;
return decoder.decode(streamId, encoded, (streamId, metaData) -> onHeaders(metaData));
}
catch (QpackException.StreamException x)
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
throw new ParseException(x.getErrorCode(), x.getMessage());
notifyStreamFailure(streamId, x.getErrorCode(), x.getMessage());
}
catch (QpackException.SessionException x)
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
throw new ParseException(x.getErrorCode(), x.getMessage(), true);
notifySessionFailure(x.getErrorCode(), x.getMessage());
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
throw new ParseException(ErrorCode.INTERNAL_ERROR.code(), "internal_error", true, x);
notifySessionFailure(ErrorCode.INTERNAL_ERROR.code(), "internal_error");
}
return false;
}
private Frame onHeaders(MetaData metaData)
private void onHeaders(MetaData metaData)
{
return new HeadersFrame(metaData, false);
HeadersFrame frame = new HeadersFrame(metaData, isLast.getAsBoolean());
reset();
notifyHeaders(frame);
}
protected void notifyHeaders(HeadersFrame frame)
{
ParserListener listener = getParserListener();
try
{
listener.onHeaders(streamId, frame);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
private enum State

View File

@ -15,17 +15,15 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class MaxPushIdBodyParser extends BodyParser
{
public MaxPushIdBodyParser(HeaderParser headerParser)
public MaxPushIdBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser);
super(headerParser, listener);
}
@Override
public Frame parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -14,9 +14,9 @@
package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import java.util.function.BooleanSupplier;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.slf4j.Logger;
@ -36,13 +36,13 @@ public class MessageParser
private final BodyParser unknownBodyParser;
private State state = State.HEADER;
public MessageParser(long streamId, QpackDecoder decoder)
public MessageParser(ParserListener listener, QpackDecoder decoder, long streamId, BooleanSupplier isLast)
{
this.headerParser = new HeaderParser();
this.bodyParsers[FrameType.DATA.type()] = new DataBodyParser(streamId, headerParser);
this.bodyParsers[FrameType.HEADERS.type()] = new HeadersBodyParser(streamId, headerParser, decoder);
this.bodyParsers[FrameType.PUSH_PROMISE.type()] = new PushPromiseBodyParser(headerParser);
this.unknownBodyParser = new UnknownBodyParser(headerParser);
this.bodyParsers[FrameType.DATA.type()] = new DataBodyParser(headerParser, listener, streamId, isLast);
this.bodyParsers[FrameType.HEADERS.type()] = new HeadersBodyParser(headerParser, listener, decoder, streamId, isLast);
this.bodyParsers[FrameType.PUSH_PROMISE.type()] = new PushPromiseBodyParser(headerParser, listener);
this.unknownBodyParser = new UnknownBodyParser(headerParser, listener);
}
private void reset()
@ -52,12 +52,11 @@ public class MessageParser
}
/**
* <p>Parses the given {@code buffer} bytes and returns parsed frames.</p>
* <p>Parses the given {@code buffer} bytes and emit events to a {@link ParserListener}.</p>
*
* @param buffer the buffer to parse
* @return a parsed frame, or null if not enough bytes were provided to parse a frame
*/
public Frame parse(ByteBuffer buffer) throws ParseException
public void parse(ByteBuffer buffer)
{
try
{
@ -72,7 +71,7 @@ public class MessageParser
state = State.BODY;
break;
}
return null;
return;
}
case BODY:
{
@ -86,27 +85,25 @@ public class MessageParser
// Unknown frame types must be ignored.
if (LOG.isDebugEnabled())
LOG.debug("Ignoring unknown frame type {}", Integer.toHexString(frameType));
Frame frame = unknownBodyParser.parse(buffer);
if (frame == null)
return null;
reset();
break;
if (!unknownBodyParser.parse(buffer))
return;
}
else
{
Frame frame;
if (headerParser.getFrameLength() == 0)
frame = bodyParser.emptyBody(buffer);
else
frame = bodyParser.parse(buffer);
if (frame != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), buffer);
reset();
bodyParser.emptyBody(buffer);
}
return frame;
else
{
if (!bodyParser.parse(buffer))
return;
}
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), buffer);
}
reset();
break;
}
default:
{
@ -115,22 +112,20 @@ public class MessageParser
}
}
}
catch (ParseException x)
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
buffer.clear();
throw x;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
buffer.clear();
throw new ParseException(ErrorCode.INTERNAL_ERROR.code(), "parser_error", true, x);
connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
}
}
private void connectionFailure(ByteBuffer buffer, int error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}
private enum State
{
HEADER, BODY

View File

@ -1,34 +0,0 @@
package org.eclipse.jetty.http3.internal.parser;
public class ParseException extends Exception
{
private final int error;
private final boolean fatal;
public ParseException(int error, String message)
{
this(error, message, false);
}
public ParseException(int error, String message, boolean fatal)
{
this(error, message, fatal, null);
}
public ParseException(int error, String message, boolean fatal, Throwable cause)
{
super(message, cause);
this.error = error;
this.fatal = fatal;
}
public int getErrorCode()
{
return error;
}
public boolean isFatal()
{
return fatal;
}
}

View File

@ -15,17 +15,15 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class PushPromiseBodyParser extends BodyParser
{
public PushPromiseBodyParser(HeaderParser headerParser)
public PushPromiseBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser);
super(headerParser, listener);
}
@Override
public Frame parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -18,7 +18,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.VarLenInt;
@ -30,9 +29,9 @@ public class SettingsBodyParser extends BodyParser
private long key;
private Map<Long, Long> settings;
public SettingsBodyParser(HeaderParser headerParser)
public SettingsBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser);
super(headerParser, listener);
}
private void reset()
@ -45,13 +44,13 @@ public class SettingsBodyParser extends BodyParser
}
@Override
protected Frame emptyBody(ByteBuffer buffer)
protected void emptyBody(ByteBuffer buffer)
{
return onSettings(Map.of());
onSettings(Map.of());
}
@Override
public Frame parse(ByteBuffer buffer) throws ParseException
public boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
{
@ -73,16 +72,27 @@ public class SettingsBodyParser extends BodyParser
}))
{
if (settings.containsKey(key))
throw new ParseException(ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate");
{
sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate");
return true;
}
if (SettingsFrame.isReserved(key))
throw new ParseException(ErrorCode.SETTINGS_ERROR.code(), "settings_reserved");
{
sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_reserved");
return true;
}
if (length > 0)
{
state = State.VALUE;
}
else
throw new ParseException(ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
{
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
return true;
}
break;
}
return null;
return false;
}
case VALUE:
{
@ -101,15 +111,17 @@ public class SettingsBodyParser extends BodyParser
{
Map<Long, Long> settings = this.settings;
reset();
return onSettings(settings);
onSettings(settings);
return true;
}
else
{
throw new ParseException(ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
return true;
}
break;
}
return null;
return false;
}
default:
{
@ -117,12 +129,13 @@ public class SettingsBodyParser extends BodyParser
}
}
}
return null;
return false;
}
private SettingsFrame onSettings(Map<Long, Long> settings)
private void onSettings(Map<Long, Long> settings)
{
return new SettingsFrame(settings);
SettingsFrame frame = new SettingsFrame(settings);
notifySettings(frame);
}
private enum State

View File

@ -15,17 +15,15 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
public class UnknownBodyParser extends BodyParser
{
public UnknownBodyParser(HeaderParser headerParser)
public UnknownBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser);
super(headerParser, listener);
}
@Override
public Frame parse(ByteBuffer buffer)
public boolean parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
}

View File

@ -19,9 +19,9 @@ import java.util.List;
import java.util.Random;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
@ -29,24 +29,25 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class DataGenerateParseTest
{
@Test
public void testGenerateParseEmpty() throws Exception
public void testGenerateParseEmpty()
{
testGenerateParse(BufferUtil.EMPTY_BUFFER);
}
@Test
public void testGenerateParse() throws Exception
public void testGenerateParse()
{
byte[] bytes = new byte[1024];
new Random().nextBytes(bytes);
testGenerateParse(ByteBuffer.wrap(bytes));
}
private void testGenerateParse(ByteBuffer byteBuffer) throws Exception
private void testGenerateParse(ByteBuffer byteBuffer)
{
byte[] inputBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(inputBytes);
@ -55,20 +56,23 @@ public class DataGenerateParseTest
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(null, 8192, true).generate(lease, 0, input);
List<Frame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, null);
List<DataFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(new ParserListener()
{
@Override
public void onData(long streamId, DataFrame frame)
{
frames.add(frame);
}
}, null, 13, () -> true);
for (ByteBuffer buffer : lease.getByteBuffers())
{
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame != null)
frames.add(frame);
}
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
}
assertEquals(1, frames.size());
DataFrame output = (DataFrame)frames.get(0);
DataFrame output = frames.get(0);
byte[] outputBytes = new byte[output.getData().remaining()];
output.getData().get(outputBytes);
assertArrayEquals(inputBytes, outputBytes);

View File

@ -22,10 +22,10 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
@ -33,11 +33,12 @@ import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class HeadersGenerateParseTest
{
@Test
public void testGenerateParse() throws Exception
public void testGenerateParse()
{
HttpURI uri = HttpURI.from("http://host:1234/path?a=b");
HttpFields fields = HttpFields.build()
@ -50,20 +51,23 @@ public class HeadersGenerateParseTest
new MessageGenerator(encoder, 8192, true).generate(lease, 0, input);
QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192);
List<Frame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, decoder);
List<HeadersFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(new ParserListener()
{
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
frames.add(frame);
}
}, decoder, 13, () -> true);
for (ByteBuffer buffer : lease.getByteBuffers())
{
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame != null)
frames.add(frame);
}
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
}
assertEquals(1, frames.size());
HeadersFrame output = (HeadersFrame)frames.get(0);
HeadersFrame output = frames.get(0);
MetaData.Request inputMetaData = (MetaData.Request)input.getMetaData();
MetaData.Request outputMetaData = (MetaData.Request)output.getMetaData();

View File

@ -18,51 +18,55 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.generator.ControlGenerator;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class SettingsGenerateParseTest
{
@Test
public void testGenerateParseEmpty() throws Exception
public void testGenerateParseEmpty()
{
testGenerateParse(Map.of());
}
@Test
public void testGenerateParse() throws Exception
public void testGenerateParse()
{
testGenerateParse(Map.of(13L, 7L, 31L, 29L));
}
private void testGenerateParse(Map<Long, Long> settings) throws Exception
private void testGenerateParse(Map<Long, Long> settings)
{
SettingsFrame input = new SettingsFrame(settings);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new ControlGenerator().generate(lease, 0, input);
List<Frame> frames = new ArrayList<>();
ControlParser parser = new ControlParser();
List<SettingsFrame> frames = new ArrayList<>();
ControlParser parser = new ControlParser(new ParserListener()
{
@Override
public void onSettings(SettingsFrame frame)
{
frames.add(frame);
}
});
for (ByteBuffer buffer : lease.getByteBuffers())
{
while (buffer.hasRemaining())
{
Frame frame = parser.parse(buffer);
if (frame != null)
frames.add(frame);
}
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
}
assertEquals(1, frames.size());
SettingsFrame output = (SettingsFrame)frames.get(0);
SettingsFrame output = frames.get(0);
assertEquals(input.getSettings(), output.getSettings());
}

View File

@ -130,7 +130,8 @@ public class EncodedFieldSection
private EncodedField parseNameReference(ByteBuffer buffer) throws EncodingException
{
LOG.info("parseLiteralFieldLineWithNameReference: " + BufferUtil.toDetailString(buffer));
if (LOG.isDebugEnabled())
LOG.debug("parseLiteralFieldLineWithNameReference: " + BufferUtil.toDetailString(buffer));
byte firstByte = buffer.get(buffer.position());
boolean allowEncoding = (firstByte & 0x20) != 0;

View File

@ -97,8 +97,8 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
long streamId = streamEndPoint.getStreamId();
ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder());
HTTP3Connection connection = new HTTP3Connection(streamEndPoint, connector.getExecutor(), connector.getByteBufferPool(), parser, http3Session.getSessionServer());
MessageParser parser = new MessageParser(http3Session.getSessionServer(), http3Session.getQpackDecoder(), streamId, streamEndPoint::isStreamFinished);
HTTP3Connection connection = new HTTP3Connection(streamEndPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
return connection;
}
}

View File

@ -15,10 +15,10 @@ module org.eclipse.jetty.quic.client
{
exports org.eclipse.jetty.quic.client;
requires org.eclipse.jetty.quic.common;
requires transitive org.eclipse.jetty.quic.common;
requires org.eclipse.jetty.quic.quiche;
requires org.eclipse.jetty.client;
requires org.eclipse.jetty.io;
requires org.eclipse.jetty.util;
requires transitive org.eclipse.jetty.io;
requires transitive org.eclipse.jetty.util;
requires org.slf4j;
}

View File

@ -68,6 +68,11 @@ public class QuicStreamEndPoint extends AbstractEndPoint
return session.getRemoteAddress();
}
public boolean isStreamFinished()
{
return session.isFinished(streamId);
}
@Override
protected void doShutdownInput()
{

View File

@ -18,7 +18,7 @@ module org.eclipse.jetty.quic.server
requires transitive org.eclipse.jetty.quic.common;
requires org.eclipse.jetty.quic.quiche;
requires org.eclipse.jetty.io;
requires org.eclipse.jetty.server;
requires transitive org.eclipse.jetty.server;
requires org.eclipse.jetty.util;
requires org.slf4j;
}