Issue #6728 - QUIC and HTTP/3

- Initial support for closing HTTP/3 sessions and session failure events.
- Enforced HTTP/3 frame sequence.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-28 17:03:43 +02:00
parent b09191d2f8
commit dc889bd7d8
15 changed files with 460 additions and 132 deletions

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder; import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.quic.client.ClientProtocolSession; import org.eclipse.jetty.quic.client.ClientProtocolSession;
import org.eclipse.jetty.quic.client.ClientQuicSession; import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.common.CloseInfo;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -138,6 +139,14 @@ public class ClientHTTP3Session extends ClientProtocolSession
} }
} }
@Override
protected void onClosed(CloseInfo closeInfo)
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
applicationSession.notifySessionFailure(closeInfo.error(), closeInfo.reason());
}
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint) private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{ {
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession); UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession);
@ -157,10 +166,4 @@ public class ClientHTTP3Session extends ClientProtocolSession
{ {
applicationSession.onDataAvailable(streamId); applicationSession.onDataAvailable(streamId);
} }
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
} }

View File

@ -64,7 +64,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received response {}#{} on {}", frame, streamId, this); LOG.debug("received response {}#{} on {}", frame, streamId, this);
notifyResponse(stream, frame); stream.processResponse(frame);
} }
else else
{ {
@ -72,20 +72,6 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
} }
} }
private void notifyResponse(HTTP3Stream stream, HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
try
{
if (listener != null)
listener.onResponse(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override @Override
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener) public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener)
{ {
@ -105,7 +91,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
} }
@Override @Override
protected void writeFrame(long streamId, Frame frame, Callback callback) public void writeFrame(long streamId, Frame frame, Callback callback)
{ {
getProtocolSession().writeFrame(streamId, frame, callback); getProtocolSession().writeFrame(streamId, frame, callback);
} }

View File

@ -49,6 +49,14 @@ public interface Session
return null; return null;
} }
/**
* @return whether this session is not open
*/
public default boolean isClosed()
{
return false;
}
/** /**
* <p>The client-side HTTP/3 API representing a connection with a server.</p> * <p>The client-side HTTP/3 API representing a connection with a server.</p>
* <p>Once a {@link Session} has been obtained, it can be used to make HTTP/3 requests:</p> * <p>Once a {@link Session} has been obtained, it can be used to make HTTP/3 requests:</p>
@ -68,7 +76,7 @@ public interface Session
* @see Stream * @see Stream
* @see Stream.Listener * @see Stream.Listener
*/ */
public interface Client public interface Client extends Session
{ {
/** /**
* <p>Makes a request by creating a HTTP/3 stream and sending the given HEADERS frame.</p> * <p>Makes a request by creating a HTTP/3 stream and sending the given HEADERS frame.</p>
@ -91,7 +99,7 @@ public interface Session
* <p>The server-side HTTP/3 API representing a connection with a client.</p> * <p>The server-side HTTP/3 API representing a connection with a client.</p>
* <p>To receive HTTP/3 request events, see {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.</p> * <p>To receive HTTP/3 request events, see {@link Session.Server.Listener#onRequest(Stream, HeadersFrame)}.</p>
*/ */
public interface Server public interface Server extends Session
{ {
/** /**
* <p>The server-side specific {@link Session.Listener}.</p> * <p>The server-side specific {@link Session.Listener}.</p>
@ -106,36 +114,6 @@ public interface Session
public default void onAccept(Session session) public default void onAccept(Session session)
{ {
} }
/**
* <p>Callback method invoked when a request is received.</p>
* <p>Applications should implement this method to process HTTP/3 requests,
* typically providing an HTTP/3 response via {@link Stream#respond(HeadersFrame)}:</p>
* <pre>
* class MyServer implements Session.Server.Listener
* {
* &#64;Override
* public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
* {
* // Send a response.
* var response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
* stream.respond(new HeadersFrame(response, true));
* }
* }
* </pre>
* <p>To read request content, applications should call
* {@link Stream#demand()} and return a {@link Stream.Listener} that overrides
* {@link Stream.Listener#onDataAvailable(Stream)}.</p>
*
* @param stream the stream associated with the request
* @param frame the HEADERS frame containing the request headers
* @return a {@link Stream.Listener} that will be notified of stream events
* @see Stream.Listener#onDataAvailable(Stream)
*/
public default Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return null;
}
} }
} }
@ -170,5 +148,46 @@ public interface Session
public default void onSettings(Session session, SettingsFrame frame) public default void onSettings(Session session, SettingsFrame frame)
{ {
} }
/**
* <p>Callback method invoked when a request is received.</p>
* <p>Applications should implement this method to process HTTP/3 requests,
* typically providing an HTTP/3 response via {@link Stream#respond(HeadersFrame)}:</p>
* <pre>
* class MyServer implements Session.Server.Listener
* {
* &#64;Override
* public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
* {
* // Send a response.
* var response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
* stream.respond(new HeadersFrame(response, true));
* }
* }
* </pre>
* <p>To read request content, applications should call
* {@link Stream#demand()} and return a {@link Stream.Listener} that overrides
* {@link Stream.Listener#onDataAvailable(Stream)}.</p>
*
* @param stream the stream associated with the request
* @param frame the HEADERS frame containing the request headers
* @return a {@link Stream.Listener} that will be notified of stream events
* @see Stream.Listener#onDataAvailable(Stream)
*/
public default Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
return null;
}
/**
* <p>Callback method invoked when a failure has been detected for this session.</p>
*
* @param session the session
* @param error the error code
* @param reason the error reason
*/
public default void onSessionFailure(Session session, int error, String reason)
{
}
} }
} }

View File

@ -38,6 +38,7 @@ public abstract class HTTP3Session implements Session, ParserListener
private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>(); private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>();
private final ProtocolSession session; private final ProtocolSession session;
private final Listener listener; private final Listener listener;
private CloseState closeState = CloseState.CLOSED;
public HTTP3Session(ProtocolSession session, Listener listener) public HTTP3Session(ProtocolSession session, Listener listener)
{ {
@ -57,6 +58,7 @@ public abstract class HTTP3Session implements Session, ParserListener
public void onOpen() public void onOpen()
{ {
closeState = CloseState.NOT_CLOSED;
} }
@Override @Override
@ -71,6 +73,17 @@ public abstract class HTTP3Session implements Session, ParserListener
return getProtocolSession().getQuicSession().getRemoteAddress(); return getProtocolSession().getQuicSession().getRemoteAddress();
} }
@Override
public boolean isClosed()
{
return closeState != CloseState.NOT_CLOSED;
}
public void close(int error, String reason)
{
getProtocolSession().close(error, reason);
}
protected HTTP3Stream createStream(QuicStreamEndPoint endPoint) protected HTTP3Stream createStream(QuicStreamEndPoint endPoint)
{ {
long streamId = endPoint.getStreamId(); long streamId = endPoint.getStreamId();
@ -90,7 +103,7 @@ public abstract class HTTP3Session implements Session, ParserListener
return streams.get(streamId); return streams.get(streamId);
} }
protected abstract void writeFrame(long streamId, Frame frame, Callback callback); public abstract void writeFrame(long streamId, Frame frame, Callback callback);
public Map<Long, Long> onPreface() public Map<Long, Long> onPreface()
{ {
@ -147,21 +160,7 @@ public abstract class HTTP3Session implements Session, ParserListener
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received trailer {}#{} on {}", frame, streamId, this); LOG.debug("received trailer {}#{} on {}", frame, streamId, this);
notifyTrailer(stream, frame); stream.processTrailer(frame);
}
}
private void notifyTrailer(HTTP3Stream stream, HeadersFrame frame)
{
try
{
Stream.Listener listener = stream.getListener();
if (listener != null)
listener.onTrailer(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
} }
} }
@ -170,6 +169,11 @@ public abstract class HTTP3Session implements Session, ParserListener
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received {}#{} on {}", frame, streamId, this); LOG.debug("received {}#{} on {}", frame, streamId, this);
HTTP3Stream stream = getStream(streamId);
if (stream != null)
stream.processData(frame);
else
closeAndNotifyFailure(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
} }
public void onDataAvailable(long streamId) public void onDataAvailable(long streamId)
@ -177,14 +181,49 @@ public abstract class HTTP3Session implements Session, ParserListener
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("notifying data available for stream #{} on {}", streamId, this); LOG.debug("notifying data available for stream #{} on {}", streamId, this);
HTTP3Stream stream = getStream(streamId); HTTP3Stream stream = getStream(streamId);
notifyDataAvailable(stream);
}
private void notifyDataAvailable(HTTP3Stream stream)
{
Stream.Listener listener = stream.getListener(); Stream.Listener listener = stream.getListener();
try
{
if (listener != null) if (listener != null)
listener.onDataAvailable(stream); listener.onDataAvailable(stream);
} }
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
void closeAndNotifyFailure(int error, String reason)
{
close(error, reason);
notifySessionFailure(error, reason);
}
public void notifySessionFailure(int error, String reason)
{
try
{
listener.onSessionFailure(this, error, reason);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x", getClass().getSimpleName(), hashCode()); return String.format("%s@%x", getClass().getSimpleName(), hashCode());
} }
private enum CloseState
{
NOT_CLOSED, CLOSED
}
} }

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.internal; package org.eclipse.jetty.http3.internal;
import java.util.EnumSet;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
@ -24,12 +25,17 @@ import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3Stream implements Stream public class HTTP3Stream implements Stream
{ {
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Stream.class);
private final HTTP3Session session; private final HTTP3Session session;
private final QuicStreamEndPoint endPoint; private final QuicStreamEndPoint endPoint;
private Listener listener; private Listener listener;
private FrameState frameState = FrameState.INITIAL;
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint) public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint)
{ {
@ -99,6 +105,93 @@ public class HTTP3Stream implements Stream
return connection.hasDemand(); return connection.hasDemand();
} }
public void processRequest(HeadersFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
{
Stream.Listener streamListener = notifyRequest(frame);
setListener(streamListener);
}
}
private Stream.Listener notifyRequest(HeadersFrame frame)
{
Session.Listener listener = session.getListener();
try
{
return listener.onRequest(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return null;
}
}
public void processResponse(HeadersFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
notifyResponse(frame);
}
private void notifyResponse(HeadersFrame frame)
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onResponse(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
public void processData(DataFrame frame)
{
validateAndUpdate(EnumSet.of(FrameState.HEADER, FrameState.DATA), FrameState.DATA);
}
public void processTrailer(HeadersFrame frame)
{
if (validateAndUpdate(EnumSet.of(FrameState.HEADER, FrameState.DATA), FrameState.TRAILER))
notifyTrailer(this, frame);
}
private void notifyTrailer(HTTP3Stream stream, HeadersFrame frame)
{
Stream.Listener listener = stream.getListener();
try
{
if (listener != null)
listener.onTrailer(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
private boolean validateAndUpdate(EnumSet<FrameState> allowed, FrameState target)
{
if (allowed.contains(frameState))
{
frameState = target;
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("invalid frame sequence, current={}, allowed={}, next={}", frameState, allowed, target);
if (frameState == FrameState.FAILED)
return false;
frameState = FrameState.FAILED;
session.closeAndNotifyFailure(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
return false;
}
}
private Promise.Completable<Stream> writeFrame(Frame frame) private Promise.Completable<Stream> writeFrame(Frame frame)
{ {
Promise.Completable<Stream> completable = new Promise.Completable<>(); Promise.Completable<Stream> completable = new Promise.Completable<>();
@ -111,4 +204,9 @@ public class HTTP3Stream implements Stream
{ {
return String.format("%s@%x#%d", getClass().getSimpleName(), hashCode(), getId()); return String.format("%s@%x#%d", getClass().getSimpleName(), hashCode(), getId());
} }
private enum FrameState
{
INITIAL, HEADER, DATA, TRAILER, FAILED
}
} }

View File

@ -15,7 +15,6 @@ package org.eclipse.jetty.http3.server.internal;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.http3.internal.HTTP3Session;
@ -63,8 +62,7 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received request {}#{} on {}", frame, streamId, this); LOG.debug("received request {}#{} on {}", frame, streamId, this);
Stream.Listener streamListener = notifyRequest(stream, frame); stream.processRequest(frame);
stream.setListener(streamListener);
} }
else else
{ {
@ -72,22 +70,8 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
} }
} }
private Stream.Listener notifyRequest(HTTP3Stream stream, HeadersFrame frame)
{
Server.Listener listener = getListener();
try
{
return listener.onRequest(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return null;
}
}
@Override @Override
protected void writeFrame(long streamId, Frame frame, Callback callback) public void writeFrame(long streamId, Frame frame, Callback callback)
{ {
getProtocolSession().writeFrame(streamId, frame, callback); getProtocolSession().writeFrame(streamId, frame, callback);
} }

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection; import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection;
import org.eclipse.jetty.http3.qpack.QpackDecoder; import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder; import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.quic.common.CloseInfo;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.quic.server.ServerProtocolSession; import org.eclipse.jetty.quic.server.ServerProtocolSession;
@ -137,6 +138,14 @@ public class ServerHTTP3Session extends ServerProtocolSession
} }
} }
@Override
protected void onClosed(CloseInfo closeInfo)
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
notifySessionFailure(closeInfo);
}
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint) private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{ {
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession); UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession);
@ -157,9 +166,16 @@ public class ServerHTTP3Session extends ServerProtocolSession
applicationSession.onDataAvailable(streamId); applicationSession.onDataAvailable(streamId);
} }
@Override private void notifySessionFailure(CloseInfo closeInfo)
public String toString()
{ {
return String.format("%s@%x", getClass().getSimpleName(), hashCode()); Session.Listener listener = applicationSession.getListener();
try
{
listener.onSessionFailure(applicationSession, closeInfo.error(), closeInfo.reason());
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
} }
} }

View File

@ -0,0 +1,68 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.tests;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.internal.ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HTTP3UnexpectedFrameTest extends AbstractHTTP3ClientServerTest
{
@Test
public void testDataBeforeHeaders() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
startServer(new Session.Server.Listener()
{
@Override
public void onSessionFailure(Session session, int error, String reason)
{
assertEquals(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
serverLatch.countDown();
}
});
startClient();
CountDownLatch clientLatch = new CountDownLatch(1);
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener()
{
@Override
public void onSessionFailure(Session session, int error, String reason)
{
assertEquals(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
clientLatch.countDown();
}
})
.get(5, TimeUnit.SECONDS);
((HTTP3Session)session).writeFrame(0, new DataFrame(ByteBuffer.allocate(128), true), Callback.NOOP);
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(session::isClosed);
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.quic.client; package org.eclipse.jetty.quic.client;
import org.eclipse.jetty.quic.common.CloseInfo;
import org.eclipse.jetty.quic.common.ProtocolSession; import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.common.StreamType;
@ -54,4 +55,12 @@ public class ClientProtocolSession extends ProtocolSession
return streamEndPoint.onReadable(); return streamEndPoint.onReadable();
return false; return false;
} }
@Override
protected void onClosed(CloseInfo closeInfo)
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
// TODO: should probably close the stream.
}
} }

View File

@ -0,0 +1,42 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.quic.common;
public class CloseInfo
{
private final int error;
private final String reason;
public CloseInfo(int error, String reason)
{
this.error = error;
this.reason = reason;
}
public int error()
{
return error;
}
public String reason()
{
return reason;
}
@Override
public String toString()
{
return String.format("%s@%x[error=%d,reason=%s]", getClass().getSimpleName(), hashCode(), error(), reason());
}
}

View File

@ -52,6 +52,11 @@ public abstract class ProtocolSession
processWritableStreams(); processWritableStreams();
if (processReadableStreams()) if (processReadableStreams())
continue; continue;
CloseInfo closeInfo = session.getRemoteCloseInfo();
if (closeInfo != null)
onClosed(closeInfo);
// Exit if did not process any stream and we are idle. // Exit if did not process any stream and we are idle.
if (active.decrementAndGet() == 0) if (active.decrementAndGet() == 0)
break; break;
@ -115,6 +120,19 @@ public abstract class ProtocolSession
connection.onOpen(); connection.onOpen();
} }
public boolean close(int error, String reason)
{
return getQuicSession().close(error, reason);
}
protected abstract void onClosed(CloseInfo closeInfo);
@Override
public String toString()
{
return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), getQuicSession());
}
public interface Factory public interface Factory
{ {
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context); public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context);

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -92,6 +93,14 @@ public abstract class QuicSession
return executor; return executor;
} }
public CloseInfo getRemoteCloseInfo()
{
AtomicStampedReference<String> info = quicheConnection.getRemoteCloseInfo();
if (info != null)
return new CloseInfo(info.getStamp(), info.getReference());
return null;
}
public Scheduler getScheduler() public Scheduler getScheduler()
{ {
return scheduler; return scheduler;
@ -130,14 +139,14 @@ public abstract class QuicSession
public int fill(long streamId, ByteBuffer buffer) throws IOException public int fill(long streamId, ByteBuffer buffer) throws IOException
{ {
int drained = quicheConnection.drainClearTextForStream(streamId, buffer); int drained = quicheConnection.drainClearBytesForStream(streamId, buffer);
flush(); flush();
return drained; return drained;
} }
public int flush(long streamId, ByteBuffer buffer, boolean last) throws IOException public int flush(long streamId, ByteBuffer buffer, boolean last) throws IOException
{ {
int flushed = quicheConnection.feedClearTextForStream(streamId, buffer, last); int flushed = quicheConnection.feedClearBytesForStream(streamId, buffer, last);
flush(); flush();
return flushed; return flushed;
} }
@ -205,9 +214,9 @@ public abstract class QuicSession
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
int remaining = cipherBufferIn.remaining(); int remaining = cipherBufferIn.remaining();
int accepted = quicheConnection.feedCipherText(cipherBufferIn, remoteAddress);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("feeding {}/{} cipher bytes to cid={}", accepted, remaining, quicheConnectionId); LOG.debug("feeding {} cipher bytes to {}", remaining, this);
int accepted = quicheConnection.feedCipherBytes(cipherBufferIn, remoteAddress);
if (accepted != remaining) if (accepted != remaining)
throw new IllegalStateException(); throw new IllegalStateException();
@ -281,25 +290,25 @@ public abstract class QuicSession
public void flush() public void flush()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("flushing session cid={}", quicheConnectionId); LOG.debug("flushing {}", this);
flusher.iterate(); flusher.iterate();
} }
public QuicStreamEndPoint getOrCreateStreamEndPoint(long streamId, Consumer<QuicStreamEndPoint> consumer) public QuicStreamEndPoint getOrCreateStreamEndPoint(long streamId, Consumer<QuicStreamEndPoint> consumer)
{ {
QuicStreamEndPoint endPoint = endpoints.compute(streamId, (sid, quicStreamEndPoint) -> QuicStreamEndPoint endPoint = endpoints.compute(streamId, (id, quicStreamEndPoint) ->
{ {
if (quicStreamEndPoint == null) if (quicStreamEndPoint == null)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("creating endpoint for stream {}", sid); LOG.debug("creating endpoint for stream {} for {}", id, this);
quicStreamEndPoint = newQuicStreamEndPoint(streamId); quicStreamEndPoint = newQuicStreamEndPoint(streamId);
consumer.accept(quicStreamEndPoint); consumer.accept(quicStreamEndPoint);
} }
return quicStreamEndPoint; return quicStreamEndPoint;
}); });
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("returning endpoint for stream {}", streamId); LOG.debug("returning endpoint for stream {} for {}", streamId, this);
return endPoint; return endPoint;
} }
@ -311,15 +320,15 @@ public abstract class QuicSession
public void close() public void close()
{ {
if (quicheConnectionId == null) if (quicheConnectionId == null)
close(new IOException("Quic connection refused")); close(new IOException("connection refused"));
else else
close(new IOException("Quic connection closed")); close(new IOException("connection closed"));
} }
private void close(Throwable x) private void close(Throwable x)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("closing Quic session cid={}", quicheConnectionId); LOG.debug("closing {}", this);
try try
{ {
endpoints.values().forEach(QuicStreamEndPoint::close); endpoints.values().forEach(QuicStreamEndPoint::close);
@ -334,13 +343,18 @@ public abstract class QuicSession
quicheConnection.dispose(); quicheConnection.dispose();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("closed Quic session cid={}", quicheConnectionId); LOG.debug("closed {}", this);
}
public boolean close(int error, String reason)
{
return quicheConnection.close(error, reason);
} }
@Override @Override
public String toString() public String toString()
{ {
return getClass().getSimpleName() + " id=" + quicheConnectionId; return String.format("%s@%x[id=%s]", getClass().getSimpleName(), hashCode(), quicheConnectionId);
} }
private class Flusher extends IteratingCallback private class Flusher extends IteratingCallback
@ -379,9 +393,9 @@ public abstract class QuicSession
// TODO make the buffer size configurable // TODO make the buffer size configurable
cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true); cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
int pos = BufferUtil.flipToFill(cipherBuffer); int pos = BufferUtil.flipToFill(cipherBuffer);
int drained = quicheConnection.drainCipherText(cipherBuffer); int drained = quicheConnection.drainCipherBytes(cipherBuffer);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("drained {} byte(s) of cipher text from quiche", drained); LOG.debug("drained {} byte(s) of cipher text from {}", drained, this);
long nextTimeoutInMs = quicheConnection.nextTimeout(); long nextTimeoutInMs = quicheConnection.nextTimeout();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("next quiche timeout: {} ms", nextTimeoutInMs); LOG.debug("next quiche timeout: {} ms", nextTimeoutInMs);

View File

@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicStampedReference;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche; import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
import org.eclipse.jetty.quic.quiche.ffi.SizedStructure; import org.eclipse.jetty.quic.quiche.ffi.SizedStructure;
@ -324,7 +325,7 @@ public class QuicheConnection
LOG.debug("accepted, immediately receiving the same packet - remaining in buffer: {}", packetRead.remaining()); LOG.debug("accepted, immediately receiving the same packet - remaining in buffer: {}", packetRead.remaining());
while (packetRead.hasRemaining()) while (packetRead.hasRemaining())
{ {
quicheConnection.feedCipherText(packetRead, peer); quicheConnection.feedCipherBytes(packetRead, peer);
} }
return quicheConnection; return quicheConnection;
} }
@ -370,7 +371,7 @@ public class QuicheConnection
* @return how many bytes were consumed. * @return how many bytes were consumed.
* @throws IOException * @throws IOException
*/ */
public int feedCipherText(ByteBuffer buffer, SocketAddress peer) throws IOException public int feedCipherBytes(ByteBuffer buffer, SocketAddress peer) throws IOException
{ {
try (AutoLock ignore = lock.lock()) try (AutoLock ignore = lock.lock())
{ {
@ -395,7 +396,7 @@ public class QuicheConnection
* @return how many bytes were added to the buffer. * @return how many bytes were added to the buffer.
* @throws IOException * @throws IOException
*/ */
public int drainCipherText(ByteBuffer buffer) throws IOException public int drainCipherBytes(ByteBuffer buffer) throws IOException
{ {
try (AutoLock ignore = lock.lock()) try (AutoLock ignore = lock.lock())
{ {
@ -479,18 +480,24 @@ public class QuicheConnection
} }
} }
public boolean close() throws IOException public boolean close(int error, String reason)
{ {
try (AutoLock ignore = lock.lock()) try (AutoLock ignore = lock.lock())
{ {
if (quicheConn == null) if (quicheConn == null)
throw new IOException("Quiche connection was released"); {
int rc = LibQuiche.INSTANCE.quiche_conn_close(quicheConn, true, new uint64_t(0), null, new size_t(0)); if (LOG.isDebugEnabled())
LOG.debug("connection was released");
return false;
}
int rc = LibQuiche.INSTANCE.quiche_conn_close(quicheConn, true, new uint64_t(error), reason, new size_t(reason == null ? 0 : reason.length()));
if (rc == 0) if (rc == 0)
return true; return true;
if (rc == LibQuiche.quiche_error.QUICHE_ERR_DONE) if (rc == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return false; return false;
throw new IOException("failed to close connection: " + LibQuiche.quiche_error.errToString(rc)); if (LOG.isDebugEnabled())
LOG.debug("could not close connection: {}", LibQuiche.quiche_error.errToString(rc));
return false;
} }
} }
@ -570,12 +577,12 @@ public class QuicheConnection
} }
} }
public int feedClearTextForStream(long streamId, ByteBuffer buffer) throws IOException public int feedClearBytesForStream(long streamId, ByteBuffer buffer) throws IOException
{ {
return feedClearTextForStream(streamId, buffer, false); return feedClearBytesForStream(streamId, buffer, false);
} }
public int feedClearTextForStream(long streamId, ByteBuffer buffer, boolean last) throws IOException public int feedClearBytesForStream(long streamId, ByteBuffer buffer, boolean last) throws IOException
{ {
try (AutoLock ignore = lock.lock()) try (AutoLock ignore = lock.lock())
{ {
@ -591,7 +598,7 @@ public class QuicheConnection
} }
} }
public int drainClearTextForStream(long streamId, ByteBuffer buffer) throws IOException public int drainClearBytesForStream(long streamId, ByteBuffer buffer) throws IOException
{ {
try (AutoLock ignore = lock.lock()) try (AutoLock ignore = lock.lock())
{ {
@ -618,6 +625,22 @@ public class QuicheConnection
} }
} }
public AtomicStampedReference<String> getRemoteCloseInfo()
{
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
bool_pointer app = new bool_pointer();
uint64_t_pointer error = new uint64_t_pointer();
char_pointer reason = new char_pointer();
size_t_pointer reasonLength = new size_t_pointer();
if (LibQuiche.INSTANCE.quiche_conn_peer_error(quicheConn, app, error, reason.getPointer(), reasonLength))
return new AtomicStampedReference<>(reason.getValueAsString((int)reasonLength.getValue(), StandardCharsets.UTF_8), (int)error.getValue());
return null;
}
}
public interface TokenMinter public interface TokenMinter
{ {
byte[] mint(byte[] dcid, int len); byte[] mint(byte[] dcid, int len);

View File

@ -102,7 +102,7 @@ public class LowLevelQuicheTest
QuicheConnection serverQuicheConnection = entry.getValue(); QuicheConnection serverQuicheConnection = entry.getValue();
// client sends 16 bytes of payload over stream 0 // client sends 16 bytes of payload over stream 0
assertThat(clientQuicheConnection.feedClearTextForStream(0, ByteBuffer.allocate(16) assertThat(clientQuicheConnection.feedClearBytesForStream(0, ByteBuffer.allocate(16)
.putInt(0xdeadbeef) .putInt(0xdeadbeef)
.putInt(0xcafebabe) .putInt(0xcafebabe)
.putInt(0xdeadc0de) .putInt(0xdeadc0de)
@ -116,7 +116,7 @@ public class LowLevelQuicheTest
assertThat(readableStreamIds.get(0), is(0L)); assertThat(readableStreamIds.get(0), is(0L));
// server reads 16 bytes from stream 0 // server reads 16 bytes from stream 0
assertThat(serverQuicheConnection.drainClearTextForStream(0, ByteBuffer.allocate(1000)), is(16)); assertThat(serverQuicheConnection.drainClearBytesForStream(0, ByteBuffer.allocate(1000)), is(16));
// assert that stream 0 is not finished on server // assert that stream 0 is not finished on server
assertThat(serverQuicheConnection.isStreamFinished(0), is(false)); assertThat(serverQuicheConnection.isStreamFinished(0), is(false));
@ -142,7 +142,7 @@ public class LowLevelQuicheTest
QuicheConnection serverQuicheConnection = entry.getValue(); QuicheConnection serverQuicheConnection = entry.getValue();
// client sends 16 bytes of payload over stream 0 and finish it // client sends 16 bytes of payload over stream 0 and finish it
assertThat(clientQuicheConnection.feedClearTextForStream(0, ByteBuffer.allocate(16) assertThat(clientQuicheConnection.feedClearBytesForStream(0, ByteBuffer.allocate(16)
.putInt(0xdeadbeef) .putInt(0xdeadbeef)
.putInt(0xcafebabe) .putInt(0xcafebabe)
.putInt(0xdeadc0de) .putInt(0xdeadc0de)
@ -160,7 +160,7 @@ public class LowLevelQuicheTest
assertThat(serverQuicheConnection.isStreamFinished(0), is(false)); assertThat(serverQuicheConnection.isStreamFinished(0), is(false));
// server reads 16 bytes from stream 0 // server reads 16 bytes from stream 0
assertThat(serverQuicheConnection.drainClearTextForStream(0, ByteBuffer.allocate(1000)), is(16)); assertThat(serverQuicheConnection.drainClearBytesForStream(0, ByteBuffer.allocate(1000)), is(16));
// assert that stream 0 is finished on server // assert that stream 0 is finished on server
assertThat(serverQuicheConnection.isStreamFinished(0), is(true)); assertThat(serverQuicheConnection.isStreamFinished(0), is(true));
@ -187,10 +187,10 @@ public class LowLevelQuicheTest
QuicheConnection serverQuicheConnection = entry.getValue(); QuicheConnection serverQuicheConnection = entry.getValue();
ByteBuffer buffer = ByteBuffer.allocate(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN); ByteBuffer buffer = ByteBuffer.allocate(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN);
int drained = serverQuicheConnection.drainCipherText(buffer); int drained = serverQuicheConnection.drainCipherBytes(buffer);
assertThat(drained, is(expectedSize)); assertThat(drained, is(expectedSize));
buffer.flip(); buffer.flip();
int fed = clientQuicheConnection.feedCipherText(buffer, serverSocketAddress); int fed = clientQuicheConnection.feedCipherBytes(buffer, serverSocketAddress);
assertThat(fed, is(expectedSize)); assertThat(fed, is(expectedSize));
} }
@ -200,10 +200,10 @@ public class LowLevelQuicheTest
QuicheConnection serverQuicheConnection = entry.getValue(); QuicheConnection serverQuicheConnection = entry.getValue();
ByteBuffer buffer = ByteBuffer.allocate(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN); ByteBuffer buffer = ByteBuffer.allocate(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN);
int drained = clientQuicheConnection.drainCipherText(buffer); int drained = clientQuicheConnection.drainCipherBytes(buffer);
assertThat(drained, is(expectedSize)); assertThat(drained, is(expectedSize));
buffer.flip(); buffer.flip();
int fed = serverQuicheConnection.feedCipherText(buffer, clientSocketAddress); int fed = serverQuicheConnection.feedCipherBytes(buffer, clientSocketAddress);
assertThat(fed, is(expectedSize)); assertThat(fed, is(expectedSize));
} }
@ -215,7 +215,7 @@ public class LowLevelQuicheTest
QuicheConnection clientQuicheConnection = QuicheConnection.connect(clientQuicheConfig, serverSocketAddress); QuicheConnection clientQuicheConnection = QuicheConnection.connect(clientQuicheConfig, serverSocketAddress);
connectionsToDisposeOf.add(clientQuicheConnection); connectionsToDisposeOf.add(clientQuicheConnection);
int drained = clientQuicheConnection.drainCipherText(buffer); int drained = clientQuicheConnection.drainCipherBytes(buffer);
assertThat(drained, is(1200)); assertThat(drained, is(1200));
buffer.flip(); buffer.flip();
@ -225,11 +225,11 @@ public class LowLevelQuicheTest
assertThat(negotiated, is(true)); assertThat(negotiated, is(true));
buffer2.flip(); buffer2.flip();
int fed = clientQuicheConnection.feedCipherText(buffer2, serverSocketAddress); int fed = clientQuicheConnection.feedCipherBytes(buffer2, serverSocketAddress);
assertThat(fed, is(79)); assertThat(fed, is(79));
buffer.clear(); buffer.clear();
drained = clientQuicheConnection.drainCipherText(buffer); drained = clientQuicheConnection.drainCipherBytes(buffer);
assertThat(drained, is(1200)); assertThat(drained, is(1200));
buffer.flip(); buffer.flip();
@ -238,11 +238,11 @@ public class LowLevelQuicheTest
connectionsToDisposeOf.add(serverQuicheConnection); connectionsToDisposeOf.add(serverQuicheConnection);
buffer.clear(); buffer.clear();
drained = serverQuicheConnection.drainCipherText(buffer); drained = serverQuicheConnection.drainCipherBytes(buffer);
assertThat(drained, is(1200)); assertThat(drained, is(1200));
buffer.flip(); buffer.flip();
fed = clientQuicheConnection.feedCipherText(buffer, serverSocketAddress); fed = clientQuicheConnection.feedCipherBytes(buffer, serverSocketAddress);
assertThat(fed, is(1200)); assertThat(fed, is(1200));
assertThat(serverQuicheConnection.isConnectionEstablished(), is(false)); assertThat(serverQuicheConnection.isConnectionEstablished(), is(false));

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.quic.server; package org.eclipse.jetty.quic.server;
import org.eclipse.jetty.quic.common.CloseInfo;
import org.eclipse.jetty.quic.common.ProtocolSession; import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -47,4 +48,12 @@ public class ServerProtocolSession extends ProtocolSession
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
return streamEndPoint.onReadable(); return streamEndPoint.onReadable();
} }
@Override
protected void onClosed(CloseInfo closeInfo)
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
// TODO: should probably reset the stream if it exists.
}
} }