Issue #6728 - QUIC and HTTP/3

- Implemented stream idle timeouts.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-01 18:47:00 +02:00
parent 2b966c04ee
commit c8107539df
26 changed files with 702 additions and 179 deletions

View File

@ -46,6 +46,7 @@ public class HTTP3Client extends ContainerLifeCycle
private final ClientConnector connector;
private List<String> protocols = List.of("h3");
private long streamIdleTimeout = 30000;
public HTTP3Client()
{
@ -64,6 +65,17 @@ public class HTTP3Client extends ContainerLifeCycle
this.protocols = protocols;
}
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
}
public CompletableFuture<Session.Client> connect(SocketAddress address, Session.Client.Listener listener)
{
Map<String, Object> context = new ConcurrentHashMap<>();

View File

@ -60,13 +60,15 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
@Override
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
{
HTTP3Client client = (HTTP3Client)context.get(HTTP3Client.CLIENT_CONTEXT_KEY);
Session.Client.Listener listener = (Session.Client.Listener)context.get(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Session.Client> promise = (Promise<Session.Client>)context.get(HTTP3Client.SESSION_PROMISE_CONTEXT_KEY);
ClientHTTP3Session protocolSession = new ClientHTTP3Session((ClientQuicSession)quicSession, listener, promise, getMaxBlockedStreams(), getMaxResponseHeadersSize());
ClientHTTP3Session session = new ClientHTTP3Session((ClientQuicSession)quicSession, listener, promise, getMaxBlockedStreams(), getMaxResponseHeadersSize());
session.setStreamIdleTimeout(client.getStreamIdleTimeout());
if (LOG.isDebugEnabled())
LOG.debug("created protocol-specific {}", protocolSession);
return protocolSession;
LOG.debug("created protocol-specific {}", session);
return session;
}
@Override

View File

@ -90,6 +90,16 @@ public class ClientHTTP3Session extends ClientProtocolSession
return applicationSession;
}
public long getStreamIdleTimeout()
{
return applicationSession.getStreamIdleTimeout();
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
applicationSession.setStreamIdleTimeout(streamIdleTimeout);
}
@Override
public void onOpen()
{

View File

@ -20,6 +20,7 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
@ -77,14 +78,20 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
{
ClientHTTP3Session session = getProtocolSession();
long streamId = session.getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
QuicStreamEndPoint streamEndPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("created request/response stream #{} on {}", streamId, streamEndPoint);
LOG.debug("created request/response stream #{} on {}", streamId, endPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>();
HTTP3Stream stream = createStream(streamEndPoint);
HTTP3Stream stream = createStream(endPoint);
stream.setListener(listener);
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), promise::failed);
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () ->
{
if (listener == null)
endPoint.shutdownInput(ErrorCode.NO_ERROR.code());
promise.succeeded(stream);
}, promise::failed);
session.writeFrame(streamId, frame, callback);
return promise;

View File

@ -162,12 +162,21 @@ public interface Session
* // Send a response.
* var response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
* stream.respond(new HeadersFrame(response, true));
* if (!frame.isLast())
* stream.demand();
* return null;
* }
* }
* </pre>
* <p>To read request content, applications should call
* {@link Stream#demand()} and return a {@link Stream.Listener} that overrides
* {@link Stream.Listener#onDataAvailable(Stream)}.</p>
* <p>If there is request content (indicated by the fact that the HEADERS frame
* is not the last in the stream), then applications either:</p>
* <ul>
* <li>return {@code null} to indicate that they are not interested in
* reading the content</li>
* <li><em>must</em> call {@link Stream#demand()} and return a {@link Stream.Listener}
* that overrides {@link Stream.Listener#onDataAvailable(Stream)} that reads
* and consumes the content.</li>
* </ul>
*
* @param stream the stream associated with the request
* @param frame the HEADERS frame containing the request headers
@ -186,7 +195,7 @@ public interface Session
* @param error the error code
* @param reason the error reason
*/
public default void onSessionFailure(Session session, int error, String reason)
public default void onSessionFailure(Session session, long error, String reason)
{
}
}

View File

@ -217,6 +217,32 @@ public interface Stream
public default void onTrailer(Stream stream, HeadersFrame frame)
{
}
/**
* <p>Callback method invoked when the stream idle timeout elapses.</p>
*
* @param stream the stream
* @param failure the timeout failure
* @return true to reset the stream, false to ignore the idle timeout
*/
public default boolean onIdleTimeout(Stream stream, Throwable failure)
{
return true;
}
/**
* <p>Callback method invoked when a stream failure occurred.</p>
* <p>Typical stream failures, among others, are failures to
* decode a HEADERS frame, or failures to read bytes because
* the stream has been reset.</p>
*
* @param error the error code
* @param failure a short description of the failure,
* or {@code null} if no short description is available
*/
public default void onFailure(long error, Throwable failure)
{
}
}
/**

View File

@ -15,6 +15,12 @@ package org.eclipse.jetty.http3.frames;
public abstract class Frame
{
public static boolean isLast(Frame frame)
{
return frame instanceof HeadersFrame && ((HeadersFrame)frame).isLast() ||
frame instanceof DataFrame && ((DataFrame)frame).isLast();
}
private final FrameType type;
public Frame(FrameType type)

View File

@ -35,9 +35,9 @@ public enum ErrorCode
HTTP_CONNECT_ERROR(0x10F),
VERSION_FALLBACK_ERROR(0x110);
private final int code;
private final long code;
ErrorCode(int code)
ErrorCode(long code)
{
this.code = code;
}
@ -50,7 +50,7 @@ public enum ErrorCode
return 0x1F * n + 0x21;
}
public int code()
public long code()
{
return code;
}

View File

@ -18,9 +18,7 @@ import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
@ -69,16 +67,21 @@ public class HTTP3Flusher extends IteratingCallback
LOG.debug("flushing {} on {}", entry, this);
Frame frame = entry.frame;
if (frame instanceof FlushFrame)
{
succeeded();
return Action.SCHEDULED;
}
generator.generate(lease, entry.endPoint.getStreamId(), frame);
boolean last = frame instanceof HeadersFrame && ((HeadersFrame)frame).isLast() ||
frame instanceof DataFrame && ((DataFrame)frame).isLast();
QuicStreamEndPoint endPoint = entry.endPoint;
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) for stream #{} on {}", buffers.size(), lease.getTotalLength(), endPoint.getStreamId(), this);
endPoint.write(this, buffers, last);
endPoint.write(this, buffers, Frame.isLast(frame));
return Action.SCHEDULED;
}
@ -86,7 +89,7 @@ public class HTTP3Flusher extends IteratingCallback
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entry, this);
LOG.debug("succeeded to flush {} on {}", entry, this);
lease.recycle();
entry.callback.succeeded();
entry = null;
@ -97,7 +100,7 @@ public class HTTP3Flusher extends IteratingCallback
protected void onCompleteFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to write {} on {}", entry, this, failure);
LOG.debug("failed to flush {} on {}", entry, this, failure);
// TODO
}
@ -126,4 +129,12 @@ public class HTTP3Flusher extends IteratingCallback
return String.format("%s#%d", frame, endPoint.getStreamId());
}
}
public static class FlushFrame extends Frame
{
public FlushFrame()
{
super(null);
}
}
}

View File

@ -14,20 +14,24 @@
package org.eclipse.jetty.http3.internal;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,12 +42,15 @@ public abstract class HTTP3Session implements Session, ParserListener
private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>();
private final ProtocolSession session;
private final Listener listener;
private final StreamTimeouts streamTimeouts;
private CloseState closeState = CloseState.CLOSED;
private long streamIdleTimeout;
public HTTP3Session(ProtocolSession session, Listener listener)
{
this.session = session;
this.listener = listener;
this.streamTimeouts = new StreamTimeouts(session.getQuicSession().getScheduler());
}
public ProtocolSession getProtocolSession()
@ -79,15 +86,30 @@ public abstract class HTTP3Session implements Session, ParserListener
return closeState != CloseState.NOT_CLOSED;
}
public void close(int error, String reason)
public void close(long error, String reason)
{
getProtocolSession().close(error, reason);
}
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
}
void scheduleIdleTimeout(HTTP3Stream stream)
{
streamTimeouts.schedule(stream);
}
protected HTTP3Stream createStream(QuicStreamEndPoint endPoint)
{
long streamId = endPoint.getStreamId();
HTTP3Stream stream = new HTTP3Stream(this, endPoint);
HTTP3Stream stream = newHTTP3Stream(endPoint);
if (streams.put(streamId, stream) != null)
throw new IllegalStateException("duplicate stream id " + streamId);
return stream;
@ -95,7 +117,20 @@ public abstract class HTTP3Session implements Session, ParserListener
protected HTTP3Stream getOrCreateStream(QuicStreamEndPoint endPoint)
{
return streams.computeIfAbsent(endPoint.getStreamId(), id -> new HTTP3Stream(this, endPoint));
return streams.computeIfAbsent(endPoint.getStreamId(), id -> newHTTP3Stream(endPoint));
}
private HTTP3Stream newHTTP3Stream(QuicStreamEndPoint endPoint)
{
HTTP3Stream stream = new HTTP3Stream(this, endPoint);
// Unidirectional streams must not idle timeout.
if (StreamType.isBidirectional(stream.getId()))
{
long idleTimeout = getStreamIdleTimeout();
if (idleTimeout > 0)
stream.setIdleTimeout(idleTimeout);
}
return stream;
}
protected HTTP3Stream getStream(long streamId)
@ -181,30 +216,32 @@ public abstract class HTTP3Session implements Session, ParserListener
if (LOG.isDebugEnabled())
LOG.debug("notifying data available for stream #{} on {}", streamId, this);
HTTP3Stream stream = getStream(streamId);
notifyDataAvailable(stream);
stream.processDataAvailable();
}
private void notifyDataAvailable(HTTP3Stream stream)
{
Stream.Listener listener = stream.getListener();
try
{
if (listener != null)
listener.onDataAvailable(stream);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
void closeAndNotifyFailure(int error, String reason)
void closeAndNotifyFailure(long error, String reason)
{
close(error, reason);
notifySessionFailure(error, reason);
}
public void notifySessionFailure(int error, String reason)
@Override
public void onStreamFailure(long streamId, long error, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("stream failure {}/{} for stream #{} on {}", error, failure, streamId, this, failure);
HTTP3Stream stream = getStream(streamId);
if (stream != null)
stream.processFailure(error, failure);
}
@Override
public void onSessionFailure(long error, String reason)
{
// TODO
}
public void notifySessionFailure(long error, String reason)
{
try
{
@ -226,4 +263,27 @@ public abstract class HTTP3Session implements Session, ParserListener
{
NOT_CLOSED, CLOSED
}
private class StreamTimeouts extends CyclicTimeouts<HTTP3Stream>
{
private StreamTimeouts(Scheduler scheduler)
{
super(scheduler);
}
@Override
protected Iterator<HTTP3Stream> iterator()
{
return streams.values().stream()
.filter(stream -> stream.getIdleTimeout() > 0)
.iterator();
}
@Override
protected boolean onExpired(HTTP3Stream stream)
{
stream.processIdleTimeout(new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed"));
return false;
}
}
}

View File

@ -15,12 +15,15 @@ package org.eclipse.jetty.http3.internal;
import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
@ -28,7 +31,7 @@ import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3Stream implements Stream
public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Stream.class);
@ -36,6 +39,8 @@ public class HTTP3Stream implements Stream
private final QuicStreamEndPoint endPoint;
private Listener listener;
private FrameState frameState = FrameState.INITIAL;
private long idleTimeout;
private long expireNanoTime;
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint)
{
@ -65,6 +70,41 @@ public class HTTP3Stream implements Stream
this.listener = listener;
}
public long getIdleTimeout()
{
return idleTimeout;
}
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
notIdle();
session.scheduleIdleTimeout(this);
if (LOG.isDebugEnabled())
LOG.debug("set idle timeout {} ms for {}", idleTimeout, this);
}
@Override
public long getExpireNanoTime()
{
return expireNanoTime;
}
private void notIdle()
{
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0)
expireNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleTimeout);
}
void processIdleTimeout(TimeoutException timeout)
{
if (LOG.isDebugEnabled())
LOG.debug("idle timeout {} ms expired on {}", getIdleTimeout(), this);
if (notifyIdleTimeout(timeout))
endPoint.close(ErrorCode.REQUEST_CANCELLED_ERROR.code(), timeout);
}
@Override
public CompletableFuture<Stream> respond(HeadersFrame frame)
{
@ -109,12 +149,18 @@ public class HTTP3Stream implements Stream
{
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
{
Stream.Listener streamListener = notifyRequest(frame);
setListener(streamListener);
notIdle();
Listener listener = notifyRequest(frame);
setListener(listener);
if (listener == null)
{
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> endPoint.shutdownInput(ErrorCode.NO_ERROR.code()));
session.writeFrame(getId(), new HTTP3Flusher.FlushFrame(), callback);
}
}
}
private Stream.Listener notifyRequest(HeadersFrame frame)
private Listener notifyRequest(HeadersFrame frame)
{
Session.Listener listener = session.getListener();
try
@ -131,8 +177,11 @@ public class HTTP3Stream implements Stream
public void processResponse(HeadersFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
{
notIdle();
notifyResponse(frame);
}
}
private void notifyResponse(HeadersFrame frame)
{
@ -150,22 +199,80 @@ public class HTTP3Stream implements Stream
public void processData(DataFrame frame)
{
validateAndUpdate(EnumSet.of(FrameState.HEADER, FrameState.DATA), FrameState.DATA);
if (validateAndUpdate(EnumSet.of(FrameState.HEADER, FrameState.DATA), FrameState.DATA))
notIdle();
}
public void processDataAvailable()
{
notifyDataAvailable();
}
private void notifyDataAvailable()
{
Stream.Listener listener = getListener();
try
{
if (listener != null)
listener.onDataAvailable(this);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
public void processTrailer(HeadersFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.HEADER, FrameState.DATA), FrameState.TRAILER))
notifyTrailer(this, frame);
{
notIdle();
notifyTrailer(frame);
}
}
private void notifyTrailer(HTTP3Stream stream, HeadersFrame frame)
private void notifyTrailer(HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
Listener listener = getListener();
try
{
if (listener != null)
listener.onTrailer(stream, frame);
listener.onTrailer(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
private boolean notifyIdleTimeout(TimeoutException timeout)
{
Listener listener = getListener();
try
{
if (listener != null)
return listener.onIdleTimeout(this, timeout);
return true;
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return true;
}
}
public void processFailure(long error, Throwable failure)
{
notifyFailure(error, failure);
}
private void notifyFailure(long error, Throwable failure)
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onFailure(error, failure);
}
catch (Throwable x)
{
@ -194,6 +301,7 @@ public class HTTP3Stream implements Stream
private Promise.Completable<Stream> writeFrame(Frame frame)
{
notIdle();
Promise.Completable<Stream> completable = new Promise.Completable<>();
session.writeFrame(endPoint.getStreamId(), frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed));
return completable;
@ -202,7 +310,13 @@ public class HTTP3Stream implements Stream
@Override
public String toString()
{
return String.format("%s@%x#%d", getClass().getSimpleName(), hashCode(), getId());
return String.format("%s@%x#%d[demand=%b,idle=%d]",
getClass().getSimpleName(),
hashCode(),
getId(),
hasDemand(),
TimeUnit.NANOSECONDS.toMillis(expireNanoTime - System.nanoTime())
);
}
private enum FrameState

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.http3.internal;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
@ -82,6 +84,13 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
fillInterested();
}
@Override
protected boolean onReadTimeout(Throwable timeout)
{
// Idle timeouts are handled by HTTP3Stream.
return false;
}
@Override
public void onFillable()
{
@ -106,6 +115,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
private void processNonDataFrames()
{
try
{
while (true)
{
@ -136,10 +147,20 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
}
catch (Throwable x)
{
long error = ErrorCode.REQUEST_CANCELLED_ERROR.code();
getEndPoint().close(error, x);
// Notify the application that a failure happened.
parser.getListener().onStreamFailure(getEndPoint().getStreamId(), error, x);
}
}
protected abstract void onDataAvailable(long streamId);
public Stream.Data readData()
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("reading data on {}", this);
@ -180,6 +201,13 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
}
catch (Throwable x)
{
getEndPoint().close(ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
// Rethrow so the application has a chance to handle it.
throw x;
}
}
public void demand()
{
@ -273,7 +301,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
byteBuffer = buffer.getBuffer();
}
int filled = getEndPoint().fill(byteBuffer);
int filled = fill(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {} with buffer {}", filled, this, buffer);
@ -307,12 +335,23 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process control stream {}", getEndPoint(), x);
LOG.debug("parse+fill failure on {}", this, x);
if (buffer != null)
buffer.release();
buffer = null;
getEndPoint().close(x);
return MessageParser.Result.NO_FRAME;
throw x;
}
}
private int fill(ByteBuffer byteBuffer)
{
try
{
return getEndPoint().fill(byteBuffer);
}
catch (IOException x)
{
throw new UncheckedIOException(x);
}
}

View File

@ -164,14 +164,14 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
if (StreamType.isReserved(streamType))
{
if (LOG.isDebugEnabled())
LOG.debug("reserved stream type {}, resetting on {}", Long.toHexString(streamType), this);
getEndPoint().reset(ErrorCode.randomReservedCode());
LOG.debug("reserved stream type {}, closing {}", Long.toHexString(streamType), this);
getEndPoint().close(ErrorCode.randomReservedCode(), null);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("unsupported stream type {}, resetting on {}", Long.toHexString(streamType), this);
getEndPoint().reset(ErrorCode.STREAM_CREATION_ERROR.code());
LOG.debug("unsupported stream type {}, closing {}", Long.toHexString(streamType), this);
getEndPoint().close(ErrorCode.STREAM_CREATION_ERROR.code(), null);
}
}
}

View File

@ -67,13 +67,13 @@ public abstract class BodyParser
sessionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code(), "invalid_frame");
}
protected void sessionFailure(ByteBuffer buffer, int error, String reason)
protected void sessionFailure(ByteBuffer buffer, long error, String reason)
{
BufferUtil.clear(buffer);
notifySessionFailure(error, reason);
}
protected void notifySessionFailure(int error, String reason)
protected void notifySessionFailure(long error, String reason)
{
try
{
@ -85,11 +85,11 @@ public abstract class BodyParser
}
}
protected void notifyStreamFailure(long streamId, int error, String reason)
protected void notifyStreamFailure(long streamId, int error, Throwable failure)
{
try
{
listener.onStreamFailure(streamId, error, reason);
listener.onStreamFailure(streamId, error, failure);
}
catch (Throwable x)
{

View File

@ -136,7 +136,7 @@ public class ControlParser
}
}
private void sessionFailure(ByteBuffer buffer, int error, String reason)
private void sessionFailure(ByteBuffer buffer, long error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}

View File

@ -125,7 +125,7 @@ public class HeadersBodyParser extends BodyParser
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
notifyStreamFailure(streamId, x.getErrorCode(), x.getMessage());
notifyStreamFailure(streamId, x.getErrorCode(), x);
}
catch (QpackException.SessionException x)
{

View File

@ -66,6 +66,11 @@ public class MessageParser
state = State.HEADER;
}
public ParserListener getListener()
{
return listener;
}
public void setDataMode(boolean enable)
{
this.dataMode = enable;
@ -167,7 +172,7 @@ public class MessageParser
}
}
private void sessionFailure(ByteBuffer buffer, int error, String reason)
private void sessionFailure(ByteBuffer buffer, long error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}

View File

@ -31,11 +31,11 @@ public interface ParserListener
{
}
public default void onStreamFailure(long streamId, int error, String reason)
public default void onStreamFailure(long streamId, long error, Throwable failure)
{
}
public default void onSessionFailure(int error, String reason)
public default void onSessionFailure(long error, String reason)
{
}
@ -67,13 +67,13 @@ public interface ParserListener
}
@Override
public void onStreamFailure(long streamId, int error, String reason)
public void onStreamFailure(long streamId, long error, Throwable failure)
{
listener.onStreamFailure(streamId, error, reason);
listener.onStreamFailure(streamId, error, failure);
}
@Override
public void onSessionFailure(int error, String reason)
public void onSessionFailure(long error, String reason)
{
listener.onSessionFailure(error, reason);
}

View File

@ -37,7 +37,8 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
private final Session.Server.Listener listener;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
private int maxBlockedStreams;
private int maxBlockedStreams = 0;
private long streamIdleTimeout = 30000;
public AbstractHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, Session.Server.Listener listener)
{
@ -74,6 +75,7 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
return httpConfiguration;
}
@ManagedAttribute("The max number of streams blocked in QPACK encoding")
public int getMaxBlockedStreams()
{
return maxBlockedStreams;
@ -84,10 +86,23 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
this.maxBlockedStreams = maxBlockedStreams;
}
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
}
@Override
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
{
return new ServerHTTP3Session((ServerQuicSession)quicSession, listener, getMaxBlockedStreams(), getHttpConfiguration().getRequestHeaderSize());
ServerHTTP3Session session = new ServerHTTP3Session((ServerQuicSession)quicSession, listener, getMaxBlockedStreams(), getHttpConfiguration().getRequestHeaderSize());
session.setStreamIdleTimeout(getStreamIdleTimeout());
return session;
}
@Override

View File

@ -89,6 +89,16 @@ public class ServerHTTP3Session extends ServerProtocolSession
return applicationSession;
}
public long getStreamIdleTimeout()
{
return applicationSession.getStreamIdleTimeout();
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
applicationSession.setStreamIdleTimeout(streamIdleTimeout);
}
@Override
public void onOpen()
{

View File

@ -38,7 +38,7 @@ public class HTTP3UnexpectedFrameTest extends AbstractHTTP3ClientServerTest
startServer(new Session.Server.Listener()
{
@Override
public void onSessionFailure(Session session, int error, String reason)
public void onSessionFailure(Session session, long error, String reason)
{
assertEquals(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
serverLatch.countDown();
@ -50,7 +50,7 @@ public class HTTP3UnexpectedFrameTest extends AbstractHTTP3ClientServerTest
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener()
{
@Override
public void onSessionFailure(Session session, int error, String reason)
public void onSessionFailure(Session session, long error, String reason)
{
assertEquals(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
clientLatch.countDown();

View File

@ -0,0 +1,205 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.tests;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
{
@Test
public void testClientStreamIdleTimeout() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("/idle".equals(request.getURI().getPath()))
{
assertFalse(frame.isLast());
stream.demand();
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
// When the client closes the stream, the server
// may either receive an empty, last, DATA frame, or
// an exception because the stream has been reset.
try
{
Stream.Data data = stream.readData();
if (data != null)
{
assertTrue(data.isLast());
assertEquals(0, data.getByteBuffer().remaining());
serverLatch.countDown();
}
else
{
stream.demand();
}
}
catch (Exception x)
{
serverLatch.countDown();
throw x;
}
}
};
}
else
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
stream.respond(new HeadersFrame(response, true));
return null;
}
}
});
startClient();
long streamIdleTimeout = 1000;
client.setStreamIdleTimeout(streamIdleTimeout);
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
CountDownLatch clientIdleLatch = new CountDownLatch(1);
HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle");
MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request1, false), new Stream.Listener()
{
@Override
public boolean onIdleTimeout(Stream stream, Throwable failure)
{
clientIdleLatch.countDown();
// Signal to close the stream.
return true;
}
}).get(5, TimeUnit.SECONDS);
// The server does not reply, the client must idle timeout.
assertTrue(clientIdleLatch.await(2 * streamIdleTimeout, TimeUnit.MILLISECONDS));
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
// The session should still be open, verify by sending another request.
CountDownLatch clientLatch = new CountDownLatch(1);
HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request2, true), new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
clientLatch.countDown();
}
});
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testServerStreamIdleTimeout() throws Exception
{
long idleTimeout = 1000;
CountDownLatch serverIdleLatch = new CountDownLatch(1);
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("/idle".equals(request.getURI().getPath()))
{
return new Stream.Listener()
{
@Override
public boolean onIdleTimeout(Stream stream, Throwable failure)
{
serverIdleLatch.countDown();
return true;
}
};
}
else
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
stream.respond(new HeadersFrame(response, true));
return null;
}
}
});
AbstractHTTP3ServerConnectionFactory h3 = server.getConnectors()[0].getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
assertNotNull(h3);
h3.setStreamIdleTimeout(idleTimeout);
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
CountDownLatch clientFailureLatch = new CountDownLatch(1);
HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle");
MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request1, false), new Stream.Listener()
{
@Override
public void onFailure(long error, Throwable failure)
{
// The server idle times out, but did not send any data back.
// However, the stream is readable, but an attempt to read it
// will cause an exception that is notified here.
clientFailureLatch.countDown();
}
});
assertTrue(serverIdleLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS));
// The session should still be open, verify by sending another request.
CountDownLatch clientLatch = new CountDownLatch(1);
HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request2, true), new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
clientLatch.countDown();
}
});
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -120,7 +120,7 @@ public abstract class ProtocolSession
connection.onOpen();
}
public boolean close(int error, String reason)
public boolean close(long error, String reason)
{
return getQuicSession().close(error, reason);
}

View File

@ -172,14 +172,16 @@ public abstract class QuicSession
return quicheConnection.windowCapacity(streamId);
}
public void shutdownInput(long streamId) throws IOException
public void shutdownInput(long streamId, long error) throws IOException
{
quicheConnection.shutdownStream(streamId, false, 0);
quicheConnection.shutdownStream(streamId, false, error);
flush();
}
public void shutdownOutput(long streamId) throws IOException
public void shutdownOutput(long streamId, long error) throws IOException
{
quicheConnection.shutdownStream(streamId, true, 0);
quicheConnection.shutdownStream(streamId, true, error);
flush();
}
public void onClose(long streamId)
@ -187,19 +189,6 @@ public abstract class QuicSession
endpoints.remove(streamId);
}
public void resetStream(long streamId, long error)
{
try
{
quicheConnection.resetStream(streamId, error);
}
catch (IOException x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not reset stream #{} with error {}", streamId, error, x);
}
}
public SocketAddress getLocalAddress()
{
return connection.getEndPoint().getLocalSocketAddress();
@ -359,7 +348,7 @@ public abstract class QuicSession
LOG.debug("closed {}", this);
}
public boolean close(int error, String reason)
public boolean close(long error, String reason)
{
return quicheConnection.close(error, reason);
}

View File

@ -22,6 +22,8 @@ import java.util.stream.IntStream;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.FillInterest;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
@ -75,57 +77,63 @@ public class QuicStreamEndPoint extends AbstractEndPoint
return session.isFinished(streamId);
}
@Override
protected void doShutdownInput()
public void shutdownInput(long error)
{
try
{
shutdownInput();
if (LOG.isDebugEnabled())
LOG.debug("shutting down input of stream {}", streamId);
session.shutdownInput(streamId);
LOG.debug("shutting down input of stream #{} with error 0x{}", streamId, Long.toHexString(error));
session.shutdownInput(streamId, error);
}
catch (IOException x)
{
if (LOG.isDebugEnabled())
LOG.debug("error shutting down output of stream {}", streamId, x);
LOG.debug("error shutting down input of stream #{} with error 0x{}", streamId, Long.toHexString(error), x);
}
}
@Override
protected void doShutdownOutput()
public void shutdownOutput(long error)
{
try
{
shutdownOutput();
if (LOG.isDebugEnabled())
LOG.debug("shutting down output of stream {}", streamId);
session.shutdownOutput(streamId);
LOG.debug("shutting down output of stream #{} with error 0x{}", streamId, Long.toHexString(error));
session.shutdownOutput(streamId, error);
}
catch (IOException x)
{
if (LOG.isDebugEnabled())
LOG.debug("error shutting down output of stream {}", streamId, x);
LOG.debug("error shutting down output of stream #{} with error 0x{}", streamId, Long.toHexString(error), x);
}
}
@Override
protected void doClose()
public void close(long error, Throwable failure)
{
shutdownInput(error);
FillInterest fillInterest = getFillInterest();
if (failure == null)
fillInterest.onClose();
else
fillInterest.onFail(failure);
shutdownOutput(error);
WriteFlusher writeFlusher = getWriteFlusher();
if (failure == null)
writeFlusher.onClose();
else
writeFlusher.onFail(failure);
if (LOG.isDebugEnabled())
LOG.debug("closing stream {}", streamId);
doShutdownInput();
doShutdownOutput();
LOG.debug("closed stream #{} with error 0x{}", streamId, Long.toHexString(error), failure);
}
@Override
public void onClose(Throwable failure)
{
super.onClose(failure);
session.onClose(streamId);
}
public void reset(long error)
{
session.resetStream(streamId, error);
// Implemented empty because we want to disable the standard
// EndPoint close mechanism, since QUIC uses error codes.
}
@Override

View File

@ -482,7 +482,7 @@ public class QuicheConnection
}
}
public boolean close(int error, String reason)
public boolean close(long error, String reason)
{
try (AutoLock ignore = lock.lock())
{
@ -566,11 +566,6 @@ public class QuicheConnection
}
}
public void resetStream(long streamId, long error) throws IOException
{
shutdownStream(streamId, true, error);
}
public void feedFinForStream(long streamId) throws IOException
{
try (AutoLock ignore = lock.lock())