Issue #6728 - QUIC and HTTP/3

- Implemented goAway mechanism.
- Implemented idle timeout mechanism.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-11 13:28:25 +02:00
parent 3c2feabaf6
commit cd161b491e
25 changed files with 864 additions and 420 deletions

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.quic.client.ClientProtocolSession;
import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.common.CloseInfo;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback;
@ -150,11 +149,28 @@ public class ClientHTTP3Session extends ClientProtocolSession
}
@Override
protected void onClose(CloseInfo closeInfo)
protected boolean onIdleTimeout()
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
session.notifySessionFailure(closeInfo.error(), closeInfo.reason());
LOG.debug("idle timeout {} ms expired for {}", getQuicSession().getIdleTimeout(), this);
return session.onIdleTimeout();
}
@Override
public void inwardClose(long error, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
// TODO: maybe we should be harsher here... see onIdleTimeout()
session.goAway(false);
}
@Override
protected void onClose(long error, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely 0x{}/{} {}", Long.toHexString(error), reason, this);
session.onClose(error, reason);
}
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)

View File

@ -15,7 +15,6 @@ package org.eclipse.jetty.http3.client.internal;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
@ -58,14 +57,14 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
MetaData metaData = frame.getMetaData();
if (metaData.isResponse())
if (frame.getMetaData().isResponse())
{
QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
if (LOG.isDebugEnabled())
LOG.debug("received response {}#{} on {}", frame, streamId, this);
stream.onResponse(frame);
LOG.debug("received response {} on {}", frame, stream);
if (stream != null)
stream.onResponse(frame);
}
else
{

View File

@ -174,11 +174,22 @@ public interface Session
}
/**
* <p>Callback method invoked when a the underlying transport has been closed.</p>
* <p>Callback method invoked when the idle timeout has expired.</p>
*
* @param session the session
* @return true to confirm the idle timeout, false to ignore the idle timeout
*/
public default boolean onIdleTimeout(Session session)
{
return true;
}
/**
* <p>Callback method invoked when the underlying transport has been disconnected.</p>
*
* @param session the session
*/
public default void onTerminate(Session session)
public default void onDisconnect(Session session)
{
}
@ -225,10 +236,9 @@ public interface Session
* <p>Callback method invoked when a failure has been detected for this session.</p>
*
* @param session the session
* @param error the error code
* @param reason the error reason
* @param failure the cause of the failure
*/
public default void onSessionFailure(Session session, long error, String reason)
public default void onFailure(Session session, Throwable failure)
{
}
}

View File

@ -124,13 +124,21 @@ public interface Stream
public void demand();
/**
* <p> Sends the given HEADERS frame containing the trailer headers.</p>
* <p>Sends the given HEADERS frame containing the trailer headers.</p>
*
* @param frame the HEADERS frame containing the trailer headers
* @return the {@link CompletableFuture} that gets notified when the frame has been sent
*/
public CompletableFuture<Stream> trailer(HeadersFrame frame);
/**
* <p>Abruptly terminates this stream with the given error.</p>
*
* @param error the error code
* @param failure the failure that caused the reset of the stream
*/
public void reset(long error, Throwable failure);
/**
* <p>A {@link Stream.Listener} is the passive counterpart of a {@link Stream} and receives
* events happening on an HTTP/3 stream.</p>
@ -237,10 +245,9 @@ public interface Stream
* the stream has been reset.</p>
*
* @param stream the stream
* @param error the error code
* @param failure the cause of the failure
*/
public default void onFailure(Stream stream, long error, Throwable failure)
public default void onFailure(Stream stream, Throwable failure)
{
}
}

View File

@ -57,7 +57,7 @@ public abstract class HTTP3Session implements Session, ParserListener
private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>();
private final ProtocolSession session;
private final Listener listener;
private final AtomicInteger remoteStreamCount = new AtomicInteger();
private final AtomicInteger streamCount = new AtomicInteger();
private final StreamTimeouts streamTimeouts;
private long streamIdleTimeout;
private CloseState closeState = CloseState.CLOSED;
@ -115,8 +115,9 @@ public abstract class HTTP3Session implements Session, ParserListener
private CompletableFuture<Void> goAway(GoAwayFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("goaway with {} on {}", frame, this);
LOG.debug("goAway with {} on {}", frame, this);
boolean failStreams = false;
boolean sendGoAway = false;
Callback.Completable callback = null;
try (AutoLock l = lock.lock())
@ -153,8 +154,8 @@ public abstract class HTTP3Session implements Session, ParserListener
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("already sent {} on {}", goAwaySent, this);
closeState = CloseState.CLOSED;
failStreams = true;
}
}
break;
@ -179,7 +180,7 @@ public abstract class HTTP3Session implements Session, ParserListener
else
{
closeState = CloseState.CLOSING;
zeroStreamsAction = this::terminate;
zeroStreamsAction = () -> terminate("go_away");
}
}
break;
@ -215,6 +216,8 @@ public abstract class HTTP3Session implements Session, ParserListener
}
else
{
if (failStreams)
failStreams(stream -> true, "go_away", true);
return CompletableFuture.completedFuture(null);
}
}
@ -229,18 +232,11 @@ public abstract class HTTP3Session implements Session, ParserListener
Atomics.updateMax(lastId, id);
}
public void close(long error, String reason)
public void outwardClose(long error, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
try (AutoLock l = lock.lock())
{
closeState = CloseState.CLOSED;
zeroStreamsAction = null;
// TODO: what about field shutdown?
}
failStreams(stream -> true, false);
getProtocolSession().close(error, reason);
LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
getProtocolSession().outwardClose(error, reason);
}
public long getStreamIdleTimeout()
@ -260,6 +256,9 @@ public abstract class HTTP3Session implements Session, ParserListener
protected CompletableFuture<Stream> newRequest(long streamId, HeadersFrame frame, Stream.Listener listener)
{
if (LOG.isDebugEnabled())
LOG.debug("new request stream #{} with {} on {}", streamId, frame, this);
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>();
@ -272,19 +271,24 @@ public abstract class HTTP3Session implements Session, ParserListener
if (stream == null)
return promise;
if (LOG.isDebugEnabled())
LOG.debug("created request/response stream {}", stream);
stream.setListener(listener);
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () ->
{
if (listener == null)
endPoint.shutdownInput(ErrorCode.NO_ERROR.code());
promise.succeeded(stream);
}, promise::failed);
stream.writeFrame(frame)
.whenComplete((r, x) ->
{
if (x == null)
{
if (listener == null)
endPoint.shutdownInput(ErrorCode.NO_ERROR.code());
promise.succeeded(stream);
}
else
{
promise.failed(x);
}
});
stream.updateClose(frame.isLast(), true);
writeMessageFrame(streamId, frame, callback);
return promise;
}
@ -301,6 +305,8 @@ public abstract class HTTP3Session implements Session, ParserListener
protected HTTP3Stream getOrCreateStream(QuicStreamEndPoint endPoint)
{
if (endPoint == null)
return null;
return streams.computeIfAbsent(endPoint.getStreamId(), id -> newHTTP3Stream(endPoint, null, false));
}
@ -310,14 +316,9 @@ public abstract class HTTP3Session implements Session, ParserListener
try (AutoLock l = lock.lock())
{
if (closeState == CloseState.NOT_CLOSED)
{
if (!local)
remoteStreamCount.incrementAndGet();
}
streamCount.incrementAndGet();
else
{
failure = new IllegalStateException("session_closed");
}
}
if (failure == null)
@ -327,7 +328,7 @@ public abstract class HTTP3Session implements Session, ParserListener
if (idleTimeout > 0)
stream.setIdleTimeout(idleTimeout);
if (LOG.isDebugEnabled())
LOG.debug("created {} on {}", stream, this);
LOG.debug("created {}", stream);
return stream;
}
else
@ -351,13 +352,10 @@ public abstract class HTTP3Session implements Session, ParserListener
if (removed)
{
if (LOG.isDebugEnabled())
LOG.debug("destroyed {} on {}", stream, this);
LOG.debug("destroyed {}", stream);
if (!stream.isLocal())
{
if (remoteStreamCount.decrementAndGet() == 0)
tryRunZeroStreamsAction();
}
if (streamCount.decrementAndGet() == 0)
tryRunZeroStreamsAction();
}
}
@ -418,11 +416,22 @@ public abstract class HTTP3Session implements Session, ParserListener
}
}
private boolean notifyIdleTimeout()
{
try
{
return listener.onIdleTimeout(this);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return true;
}
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
QuicStreamEndPoint endPoint = session.getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
{
@ -430,36 +439,44 @@ public abstract class HTTP3Session implements Session, ParserListener
}
else
{
QuicStreamEndPoint endPoint = session.getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
if (LOG.isDebugEnabled())
LOG.debug("received trailer {}#{} on {}", frame, streamId, this);
stream.onTrailer(frame);
LOG.debug("received trailer {} on {}", frame, stream);
if (stream != null)
stream.onTrailer(frame);
}
}
@Override
public void onData(long streamId, DataFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {}#{} on {}", frame, streamId, this);
HTTP3Stream stream = getStream(streamId);
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, stream);
if (stream != null)
stream.onData(frame);
else
closeAndNotifyFailure(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
fail(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
}
public void onDataAvailable(long streamId)
{
if (LOG.isDebugEnabled())
LOG.debug("notifying data available for stream #{} on {}", streamId, this);
HTTP3Stream stream = getStream(streamId);
if (LOG.isDebugEnabled())
LOG.debug("notifying data available on {}", stream);
stream.onDataAvailable();
}
void closeAndNotifyFailure(long error, String reason)
void fail(long error, String reason)
{
close(error, reason);
notifySessionFailure(error, reason);
// Hard failure, no need to send a GOAWAY.
try (AutoLock l = lock.lock())
{
closeState = CloseState.CLOSED;
}
outwardClose(error, reason);
notifyFailure(new IOException(String.format("%d/%s", error, reason)));
}
@Override
@ -487,7 +504,7 @@ public abstract class HTTP3Session implements Session, ParserListener
goAwaySent = newGoAwayFrame(false);
closeState = CloseState.CLOSING;
GoAwayFrame goAwayFrame = goAwaySent;
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(this::terminate));
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() -> terminate("go_away")));
failStreams = true;
}
break;
@ -507,11 +524,11 @@ public abstract class HTTP3Session implements Session, ParserListener
{
goAwaySent = newGoAwayFrame(false);
GoAwayFrame goAwayFrame = goAwaySent;
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(this::terminate));
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() -> terminate("go_away")));
}
else
{
zeroStreamsAction = this::terminate;
zeroStreamsAction = () -> terminate("go_away");
failStreams = true;
}
}
@ -532,11 +549,11 @@ public abstract class HTTP3Session implements Session, ParserListener
{
goAwaySent = newGoAwayFrame(false);
GoAwayFrame goAwayFrame = goAwaySent;
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(this::terminate));
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() -> terminate("go_away")));
}
else
{
zeroStreamsAction = this::terminate;
zeroStreamsAction = () -> terminate("go_away");
}
failStreams = true;
}
@ -563,38 +580,114 @@ public abstract class HTTP3Session implements Session, ParserListener
// The other peer sent us a GOAWAY with the last processed streamId,
// so we must fail the streams that have a bigger streamId.
Predicate<HTTP3Stream> predicate = stream -> stream.isLocal() && stream.getId() > frame.getLastId();
failStreams(predicate, true);
failStreams(predicate, "go_away", true);
}
tryRunZeroStreamsAction();
}
private void failStreams(Predicate<HTTP3Stream> predicate, boolean close)
public boolean onIdleTimeout()
{
boolean notify = false;
try (AutoLock l = lock.lock())
{
switch (closeState)
{
case NOT_CLOSED:
{
notify = true;
break;
}
case LOCALLY_CLOSED:
case REMOTELY_CLOSED:
{
break;
}
case CLOSING:
case CLOSED:
{
if (LOG.isDebugEnabled())
LOG.debug("already closed, ignored idle timeout for {}", this);
return false;
}
default:
{
throw new IllegalStateException();
}
}
}
boolean confirmed = true;
if (notify)
confirmed = notifyIdleTimeout();
if (LOG.isDebugEnabled())
LOG.debug("idle timeout {} for {}", confirmed ? "confirmed" : "ignored", this);
if (!confirmed)
return false;
GoAwayFrame goAwayFrame = null;
try (AutoLock l = lock.lock())
{
switch (closeState)
{
case NOT_CLOSED:
case LOCALLY_CLOSED:
case REMOTELY_CLOSED:
case CLOSING:
{
if (goAwaySent == null || goAwaySent.isGraceful())
goAwaySent = goAwayFrame = newGoAwayFrame(false);
closeState = CloseState.CLOSED;
break;
}
case CLOSED:
{
return false;
}
default:
{
throw new IllegalStateException();
}
}
}
failStreams(stream -> true, "session_idle_timeout", true);
if (goAwayFrame != null)
writeControlFrame(goAwayFrame, Callback.from(() -> terminate("idle_timeout")));
else
terminate("idle_timeout");
return false;
}
private void failStreams(Predicate<HTTP3Stream> predicate, String reason, boolean close)
{
long error = ErrorCode.REQUEST_CANCELLED_ERROR.code();
Throwable failure = new IOException("request_cancelled");
Throwable failure = new IOException(reason);
streams.values().stream()
.filter(predicate)
.forEach(stream ->
{
if (close)
stream.close(error, failure);
stream.reset(error, failure);
// Since the stream failure was generated
// by a GOAWAY, notify the application.
stream.onFailure(error, failure);
removeStream(stream);
stream.onFailure(failure);
});
}
private void terminate()
private void terminate(String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("terminating {}", this);
LOG.debug("terminating reason={} for {}", reason, this);
streamTimeouts.destroy();
close(ErrorCode.NO_ERROR.code(), "terminate");
outwardClose(ErrorCode.NO_ERROR.code(), reason);
// Since the close() above is called by the
// implementation, notify the application.
notifyTerminate();
notifyDisconnect();
}
private void tryRunZeroStreamsAction()
@ -603,7 +696,7 @@ public abstract class HTTP3Session implements Session, ParserListener
CompletableFuture<Void> completable;
try (AutoLock l = lock.lock())
{
long count = remoteStreamCount.get();
long count = streamCount.get();
if (count > 0)
{
if (LOG.isDebugEnabled())
@ -663,25 +756,36 @@ public abstract class HTTP3Session implements Session, ParserListener
completable.complete(null);
}
public void onClose(int error, String reason)
public void onClose(long error, String reason)
{
// A close at the QUIC level does not allow
// any data to be sent, just update the state.
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely 0x{}/{} {}", Long.toHexString(error), reason, this);
// A close at the QUIC level does not allow any
// data to be sent, update the state and notify.
boolean notifyFailure;
try (AutoLock l = lock.lock())
{
notifyFailure = closeState == CloseState.NOT_CLOSED;
closeState = CloseState.CLOSED;
zeroStreamsAction = null;
// TODO: what about field shutdown?
}
failStreams(stream -> true, false);
notifyTerminate();
// No point in closing the streams, as QUIC frames cannot be sent.
failStreams(stream -> true, "remote_close", false);
if (notifyFailure)
fail(error, reason);
notifyDisconnect();
}
private void notifyTerminate()
private void notifyDisconnect()
{
try
{
listener.onTerminate(this);
listener.onDisconnect(this);
}
catch (Throwable x)
{
@ -693,11 +797,11 @@ public abstract class HTTP3Session implements Session, ParserListener
public void onStreamFailure(long streamId, long error, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("stream failure {}/{} for stream #{} on {}", error, failure, streamId, this, failure);
LOG.debug("stream failure 0x{}/{} for stream #{} on {}", Long.toHexString(error), failure.getMessage(), streamId, this);
HTTP3Stream stream = getStream(streamId);
if (stream != null)
{
stream.onFailure(error, failure);
stream.onFailure(failure);
removeStream(stream);
}
}
@ -708,11 +812,11 @@ public abstract class HTTP3Session implements Session, ParserListener
// TODO
}
public void notifySessionFailure(long error, String reason)
public void notifyFailure(Throwable failure)
{
try
{
listener.onSessionFailure(this, error, reason);
listener.onFailure(this, failure);
}
catch (Throwable x)
{
@ -728,7 +832,7 @@ public abstract class HTTP3Session implements Session, ParserListener
@Override
public String toString()
{
return String.format("%s@%x[streams=%d,%s]", getClass().getSimpleName(), hashCode(), remoteStreamCount.get(), closeState);
return String.format("%s@%x[streams=%d,%s]", getClass().getSimpleName(), hashCode(), streamCount.get(), closeState);
}
private enum CloseState
@ -754,7 +858,7 @@ public abstract class HTTP3Session implements Session, ParserListener
@Override
protected boolean onExpired(HTTP3Stream stream)
{
if (stream.processIdleTimeout(new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed")))
if (stream.onIdleTimeout(new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed")))
removeStream(stream);
// The iterator returned from the method above does not support removal.
return false;

View File

@ -105,7 +105,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
expireNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleTimeout);
}
boolean processIdleTimeout(TimeoutException timeout)
boolean onIdleTimeout(TimeoutException timeout)
{
if (LOG.isDebugEnabled())
LOG.debug("idle timeout {} ms expired on {}", getIdleTimeout(), this);
@ -138,12 +138,14 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
{
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
Data data = connection.readData();
updateClose(data.isLast(), false);
if (data != null)
updateClose(data.isLast(), false);
return data;
}
catch (Throwable x)
{
updateClose(true, false);
reset(ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
// Rethrow to the application, so don't notify onFailure().
throw x;
}
}
@ -288,18 +290,19 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
}
}
public void onFailure(long error, Throwable failure)
public void onFailure(Throwable failure)
{
notifyFailure(error, failure);
notifyFailure(failure);
session.removeStream(this);
}
private void notifyFailure(long error, Throwable failure)
private void notifyFailure(Throwable failure)
{
Listener listener = getListener();
try
{
if (listener != null)
listener.onFailure(this, error, failure);
listener.onFailure(this, failure);
}
catch (Throwable x)
{
@ -321,12 +324,12 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
if (frameState == FrameState.FAILED)
return false;
frameState = FrameState.FAILED;
session.closeAndNotifyFailure(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
session.fail(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
return false;
}
}
private Promise.Completable<Stream> writeFrame(Frame frame)
Promise.Completable<Stream> writeFrame(Frame frame)
{
notIdle();
Promise.Completable<Stream> completable = new Promise.Completable<>();
@ -334,7 +337,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
return completable;
}
private void updateClose(boolean update, boolean local)
void updateClose(boolean update, boolean local)
{
if (update)
{
@ -375,22 +378,26 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
}
}
public void close(long error, Throwable failure)
@Override
public void reset(long error, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("closing {} with error 0x{} {}", this, Long.toHexString(error), failure.toString());
LOG.debug("resetting {} with error 0x{} {}", this, Long.toHexString(error), failure.toString());
closeState = CloseState.CLOSED;
session.removeStream(this);
endPoint.close(error, failure);
}
@Override
public String toString()
{
return String.format("%s@%x#%d[demand=%b,idle=%d]",
return String.format("%s@%x#%d[demand=%b,idle=%d,session=%s]",
getClass().getSimpleName(),
hashCode(),
getId(),
hasDemand(),
TimeUnit.NANOSECONDS.toMillis(expireNanoTime - System.nanoTime())
TimeUnit.NANOSECONDS.toMillis(expireNanoTime - System.nanoTime()),
getSession()
);
}

View File

@ -351,7 +351,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
catch (IOException x)
{
throw new UncheckedIOException(x);
throw new UncheckedIOException(x.getMessage(), x);
}
}

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.http3.server.internal;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
@ -34,13 +33,6 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
super(session, listener);
}
@Override
public void onOpen()
{
super.onOpen();
notifyAccept();
}
@Override
public ServerHTTP3Session getProtocolSession()
{
@ -53,18 +45,27 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
return (Session.Server.Listener)super.getListener();
}
@Override
public void onOpen()
{
super.onOpen();
notifyAccept();
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
if (frame.getMetaData().isRequest())
{
QuicStreamEndPoint endPoint = getProtocolSession().getStreamEndPoint(streamId);
HTTP3Stream stream = getOrCreateStream(endPoint);
if (LOG.isDebugEnabled())
LOG.debug("received request {}#{} on {}", frame, streamId, this);
updateLastId(streamId);
stream.onRequest(frame);
LOG.debug("received request {} on {}", frame, stream);
if (stream != null)
{
updateLastId(streamId);
stream.onRequest(frame);
}
}
else
{

View File

@ -27,7 +27,6 @@ import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.quic.common.CloseInfo;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.quic.server.ServerProtocolSession;
@ -149,11 +148,28 @@ public class ServerHTTP3Session extends ServerProtocolSession
}
@Override
protected void onClose(CloseInfo closeInfo)
protected boolean onIdleTimeout()
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
session.onClose(closeInfo.error(), closeInfo.reason());
LOG.debug("idle timeout {} ms expired for {}", getQuicSession().getIdleTimeout(), this);
return session.onIdleTimeout();
}
@Override
public void inwardClose(long error, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
// TODO: maybe we should be harsher here... like halt() see onIdleTimeout()
session.goAway(false);
}
@Override
protected void onClose(long error, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely 0x{}/{} {}", Long.toHexString(error), reason, this);
session.onClose(error, reason);
}
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
@ -177,7 +193,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
messageFlusher.iterate();
}
protected void onDataAvailable(long streamId)
public void onDataAvailable(long streamId)
{
session.onDataAvailable(streamId);
}

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
import org.eclipse.jetty.quic.server.ServerQuicConnector;
import org.eclipse.jetty.quic.server.QuicServerConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -38,7 +38,7 @@ public class AbstractClientServerTest
@RegisterExtension
final BeforeTestExecutionCallback printMethodName = context ->
System.err.printf("Running %s.%s() %s%n", context.getRequiredTestClass().getSimpleName(), context.getRequiredTestMethod().getName(), context.getDisplayName());
protected ServerQuicConnector connector;
protected QuicServerConnector connector;
protected HTTP3Client client;
protected Server server;
@ -56,7 +56,7 @@ public class AbstractClientServerTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(listener));
connector = new QuicServerConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(listener));
server.addConnector(connector);
server.start();
}

View File

@ -25,16 +25,21 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.server.internal.HTTP3SessionServer;
import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -44,14 +49,15 @@ public class GoAwayTest extends AbstractClientServerTest
@Test
public void testClientGoAwayServerReplies() throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
AtomicReference<HTTP3SessionServer> serverSessionRef = new AtomicReference<>();
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverSessionRef.set(stream.getSession());
serverSessionRef.set((HTTP3SessionServer)stream.getSession());
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
stream.respond(new HeadersFrame(response, true));
return null;
@ -60,17 +66,30 @@ public class GoAwayTest extends AbstractClientServerTest
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverLatch.countDown();
serverGoAwayLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
clientLatch.countDown();
clientGoAwayLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
clientDisconnectLatch.countDown();
}
});
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
@ -84,11 +103,27 @@ public class GoAwayTest extends AbstractClientServerTest
}
});
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
HTTP3SessionServer serverSession = serverSessionRef.get();
assertTrue(serverSession.isClosed());
assertTrue(serverSession.getStreams().isEmpty());
ServerQuicSession serverQuicSession = serverSession.getProtocolSession().getQuicSession();
// While HTTP/3 is completely closed, QUIC may still be exchanging packets, so we need to await().
await().atMost(1, TimeUnit.SECONDS).until(() -> serverQuicSession.getQuicStreamEndPoints().isEmpty());
await().atMost(1, TimeUnit.SECONDS).until(() -> serverQuicSession.getQuicConnection().getQuicSessions().isEmpty());
assertTrue(clientSession.isClosed());
assertTrue(clientSession.getStreams().isEmpty());
ClientQuicSession clientQuicSession = clientSession.getProtocolSession().getQuicSession();
// While HTTP/3 is completely closed, QUIC may still be exchanging packets, so we need to await().
await().atMost(1, TimeUnit.SECONDS).until(() -> clientQuicSession.getQuicStreamEndPoints().isEmpty());
QuicConnection quicConnection = clientQuicSession.getQuicConnection();
await().atMost(1, TimeUnit.SECONDS).until(() -> quicConnection.getQuicSessions().isEmpty());
await().atMost(1, TimeUnit.SECONDS).until(() -> quicConnection.getEndPoint().isOpen(), is(false));
}
@Test
@ -96,7 +131,7 @@ public class GoAwayTest extends AbstractClientServerTest
{
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
@ -115,14 +150,14 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
@ -132,14 +167,14 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
CountDownLatch streamFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/1"), true), new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
@ -149,10 +184,10 @@ public class GoAwayTest extends AbstractClientServerTest
serverSessionRef.get().goAway(false);
// The client sends the second request and should eventually fail it
// locally since it has a larger streamId, and the server discarded it.
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/2"), true), new Stream.Listener()
{
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream stream, Throwable failure)
{
streamFailureLatch.countDown();
}
@ -163,18 +198,18 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
assertTrue(((HTTP3Session)clientSession).isClosed());
assertTrue(((HTTP3Session)serverSessionRef.get()).isClosed());
}
@Test
public void testServerGracefulGoAway() throws Exception
{
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
start(new Session.Server.Listener()
{
@ -194,15 +229,15 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
@ -215,9 +250,9 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
CountDownLatch clientLatch = new CountDownLatch(1);
@ -240,19 +275,19 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
assertTrue(((HTTP3Session)serverSessionRef.get()).isClosed());
assertTrue(((HTTP3Session)clientSession).isClosed());
}
@Test
public void testServerGracefulGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception
{
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
start(new Session.Server.Listener()
@ -277,15 +312,15 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
@ -298,9 +333,9 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
CountDownLatch clientLatch = new CountDownLatch(1);
@ -336,11 +371,11 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
assertTrue(((HTTP3Session)serverSessionRef.get()).isClosed());
assertTrue(((HTTP3Session)clientSession).isClosed());
}
@Test
@ -349,7 +384,7 @@ public class GoAwayTest extends AbstractClientServerTest
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
CountDownLatch serverStreamLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
@ -367,14 +402,14 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
@ -384,9 +419,9 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
@ -418,12 +453,12 @@ public class GoAwayTest extends AbstractClientServerTest
serverStream.respond(new HeadersFrame(response, true));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverStreamRef.get().getSession())::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
assertTrue(((HTTP3Session)serverStreamRef.get().getSession()).isClosed());
assertTrue(((HTTP3Session)clientSession).isClosed());
}
@Test
@ -432,7 +467,7 @@ public class GoAwayTest extends AbstractClientServerTest
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
CountDownLatch serverStreamLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
@ -454,14 +489,14 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
@ -479,9 +514,9 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
@ -508,22 +543,23 @@ public class GoAwayTest extends AbstractClientServerTest
// The server already received the client GOAWAY,
// so completing the last stream produces a close event.
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
// The client should receive the server non-graceful GOAWAY.
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverStreamRef.get().getSession())::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
assertTrue(((HTTP3Session)serverStreamRef.get().getSession()).isClosed());
assertTrue(((HTTP3Session)clientSession).isClosed());
}
@Test
public void testClientGracefulGoAwayWithStreamsServerGracefulGoAwayServerClosesWhenLastStreamCloses() throws Exception
{
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
CountDownLatch serverRequestLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
@ -531,18 +567,24 @@ public class GoAwayTest extends AbstractClientServerTest
{
serverStreamRef.set(stream);
stream.demand();
serverRequestLatch.countDown();
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
data.complete();
if (data.isLast())
if (data != null)
data.complete();
if (data != null && data.isLast())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
stream.respond(new HeadersFrame(response, true));
}
else
{
stream.demand();
}
}
};
}
@ -562,15 +604,15 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
@ -583,14 +625,16 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);
assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS));
// Send a graceful GOAWAY from the client.
clientSession.goAway(true);
@ -602,12 +646,12 @@ public class GoAwayTest extends AbstractClientServerTest
// Both client and server should send a non-graceful GOAWAY.
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverStreamRef.get().getSession())::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
assertTrue(((HTTP3Session)serverStreamRef.get().getSession()).isClosed());
assertTrue(((HTTP3Session)clientSession).isClosed());
}
@Test
@ -615,7 +659,7 @@ public class GoAwayTest extends AbstractClientServerTest
{
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch settingsLatch = new CountDownLatch(2);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
@ -626,30 +670,38 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
Session.Client clientSession = newSession(new Session.Client.Listener()
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
clientDisconnectLatch.countDown();
}
});
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
// Issue a network close.
((HTTP3Session)clientSession).close(ErrorCode.NO_ERROR.code(), "close");
clientSession.getProtocolSession().getQuicSession().getQuicConnection().close();
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
assertTrue(((HTTP3Session)serverSessionRef.get()).isClosed());
assertTrue(clientSession.isClosed());
}
@Test
@ -657,7 +709,7 @@ public class GoAwayTest extends AbstractClientServerTest
{
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch settingsLatch = new CountDownLatch(2);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
@ -668,12 +720,13 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onTerminate(Session session)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
@ -686,7 +739,13 @@ public class GoAwayTest extends AbstractClientServerTest
public void onGoAway(Session session, GoAwayFrame frame)
{
// Reply to the graceful GOAWAY from the server with a network close.
((HTTP3Session)session).close(ErrorCode.NO_ERROR.code(), "close");
((HTTP3Session)session).getProtocolSession().getQuicSession().getQuicConnection().close();
}
@Override
public void onDisconnect(Session session)
{
clientDisconnectLatch.countDown();
}
});
@ -695,13 +754,13 @@ public class GoAwayTest extends AbstractClientServerTest
// Send a graceful GOAWAY to the client.
serverSessionRef.get().goAway(true);
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
assertTrue(((HTTP3Session)serverSessionRef.get()).isClosed());
assertTrue(((HTTP3Session)clientSession).isClosed());
}
/*
@Test
public void testServerIdleTimeout() throws Exception
{
@ -709,14 +768,14 @@ public class GoAwayTest extends AbstractClientServerTest
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public void onAccept(Session session)
{
serverSessionRef.set(session);
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
}
@Override
@ -729,12 +788,19 @@ public class GoAwayTest extends AbstractClientServerTest
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverTerminateLatch.countDown();
serverGoAwayLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
serverDisconnectLatch.countDown();
}
});
connector.setIdleTimeout(idleTimeout);
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
@ -745,9 +811,9 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
@ -755,11 +821,11 @@ public class GoAwayTest extends AbstractClientServerTest
// Server should send a GOAWAY to the client.
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
// The client replied to server's GOAWAY, but the server already closed.
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertTrue(((HTTP3Session)serverSessionRef.get()).isClosed());
assertTrue(((HTTP3Session)clientSession).isClosed());
}
@Test
@ -768,36 +834,42 @@ public class GoAwayTest extends AbstractClientServerTest
long idleTimeout = 1000;
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public void onAccept(Session session)
{
serverSessionRef.set(session);
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
}
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.setIdleTimeout(10 * idleTimeout);
// Send a graceful GOAWAY.
((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
stream.getSession().goAway(true);
return null;
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverTerminateLatch.countDown();
serverGoAwayLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
serverDisconnectLatch.countDown();
}
});
connector.setIdleTimeout(idleTimeout);
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
@ -809,32 +881,45 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
CountDownLatch clientResetLatch = new CountDownLatch(1);
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
CountDownLatch clientFailureLatch = new CountDownLatch(1);
// Send request headers but not data.
clientSession.newRequest(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener()
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
public void onFailure(Stream stream, Throwable failure)
{
clientResetLatch.countDown();
clientFailureLatch.countDown();
}
});
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
// Server idle timeout sends a non-graceful GOAWAY.
assertTrue(clientResetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientFailureLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
HTTP3SessionServer serverSession = (HTTP3SessionServer)serverSessionRef.get();
assertTrue(serverSession.isClosed());
assertTrue(serverSession.getStreams().isEmpty());
ServerQuicSession serverQuicSession = serverSession.getProtocolSession().getQuicSession();
// While HTTP/3 is completely closed, QUIC may still be exchanging packets, so we need to await().
await().atMost(1, TimeUnit.SECONDS).until(() -> serverQuicSession.getQuicStreamEndPoints().isEmpty());
await().atMost(1, TimeUnit.SECONDS).until(() -> serverQuicSession.getQuicConnection().getQuicSessions().isEmpty());
assertTrue(clientSession.isClosed());
assertTrue(clientSession.getStreams().isEmpty());
ClientQuicSession clientQuicSession = clientSession.getProtocolSession().getQuicSession();
// While HTTP/3 is completely closed, QUIC may still be exchanging packets, so we need to await().
await().atMost(1, TimeUnit.SECONDS).until(() -> clientQuicSession.getQuicStreamEndPoints().isEmpty());
QuicConnection quicConnection = clientQuicSession.getQuicConnection();
await().atMost(1, TimeUnit.SECONDS).until(() -> quicConnection.getQuicSessions().isEmpty());
await().atMost(1, TimeUnit.SECONDS).until(() -> quicConnection.getEndPoint().isOpen(), is(false));
}
@Test
@ -843,21 +928,21 @@ public class GoAwayTest extends AbstractClientServerTest
long idleTimeout = 1000;
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverRequestLatch = new CountDownLatch(1);
CountDownLatch serverGracefulGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public void onAccept(Session session)
{
serverSessionRef.set(session);
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
}
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.setIdleTimeout(10 * idleTimeout);
serverRequestLatch.countDown();
return null;
}
@ -869,15 +954,16 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
public void onDisconnect(Session session)
{
serverTerminateLatch.countDown();
serverDisconnectLatch.countDown();
}
});
connector.setIdleTimeout(idleTimeout);
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
@ -886,40 +972,42 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
CountDownLatch streamResetLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener()
CountDownLatch streamFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
public void onFailure(Stream stream, Throwable failure)
{
streamResetLatch.countDown();
streamFailureLatch.countDown();
}
});
assertTrue(serverRequestLatch.await(5, TimeUnit.SECONDS));
// Client sends a graceful GOAWAY.
((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
clientSession.goAway(true);
assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(streamResetLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverGracefulGoAwayLatch.await(555, TimeUnit.SECONDS));
assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertTrue(((HTTP3Session)serverSessionRef.get()).isClosed());
assertTrue(clientSession.isClosed());
}
@Test
public void testServerGoAwayWithStreamsThenStop() throws Exception
public void testServerGoAwayWithStreamsThenShutdown() throws Exception
{
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
@ -927,20 +1015,26 @@ public class GoAwayTest extends AbstractClientServerTest
{
serverSessionRef.set(stream.getSession());
// Don't reply, don't reset the stream, just send the GOAWAY.
stream.getSession().close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
stream.getSession().goAway(false);
return null;
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverTerminateLatch.countDown();
serverGoAwayLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
Session.Client clientSession = newSession(new Session.Client.Listener()
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
@ -949,36 +1043,35 @@ public class GoAwayTest extends AbstractClientServerTest
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
public void onDisconnect(Session session)
{
clientTerminateLatch.countDown();
clientDisconnectLatch.countDown();
}
});
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
CountDownLatch clientResetLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener()
CountDownLatch clientFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
public void onFailure(Stream stream, Throwable failure)
{
clientResetLatch.countDown();
clientFailureLatch.countDown();
}
});
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
// Neither the client nor the server are finishing
// the pending stream, so force the stop on the server.
LifeCycle.stop(serverSessionRef.get());
// the pending stream, so force the close on the server.
HTTP3Session serverSession = (HTTP3Session)serverSessionRef.get();
serverSession.getProtocolSession().getQuicSession().getQuicConnection().close();
// The server should reset all the pending streams.
assertTrue(clientResetLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
assertTrue(serverSession.isClosed());
assertTrue(clientSession.isClosed());
}
*/
}

View File

@ -186,11 +186,11 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
clientSession.newRequest(new HeadersFrame(newRequest("/idle"), false), new Stream.Listener()
{
@Override
public void onFailure(Stream stream, long error, Throwable failure)
public void onFailure(Stream stream, Throwable failure)
{
// The server idle times out, but did not send any data back.
// However, the stream is readable, but an attempt to read it
// will cause an exception that is notified here.
// However, the stream is readable and the implementation
// reading it will cause an exception that is notified here.
clientFailureLatch.countDown();
}
});

View File

@ -19,13 +19,11 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.internal.ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class UnexpectedFrameTest extends AbstractClientServerTest
@ -37,29 +35,27 @@ public class UnexpectedFrameTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public void onSessionFailure(Session session, long error, String reason)
public void onFailure(Session session, Throwable failure)
{
assertEquals(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
serverLatch.countDown();
}
});
CountDownLatch clientLatch = new CountDownLatch(1);
HTTP3Session session = (HTTP3Session)newSession(new Session.Client.Listener()
HTTP3Session clientSession = (HTTP3Session)newSession(new Session.Client.Listener()
{
@Override
public void onSessionFailure(Session session, long error, String reason)
public void onFailure(Session session, Throwable failure)
{
assertEquals(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
clientLatch.countDown();
}
});
session.writeMessageFrame(0, new DataFrame(ByteBuffer.allocate(128), false), Callback.NOOP);
clientSession.writeMessageFrame(0, new DataFrame(ByteBuffer.allocate(128), false), Callback.NOOP);
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(session::isClosed);
await().atMost(1, TimeUnit.SECONDS).until(clientSession::isClosed);
}
}

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.quic.client;
import org.eclipse.jetty.quic.common.CloseInfo;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
@ -57,10 +56,10 @@ public class ClientProtocolSession extends ProtocolSession
}
@Override
protected void onClose(CloseInfo closeInfo)
protected void onClose(long error, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
LOG.debug("session closed remotely 0x{}/{} {}", Long.toHexString(error), reason, this);
// TODO: should probably close the stream.
}
}

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -33,7 +34,6 @@ import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.quiche.QuicheConfig;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
@ -50,7 +50,7 @@ public class ClientQuicConnection extends QuicConnection
public static final String APPLICATION_PROTOCOLS = "org.eclipse.jetty.quic.application.protocols";
private static final Logger LOG = LoggerFactory.getLogger(ClientQuicConnection.class);
private final Map<SocketAddress, QuicSession> pendingSessions = new ConcurrentHashMap<>();
private final Map<SocketAddress, ClientQuicSession> pendingSessions = new ConcurrentHashMap<>();
private final Map<String, Object> context;
public ClientQuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint, Map<String, Object> context)
@ -82,7 +82,8 @@ public class ClientQuicConnection extends QuicConnection
quicheConfig.setApplicationProtos(protocols.toArray(String[]::new));
quicheConfig.setDisableActiveMigration(true);
quicheConfig.setVerifyPeer(false);
quicheConfig.setMaxIdleTimeout(getEndPoint().getIdleTimeout());
// Idle timeouts must not be managed by Quiche.
quicheConfig.setMaxIdleTimeout(0L);
quicheConfig.setInitialMaxData(10_000_000L);
quicheConfig.setInitialMaxStreamDataBidiLocal(10_000_000L);
quicheConfig.setInitialMaxStreamDataBidiRemote(10000000L);
@ -97,7 +98,7 @@ public class ClientQuicConnection extends QuicConnection
LOG.debug("connecting to {} with protocols {}", remoteAddress, protocols);
QuicheConnection quicheConnection = QuicheConnection.connect(quicheConfig, remoteAddress);
QuicSession session = new ClientQuicSession(getExecutor(), getScheduler(), getByteBufferPool(), quicheConnection, this, remoteAddress, context);
ClientQuicSession session = new ClientQuicSession(getExecutor(), getScheduler(), getByteBufferPool(), quicheConnection, this, remoteAddress, context);
pendingSessions.put(remoteAddress, session);
session.flush(); // send the response packet(s) that connect generated.
if (LOG.isDebugEnabled())
@ -128,16 +129,34 @@ public class ClientQuicConnection extends QuicConnection
}
@Override
protected void closeSession(QuicheConnectionId quicheConnectionId, QuicSession session, Throwable x)
public boolean onIdleExpired()
{
super.closeSession(quicheConnectionId, session, x);
SocketAddress remoteAddress = session.getRemoteAddress();
if (pendingSessions.remove(remoteAddress) != null)
boolean idle = isFillInterested();
long idleTimeout = getEndPoint().getIdleTimeout();
if (LOG.isDebugEnabled())
LOG.debug("{} elapsed idle timeout {} ms", idle ? "processing" : "ignoring", idleTimeout);
if (idle)
{
Promise<?> promise = (Promise<?>)context.get(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY);
if (promise != null)
promise.failed(x);
Collection<QuicSession> sessions = getQuicSessions();
sessions.forEach(QuicSession::onIdleTimeout);
}
return false;
}
@Override
public void outwardClose(QuicSession session, Throwable failure)
{
super.outwardClose(session, failure);
SocketAddress remoteAddress = session.getRemoteAddress();
if (remoteAddress != null)
{
if (pendingSessions.remove(remoteAddress) != null)
{
Promise<?> promise = (Promise<?>)context.get(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY);
if (promise != null)
promise.failed(failure);
}
}
getEndPoint().close(failure);
}
}

View File

@ -29,7 +29,7 @@ import org.eclipse.jetty.http2.client.http.ClientConnectionFactoryOverHTTP2;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.quic.server.ServerQuicConnector;
import org.eclipse.jetty.quic.server.QuicServerConnector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
@ -47,7 +47,7 @@ import static org.hamcrest.core.Is.is;
public class End2EndClientTest
{
private Server server;
private ServerQuicConnector connector;
private QuicServerConnector connector;
private HttpClient client;
private final String responseContent = "" +
"<html>\n" +
@ -68,7 +68,7 @@ public class End2EndClientTest
HttpConfiguration httpConfiguration = new HttpConfiguration();
HttpConnectionFactory http1 = new HttpConnectionFactory(httpConfiguration);
HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(httpConfiguration);
connector = new ServerQuicConnector(server, sslContextFactory, http1, http2);
connector = new QuicServerConnector(server, sslContextFactory, http1, http2);
server.addConnector(connector);
server.setHandler(new AbstractHandler()

View File

@ -69,7 +69,7 @@ public abstract class ProtocolSession
{
CloseInfo closeInfo = session.getRemoteCloseInfo();
if (closeInfo != null)
onClose(closeInfo);
onClose(closeInfo.error(), closeInfo.reason());
break;
}
}
@ -130,12 +130,22 @@ public abstract class ProtocolSession
connection.onOpen();
}
public boolean close(long error, String reason)
protected boolean onIdleTimeout()
{
return getQuicSession().close(error, reason);
return true;
}
protected abstract void onClose(CloseInfo closeInfo);
public void inwardClose(long error, String reason)
{
getQuicSession().outwardClose(error, reason);
}
public void outwardClose(long error, String reason)
{
getQuicSession().outwardClose(error, reason);
}
protected abstract void onClose(long error, String reason);
@Override
public String toString()

View File

@ -17,9 +17,12 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@ -47,6 +50,7 @@ public abstract class QuicConnection extends AbstractConnection
private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class);
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool;
private final Flusher flusher = new Flusher();
@ -101,24 +105,35 @@ public abstract class QuicConnection extends AbstractConnection
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
protected void closeSession(QuicheConnectionId quicheConnectionId, QuicSession session, Throwable x)
public Collection<QuicSession> getQuicSessions()
{
if (LOG.isDebugEnabled())
LOG.debug("closing session cid={} {}", quicheConnectionId, this);
if (quicheConnectionId != null)
sessions.remove(quicheConnectionId);
return List.copyOf(sessions.values());
}
@Override
public abstract boolean onIdleExpired();
@Override
public void close()
{
// This method should only be called when the client or the server are stopped.
if (closed.compareAndSet(false, true))
{
if (LOG.isDebugEnabled())
LOG.debug("closing connection {}", this);
long error = 0x00; // QUIC error code for NO_ERROR.
// Propagate the close inward to the protocol-specific session.
sessions.values().forEach(session -> session.inwardClose(error, "stop"));
}
}
public void outwardClose(QuicSession session, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("closing connection {}", this);
sessions.values().forEach(QuicSession::close);
sessions.clear();
super.close();
if (LOG.isDebugEnabled())
LOG.debug("closed connection {}", this);
LOG.debug("outward close {} on {}", session, this);
QuicheConnectionId connectionId = session.getConnectionId();
if (connectionId != null)
sessions.remove(connectionId);
}
@Override
@ -172,6 +187,7 @@ public abstract class QuicConnection extends AbstractConnection
if (LOG.isDebugEnabled())
LOG.debug("session created");
session.setConnectionId(quicheConnectionId);
session.setIdleTimeout(getEndPoint().getIdleTimeout());
sessions.put(quicheConnectionId, session);
}
else

View File

@ -18,6 +18,7 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@ -61,7 +62,7 @@ public abstract class QuicSession
private final AtomicLong[] ids = new AtomicLong[StreamType.values().length];
private final AutoLock strategyQueueLock = new AutoLock();
private final Queue<Runnable> strategyQueue = new ArrayDeque<>();
private final ConcurrentMap<Long, QuicStreamEndPoint> endpoints = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, QuicStreamEndPoint> endPoints = new ConcurrentHashMap<>();
private final Executor executor;
private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool;
@ -72,6 +73,7 @@ public abstract class QuicSession
private SocketAddress remoteAddress;
private ProtocolSession protocolSession;
private QuicheConnectionId quicheConnectionId;
private long idleTimeout;
protected QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress)
{
@ -92,14 +94,6 @@ public abstract class QuicSession
return executor;
}
public CloseInfo getRemoteCloseInfo()
{
AtomicStampedReference<String> info = quicheConnection.getRemoteCloseInfo();
if (info != null)
return new CloseInfo(info.getStamp(), info.getReference());
return null;
}
public Scheduler getScheduler()
{
return scheduler;
@ -120,6 +114,46 @@ public abstract class QuicSession
return quicheConnection.getNegotiatedProtocol();
}
public QuicConnection getQuicConnection()
{
return connection;
}
public Collection<QuicStreamEndPoint> getQuicStreamEndPoints()
{
return List.copyOf(endPoints.values());
}
public CloseInfo getRemoteCloseInfo()
{
AtomicStampedReference<String> info = quicheConnection.getRemoteCloseInfo();
if (info != null)
return new CloseInfo(info.getStamp(), info.getReference());
return null;
}
public long getIdleTimeout()
{
return idleTimeout;
}
public void setIdleTimeout(long idleTimeout)
{
if (LOG.isDebugEnabled())
LOG.debug("setting idle timeout {} ms for {}", idleTimeout, this);
this.idleTimeout = idleTimeout;
}
public boolean onIdleTimeout()
{
return protocolSession.onIdleTimeout();
}
public void onOpen()
{
protocolSession.onOpen();
}
/**
* @param streamType the stream type
* @return a new stream ID for the given type
@ -131,11 +165,6 @@ public abstract class QuicSession
return (id << 2) + type;
}
public void onOpen()
{
protocolSession.onOpen();
}
public int fill(long streamId, ByteBuffer buffer) throws IOException
{
int drained = quicheConnection.drainClearBytesForStream(streamId, buffer);
@ -185,7 +214,7 @@ public abstract class QuicSession
public void onClose(long streamId)
{
endpoints.remove(streamId);
endPoints.remove(streamId);
}
public SocketAddress getLocalAddress()
@ -203,6 +232,11 @@ public abstract class QuicSession
return quicheConnection.isConnectionEstablished();
}
public QuicheConnectionId getConnectionId()
{
return quicheConnectionId;
}
public void setConnectionId(QuicheConnectionId quicheConnectionId)
{
this.quicheConnectionId = quicheConnectionId;
@ -274,7 +308,7 @@ public abstract class QuicSession
QuicStreamEndPoint getStreamEndPoint(long streamId)
{
return endpoints.get(streamId);
return endPoints.get(streamId);
}
public abstract Connection newConnection(QuicStreamEndPoint endPoint);
@ -297,7 +331,7 @@ public abstract class QuicSession
public QuicStreamEndPoint getOrCreateStreamEndPoint(long streamId, Consumer<QuicStreamEndPoint> consumer)
{
QuicStreamEndPoint endPoint = endpoints.compute(streamId, (id, quicStreamEndPoint) ->
QuicStreamEndPoint endPoint = endPoints.compute(streamId, (id, quicStreamEndPoint) ->
{
if (quicStreamEndPoint == null)
{
@ -318,40 +352,35 @@ public abstract class QuicSession
return new QuicStreamEndPoint(getScheduler(), this, streamId);
}
public void close()
public void inwardClose(long error, String reason)
{
if (quicheConnectionId == null)
close(new IOException("connection refused"));
else
close(new IOException("connection closed"));
protocolSession.inwardClose(error, reason);
}
private void close(Throwable x)
public void outwardClose(long error, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("closing {}", this);
LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
quicheConnection.close(error, reason);
// Flushing will eventually forward
// the outward close to the connection.
flush();
}
private void finishOutwardClose(Throwable failure)
{
try
{
endpoints.values().forEach(QuicStreamEndPoint::close);
endpoints.clear();
flusher.close();
connection.closeSession(quicheConnectionId, this, x);
endPoints.clear();
LifeCycle.stop(strategy);
flusher.close();
getQuicConnection().outwardClose(this, failure);
}
finally
{
// This call frees malloc'ed memory so make sure it always happens.
quicheConnection.dispose();
}
if (LOG.isDebugEnabled())
LOG.debug("closed {}", this);
}
public boolean close(long error, String reason)
{
boolean closed = quicheConnection.close(error, reason);
flush();
return closed;
}
@Override
@ -410,7 +439,7 @@ public abstract class QuicSession
boolean connectionClosed = quicheConnection.isConnectionClosed();
Action action = connectionClosed ? Action.SUCCEEDED : Action.IDLE;
if (LOG.isDebugEnabled())
LOG.debug("connection closed={}, action={} on {}", connectionClosed, action, QuicSession.this);
LOG.debug("connection draining={} closed={}, action={} on {}", quicheConnection.isDraining(), connectionClosed, action, QuicSession.this);
return action;
}
BufferUtil.flipToFlush(cipherBuffer, pos);
@ -440,16 +469,17 @@ public abstract class QuicSession
{
if (LOG.isDebugEnabled())
LOG.debug("connection closed {}", QuicSession.this);
QuicSession.this.close();
byteBufferPool.release(cipherBuffer);
finishOutwardClose(null);
}
@Override
protected void onCompleteFailure(Throwable cause)
protected void onCompleteFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to write cipher bytes, closing session on {}", QuicSession.this, cause);
LOG.debug("failed to write cipher bytes, closing session on {}", QuicSession.this, failure);
byteBufferPool.release(cipherBuffer);
QuicSession.this.close(cause);
finishOutwardClose(failure);
}
}

View File

@ -347,7 +347,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
LibQuiche.quiche_stream_iter quiche_stream_iter;
if (write)
@ -386,7 +386,7 @@ public class QuicheConnection
info.from_len = s.getSize();
int received = LibQuiche.INSTANCE.quiche_conn_recv(quicheConn, buffer, new size_t(buffer.remaining()), info).intValue();
if (received < 0)
throw new IOException("Quiche failed to receive packet; err=" + LibQuiche.quiche_error.errToString(received));
throw new IOException("failed to receive packet; err=" + LibQuiche.quiche_error.errToString(received));
buffer.position(buffer.position() + received);
return received;
}
@ -412,7 +412,7 @@ public class QuicheConnection
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return 0;
if (written < 0L)
throw new IOException("Quiche failed to send packet; err=" + LibQuiche.quiche_error.errToString(written));
throw new IOException("failed to send packet; err=" + LibQuiche.quiche_error.errToString(written));
int prevPosition = buffer.position();
buffer.position(prevPosition + written);
return written;
@ -424,7 +424,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
return LibQuiche.INSTANCE.quiche_conn_is_closed(quicheConn);
}
}
@ -434,7 +434,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
return LibQuiche.INSTANCE.quiche_conn_is_established(quicheConn);
}
}
@ -444,7 +444,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
return LibQuiche.INSTANCE.quiche_conn_is_in_early_data(quicheConn);
}
}
@ -454,7 +454,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
return LibQuiche.INSTANCE.quiche_conn_timeout_as_millis(quicheConn).longValue();
}
}
@ -464,7 +464,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
LibQuiche.INSTANCE.quiche_conn_on_timeout(quicheConn);
}
}
@ -474,7 +474,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
char_pointer out = new char_pointer();
size_t_pointer outLen = new size_t_pointer();
LibQuiche.INSTANCE.quiche_conn_application_proto(quicheConn, out, outLen);
@ -522,7 +522,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
return LibQuiche.INSTANCE.quiche_conn_is_draining(quicheConn);
}
}
@ -532,7 +532,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
LibQuiche.quiche_stats stats = new LibQuiche.quiche_stats();
LibQuiche.INSTANCE.quiche_conn_stats(quicheConn, stats);
return stats.cwnd.longValue();
@ -544,10 +544,10 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IOException("Quiche connection was released");
throw new IOException("connection was released");
long value = LibQuiche.INSTANCE.quiche_conn_stream_capacity(quicheConn, new uint64_t(streamId)).longValue();
if (value < 0)
throw new IOException("Quiche failed to read capacity of stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(value));
throw new IOException(" failed to read capacity of stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(value));
return value;
}
}
@ -557,7 +557,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IOException("Quiche connection was released");
throw new IOException("connection was released");
int direction = writeSide ? LibQuiche.quiche_shutdown.QUICHE_SHUTDOWN_WRITE : LibQuiche.quiche_shutdown.QUICHE_SHUTDOWN_READ;
int rc = LibQuiche.INSTANCE.quiche_conn_stream_shutdown(quicheConn, new uint64_t(streamId), direction, new uint64_t(error));
if (rc == 0 || rc == LibQuiche.quiche_error.QUICHE_ERR_DONE)
@ -571,12 +571,12 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IOException("Quiche connection was released");
throw new IOException("connection was released");
int written = LibQuiche.INSTANCE.quiche_conn_stream_send(quicheConn, new uint64_t(streamId), BufferUtil.EMPTY_BUFFER, new size_t(0), true).intValue();
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return;
if (written < 0L)
throw new IOException("Quiche failed to write FIN to stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(written));
throw new IOException(" failed to write FIN to stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(written));
}
}
@ -590,12 +590,12 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IOException("Quiche connection was released");
throw new IOException("connection was released");
int written = LibQuiche.INSTANCE.quiche_conn_stream_send(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), last).intValue();
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return 0;
if (written < 0L)
throw new IOException("Quiche failed to write to stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(written));
throw new IOException("failed to write to stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(written));
buffer.position(buffer.position() + written);
return written;
}
@ -606,13 +606,13 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IOException("Quiche connection was released");
throw new IOException("connection was released");
bool_pointer fin = new bool_pointer();
int read = LibQuiche.INSTANCE.quiche_conn_stream_recv(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), fin).intValue();
if (read == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return isStreamFinished(streamId) ? -1 : 0;
if (read < 0L)
throw new IOException("Quiche failed to read from stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(read));
throw new IOException("failed to read from stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(read));
buffer.position(buffer.position() + read);
return read;
}
@ -623,7 +623,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
return LibQuiche.INSTANCE.quiche_conn_stream_finished(quicheConn, new uint64_t(streamId));
}
}
@ -633,7 +633,7 @@ public class QuicheConnection
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("Quiche connection was released");
throw new IllegalStateException("connection was released");
bool_pointer app = new bool_pointer();
uint64_t_pointer error = new uint64_t_pointer();
char_pointer reason = new char_pointer();

View File

@ -38,7 +38,7 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
public class ServerQuicConnector extends AbstractNetworkConnector
public class QuicServerConnector extends AbstractNetworkConnector
{
private final ServerDatagramSelectorManager _manager;
private final SslContextFactory.Server _sslContextFactory;
@ -46,12 +46,12 @@ public class ServerQuicConnector extends AbstractNetworkConnector
private volatile DatagramChannel _datagramChannel;
private volatile int _localPort = -1;
public ServerQuicConnector(Server server, SslContextFactory.Server sslContextFactory, ConnectionFactory... factories)
public QuicServerConnector(Server server, SslContextFactory.Server sslContextFactory, ConnectionFactory... factories)
{
this(server, null, null, null, sslContextFactory, factories);
}
public ServerQuicConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, SslContextFactory.Server sslContextFactory, ConnectionFactory... factories)
public QuicServerConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, SslContextFactory.Server sslContextFactory, ConnectionFactory... factories)
{
super(server, executor, scheduler, bufferPool, 0, factories);
_manager = new ServerDatagramSelectorManager(getExecutor(), getScheduler(), 1);
@ -97,7 +97,8 @@ public class ServerQuicConnector extends AbstractNetworkConnector
_quicheConfig.setPrivKeyPemPath(pemFiles[0].getPath());
_quicheConfig.setCertChainPemPath(pemFiles[1].getPath());
_quicheConfig.setVerifyPeer(false);
_quicheConfig.setMaxIdleTimeout(getIdleTimeout());
// Idle timeouts must not be managed by Quiche.
_quicheConfig.setMaxIdleTimeout(0L);
_quicheConfig.setInitialMaxData(10000000L);
_quicheConfig.setInitialMaxStreamDataBidiLocal(10000000L);
_quicheConfig.setInitialMaxStreamDataBidiRemote(10000000L);
@ -141,6 +142,13 @@ public class ServerQuicConnector extends AbstractNetworkConnector
return datagramChannel;
}
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
_manager.setIdleTimeout(idleTimeout);
}
@Override
protected void doStop() throws Exception
{
@ -193,7 +201,26 @@ public class ServerQuicConnector extends AbstractNetworkConnector
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{
return new ServerQuicConnection(ServerQuicConnector.this, endpoint, _quicheConfig);
return new ServerQuicConnection(QuicServerConnector.this, endpoint, _quicheConfig);
}
@Override
protected void endPointOpened(EndPoint endpoint)
{
super.endPointOpened(endpoint);
onEndPointOpened(endpoint);
}
@Override
protected void endPointClosed(EndPoint endpoint)
{
onEndPointClosed(endpoint);
super.endPointClosed(endpoint);
}
private void setIdleTimeout(long idleTimeout)
{
getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(idleTimeout));
}
}
}

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.quic.server;
import org.eclipse.jetty.quic.common.CloseInfo;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.slf4j.Logger;
@ -50,10 +49,10 @@ public class ServerProtocolSession extends ProtocolSession
}
@Override
protected void onClose(CloseInfo closeInfo)
protected void onClose(long error, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
LOG.debug("session closed remotely 0x{}/{} {}", Long.toHexString(error), reason, this);
// TODO: should probably reset the stream if it exists.
}
}

View File

@ -17,8 +17,10 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
@ -30,6 +32,7 @@ import org.eclipse.jetty.quic.server.internal.SimpleTokenValidator;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,12 +45,14 @@ public class ServerQuicConnection extends QuicConnection
private final QuicheConfig quicheConfig;
private final Connector connector;
private final SessionTimeouts sessionTimeouts;
protected ServerQuicConnection(Connector connector, EndPoint endPoint, QuicheConfig quicheConfig)
{
super(connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), endPoint);
this.quicheConfig = quicheConfig;
this.connector = connector;
this.sessionTimeouts = new SessionTimeouts(connector.getScheduler());
}
@Override
@ -91,4 +96,48 @@ public class ServerQuicConnection extends QuicConnection
return session;
}
}
public void schedule(ServerQuicSession session)
{
sessionTimeouts.schedule(session);
}
@Override
public boolean onIdleExpired()
{
// The current server architecture only has one listening
// DatagramChannelEndPoint, so we ignore idle timeouts.
return false;
}
@Override
public void outwardClose(QuicSession session, Throwable failure)
{
super.outwardClose(session, failure);
// Do nothing else, as the current architecture only has one
// listening DatagramChannelEndPoint, so it must not be closed.
}
private class SessionTimeouts extends CyclicTimeouts<ServerQuicSession>
{
private SessionTimeouts(Scheduler scheduler)
{
super(scheduler);
}
@Override
protected Iterator<ServerQuicSession> iterator()
{
return getQuicSessions().stream()
.map(ServerQuicSession.class::cast)
.iterator();
}
@Override
protected boolean onExpired(ServerQuicSession session)
{
session.onIdleTimeout();
return false;
}
}
}

View File

@ -13,12 +13,16 @@
package org.eclipse.jetty.quic.server;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicConnection;
@ -36,9 +40,10 @@ import org.eclipse.jetty.util.thread.Scheduler;
* retrieved from the server {@link Connector}, correspondent to the protocol
* negotiated with the client (or the default protocol).</p>
*/
public class ServerQuicSession extends QuicSession
public class ServerQuicSession extends QuicSession implements CyclicTimeouts.Expirable
{
private final Connector connector;
private long expireNanoTime;
protected ServerQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress, Connector connector)
{
@ -46,6 +51,12 @@ public class ServerQuicSession extends QuicSession
this.connector = connector;
}
@Override
public ServerQuicConnection getQuicConnection()
{
return (ServerQuicConnection)super.getQuicConnection();
}
@Override
protected ProtocolSession createProtocolSession()
{
@ -76,4 +87,39 @@ public class ServerQuicSession extends QuicSession
throw new RuntimeIOException("No configured connection factory can handle protocol '" + negotiatedProtocol + "'");
return connectionFactory;
}
@Override
public long getExpireNanoTime()
{
return expireNanoTime;
}
@Override
public void setIdleTimeout(long idleTimeout)
{
super.setIdleTimeout(idleTimeout);
notIdle();
getQuicConnection().schedule(this);
}
@Override
public void process(SocketAddress remoteAddress, ByteBuffer cipherBufferIn) throws IOException
{
notIdle();
super.process(remoteAddress, cipherBufferIn);
}
@Override
public void flush()
{
notIdle();
super.flush();
}
private void notIdle()
{
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0)
expireNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleTimeout);
}
}

View File

@ -45,7 +45,7 @@ public class ServerQuicConnectorTest
config.setHttpCompliance(HttpCompliance.LEGACY); // enable HTTP/0.9
HttpConnectionFactory connectionFactory = new HttpConnectionFactory(config);
ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, connectionFactory);
QuicServerConnector connector = new QuicServerConnector(server, sslContextFactory, connectionFactory);
connector.setPort(8443);
server.addConnector(connector);
@ -86,7 +86,7 @@ public class ServerQuicConnectorTest
config.setHttpCompliance(HttpCompliance.LEGACY); // enable HTTP/0.9
HttpConnectionFactory connectionFactory = new HttpConnectionFactory(config);
ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, connectionFactory);
QuicServerConnector connector = new QuicServerConnector(server, sslContextFactory, connectionFactory);
connector.setPort(8443);
server.addConnector(connector);