mirror of
https://github.com/jetty/jetty.project.git
synced 2025-02-28 02:49:11 +00:00
Issue #6728 - QUIC and HTTP/3
- Implemented GOAWAY parsing/generation. - Implemented handling of GOAWAY frames. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
f061cc41db
commit
3c2feabaf6
@ -72,7 +72,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
|
||||
|
||||
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
|
||||
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
|
||||
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
|
||||
this.controlFlusher = new ControlFlusher(session, controlEndPoint, true);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
|
||||
|
||||
@ -150,7 +150,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onClosed(CloseInfo closeInfo)
|
||||
protected void onClose(CloseInfo closeInfo)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("session closed remotely {} {}", closeInfo, this);
|
||||
@ -165,7 +165,13 @@ public class ClientHTTP3Session extends ClientProtocolSession
|
||||
connection.onOpen();
|
||||
}
|
||||
|
||||
void writeFrame(long streamId, Frame frame, Callback callback)
|
||||
void writeControlFrame(Frame frame, Callback callback)
|
||||
{
|
||||
controlFlusher.offer(frame, callback);
|
||||
controlFlusher.iterate();
|
||||
}
|
||||
|
||||
void writeMessageFrame(long streamId, Frame frame, Callback callback)
|
||||
{
|
||||
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
|
||||
messageFlusher.offer(endPoint, frame, callback);
|
||||
|
@ -19,15 +19,14 @@ import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.frames.Frame;
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.internal.ErrorCode;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Session;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Stream;
|
||||
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
|
||||
import org.eclipse.jetty.quic.common.StreamType;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -52,6 +51,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
super.onOpen();
|
||||
promise.succeeded(this);
|
||||
}
|
||||
|
||||
@ -65,9 +65,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("received response {}#{} on {}", frame, streamId, this);
|
||||
stream.processResponse(frame);
|
||||
if (frame.isLast())
|
||||
removeStream(stream);
|
||||
stream.onResponse(frame);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -78,30 +76,27 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
|
||||
@Override
|
||||
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener)
|
||||
{
|
||||
ClientHTTP3Session session = getProtocolSession();
|
||||
long streamId = session.getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
|
||||
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created request/response stream #{} on {}", streamId, endPoint);
|
||||
|
||||
Promise.Completable<Stream> promise = new Promise.Completable<>();
|
||||
HTTP3Stream stream = createStream(endPoint);
|
||||
stream.setListener(listener);
|
||||
|
||||
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () ->
|
||||
{
|
||||
if (listener == null)
|
||||
endPoint.shutdownInput(ErrorCode.NO_ERROR.code());
|
||||
promise.succeeded(stream);
|
||||
}, promise::failed);
|
||||
|
||||
session.writeFrame(streamId, frame, callback);
|
||||
return promise;
|
||||
long streamId = getProtocolSession().getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
|
||||
return newRequest(streamId, frame, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFrame(long streamId, Frame frame, Callback callback)
|
||||
public void writeControlFrame(Frame frame, Callback callback)
|
||||
{
|
||||
getProtocolSession().writeFrame(streamId, frame, callback);
|
||||
getProtocolSession().writeControlFrame(frame, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMessageFrame(long streamId, Frame frame, Callback callback)
|
||||
{
|
||||
getProtocolSession().writeMessageFrame(streamId, frame, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GoAwayFrame newGoAwayFrame(boolean graceful)
|
||||
{
|
||||
if (graceful)
|
||||
return GoAwayFrame.CLIENT_GRACEFUL;
|
||||
return super.newGoAwayFrame(graceful);
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.frames.SettingsFrame;
|
||||
|
||||
@ -51,14 +52,6 @@ public interface Session
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether this session is not open
|
||||
*/
|
||||
public default boolean isClosed()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a snapshot of all the streams currently belonging to this session
|
||||
*/
|
||||
@ -67,6 +60,17 @@ public interface Session
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Initiates the shutdown of this session by sending a GOAWAY frame to the other peer.</p>
|
||||
*
|
||||
* @param graceful whether the shutdown should be graceful
|
||||
* @return the {@link CompletableFuture} that gets notified when the frame has been sent
|
||||
*/
|
||||
public default CompletableFuture<Void> goAway(boolean graceful)
|
||||
{
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>The client-side HTTP/3 API representing a connection with a server.</p>
|
||||
* <p>Once a {@link Session} has been obtained, it can be used to make HTTP/3 requests:</p>
|
||||
@ -159,6 +163,25 @@ public interface Session
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when a GOAWAY frame has been received.</p>
|
||||
*
|
||||
* @param session the session
|
||||
* @param frame the GOAWAY frame received
|
||||
*/
|
||||
public default void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when a the underlying transport has been closed.</p>
|
||||
*
|
||||
* @param session the session
|
||||
*/
|
||||
public default void onTerminate(Session session)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when a request is received.</p>
|
||||
* <p>Applications should implement this method to process HTTP/3 requests,
|
||||
|
@ -236,11 +236,11 @@ public interface Stream
|
||||
* decode a HEADERS frame, or failures to read bytes because
|
||||
* the stream has been reset.</p>
|
||||
*
|
||||
* @param stream the stream
|
||||
* @param error the error code
|
||||
* @param failure a short description of the failure,
|
||||
* or {@code null} if no short description is available
|
||||
* @param failure the cause of the failure
|
||||
*/
|
||||
public default void onFailure(long error, Throwable failure)
|
||||
public default void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,44 @@
|
||||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http3.frames;
|
||||
|
||||
public class GoAwayFrame extends Frame
|
||||
{
|
||||
public static final GoAwayFrame CLIENT_GRACEFUL = new GoAwayFrame((1L << 62) - 1);
|
||||
public static final GoAwayFrame SERVER_GRACEFUL = new GoAwayFrame((1L << 62) - 4);
|
||||
|
||||
private final long lastId;
|
||||
|
||||
public GoAwayFrame(long lastId)
|
||||
{
|
||||
super(FrameType.GOAWAY);
|
||||
this.lastId = lastId;
|
||||
}
|
||||
|
||||
public long getLastId()
|
||||
{
|
||||
return lastId;
|
||||
}
|
||||
|
||||
public boolean isGraceful()
|
||||
{
|
||||
return lastId == CLIENT_GRACEFUL.lastId || lastId == SERVER_GRACEFUL.lastId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[lastId=%d,graceful=%b]", getClass().getSimpleName(), hashCode(), getLastId(), isGraceful());
|
||||
}
|
||||
}
|
@ -44,11 +44,11 @@ public class ControlFlusher extends IteratingCallback
|
||||
private List<Entry> entries;
|
||||
private InvocationType invocationType = InvocationType.NON_BLOCKING;
|
||||
|
||||
public ControlFlusher(QuicSession session, QuicStreamEndPoint endPoint)
|
||||
public ControlFlusher(QuicSession session, QuicStreamEndPoint endPoint, boolean useDirectByteBuffers)
|
||||
{
|
||||
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
|
||||
this.endPoint = endPoint;
|
||||
this.generator = new ControlGenerator();
|
||||
this.generator = new ControlGenerator(useDirectByteBuffers);
|
||||
}
|
||||
|
||||
public void offer(Frame frame, Callback callback)
|
||||
|
@ -13,27 +13,37 @@
|
||||
|
||||
package org.eclipse.jetty.http3.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.frames.DataFrame;
|
||||
import org.eclipse.jetty.http3.frames.Frame;
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.http3.internal.parser.ParserListener;
|
||||
import org.eclipse.jetty.io.CyclicTimeouts;
|
||||
import org.eclipse.jetty.quic.common.ProtocolSession;
|
||||
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
|
||||
import org.eclipse.jetty.quic.common.StreamType;
|
||||
import org.eclipse.jetty.util.Atomics;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -42,12 +52,19 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Session.class);
|
||||
|
||||
private final AutoLock lock = new AutoLock();
|
||||
private final AtomicLong lastId = new AtomicLong();
|
||||
private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>();
|
||||
private final ProtocolSession session;
|
||||
private final Listener listener;
|
||||
private final AtomicInteger remoteStreamCount = new AtomicInteger();
|
||||
private final StreamTimeouts streamTimeouts;
|
||||
private CloseState closeState = CloseState.CLOSED;
|
||||
private long streamIdleTimeout;
|
||||
private CloseState closeState = CloseState.CLOSED;
|
||||
private GoAwayFrame goAwaySent;
|
||||
private GoAwayFrame goAwayRecv;
|
||||
private Runnable zeroStreamsAction;
|
||||
private Callback.Completable shutdown;
|
||||
|
||||
public HTTP3Session(ProtocolSession session, Listener listener)
|
||||
{
|
||||
@ -83,20 +100,146 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
return getProtocolSession().getQuicSession().getRemoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed()
|
||||
{
|
||||
return closeState != CloseState.NOT_CLOSED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Stream> getStreams()
|
||||
{
|
||||
return List.copyOf(streams.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> goAway(boolean graceful)
|
||||
{
|
||||
return goAway(newGoAwayFrame(graceful));
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> goAway(GoAwayFrame frame)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("goaway with {} on {}", frame, this);
|
||||
|
||||
boolean sendGoAway = false;
|
||||
Callback.Completable callback = null;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
switch (closeState)
|
||||
{
|
||||
case NOT_CLOSED:
|
||||
{
|
||||
goAwaySent = frame;
|
||||
sendGoAway = true;
|
||||
closeState = CloseState.LOCALLY_CLOSED;
|
||||
if (frame.isGraceful())
|
||||
{
|
||||
// Send the non-graceful GOAWAY when the last stream is destroyed.
|
||||
zeroStreamsAction = () -> goAway(false);
|
||||
shutdown = callback = new Callback.Completable();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case LOCALLY_CLOSED:
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("already sent {} on {}", goAwaySent, this);
|
||||
}
|
||||
else
|
||||
{
|
||||
// SPEC: see section about connection shutdown.
|
||||
if (goAwaySent.isGraceful() || frame.getLastId() < goAwaySent.getLastId())
|
||||
{
|
||||
goAwaySent = frame;
|
||||
sendGoAway = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("already sent {} on {}", goAwaySent, this);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case REMOTELY_CLOSED:
|
||||
{
|
||||
goAwaySent = frame;
|
||||
sendGoAway = true;
|
||||
if (frame.isGraceful())
|
||||
{
|
||||
// Send the non-graceful GOAWAY when the last stream is destroyed.
|
||||
zeroStreamsAction = () -> goAway(false);
|
||||
shutdown = callback = new Callback.Completable();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (goAwayRecv.isGraceful())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("waiting non-graceful GOAWAY on {}", this);
|
||||
}
|
||||
else
|
||||
{
|
||||
closeState = CloseState.CLOSING;
|
||||
zeroStreamsAction = this::terminate;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CLOSING:
|
||||
case CLOSED:
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("already closed on {}", this);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (sendGoAway)
|
||||
{
|
||||
if (callback == null)
|
||||
{
|
||||
callback = new Callback.Completable();
|
||||
callback.thenRun(this::tryRunZeroStreamsAction);
|
||||
writeControlFrame(frame, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
Callback writeCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::tryRunZeroStreamsAction, callback::failed);
|
||||
writeControlFrame(frame, writeCallback);
|
||||
}
|
||||
return callback;
|
||||
}
|
||||
else
|
||||
{
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
}
|
||||
|
||||
protected GoAwayFrame newGoAwayFrame(boolean graceful)
|
||||
{
|
||||
return new GoAwayFrame(lastId.get());
|
||||
}
|
||||
|
||||
protected void updateLastId(long id)
|
||||
{
|
||||
Atomics.updateMax(lastId, id);
|
||||
}
|
||||
|
||||
public void close(long error, String reason)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
closeState = CloseState.CLOSED;
|
||||
zeroStreamsAction = null;
|
||||
// TODO: what about field shutdown?
|
||||
}
|
||||
failStreams(stream -> true, false);
|
||||
getProtocolSession().close(error, reason);
|
||||
}
|
||||
|
||||
@ -115,35 +258,86 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
streamTimeouts.schedule(stream);
|
||||
}
|
||||
|
||||
protected HTTP3Stream createStream(QuicStreamEndPoint endPoint)
|
||||
protected CompletableFuture<Stream> newRequest(long streamId, HeadersFrame frame, Stream.Listener listener)
|
||||
{
|
||||
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
|
||||
|
||||
Promise.Completable<Stream> promise = new Promise.Completable<>();
|
||||
promise.whenComplete((s, x) ->
|
||||
{
|
||||
if (x != null)
|
||||
endPoint.close(ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
|
||||
});
|
||||
HTTP3Stream stream = createStream(endPoint, promise::failed);
|
||||
if (stream == null)
|
||||
return promise;
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created request/response stream {}", stream);
|
||||
|
||||
stream.setListener(listener);
|
||||
|
||||
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () ->
|
||||
{
|
||||
if (listener == null)
|
||||
endPoint.shutdownInput(ErrorCode.NO_ERROR.code());
|
||||
promise.succeeded(stream);
|
||||
}, promise::failed);
|
||||
|
||||
writeMessageFrame(streamId, frame, callback);
|
||||
return promise;
|
||||
}
|
||||
|
||||
protected HTTP3Stream createStream(QuicStreamEndPoint endPoint, Consumer<Throwable> fail)
|
||||
{
|
||||
long streamId = endPoint.getStreamId();
|
||||
return streams.compute(streamId, (id, stream) ->
|
||||
{
|
||||
if (stream != null)
|
||||
throw new IllegalStateException("duplicate stream id " + streamId);
|
||||
return newHTTP3Stream(endPoint);
|
||||
return newHTTP3Stream(endPoint, fail, true);
|
||||
});
|
||||
}
|
||||
|
||||
protected HTTP3Stream getOrCreateStream(QuicStreamEndPoint endPoint)
|
||||
{
|
||||
return streams.computeIfAbsent(endPoint.getStreamId(), id -> newHTTP3Stream(endPoint));
|
||||
return streams.computeIfAbsent(endPoint.getStreamId(), id -> newHTTP3Stream(endPoint, null, false));
|
||||
}
|
||||
|
||||
private HTTP3Stream newHTTP3Stream(QuicStreamEndPoint endPoint)
|
||||
private HTTP3Stream newHTTP3Stream(QuicStreamEndPoint endPoint, Consumer<Throwable> fail, boolean local)
|
||||
{
|
||||
HTTP3Stream stream = new HTTP3Stream(this, endPoint);
|
||||
// Unidirectional streams must not idle timeout.
|
||||
if (StreamType.isBidirectional(stream.getId()))
|
||||
Throwable failure = null;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
if (closeState == CloseState.NOT_CLOSED)
|
||||
{
|
||||
if (!local)
|
||||
remoteStreamCount.incrementAndGet();
|
||||
}
|
||||
else
|
||||
{
|
||||
failure = new IllegalStateException("session_closed");
|
||||
}
|
||||
}
|
||||
|
||||
if (failure == null)
|
||||
{
|
||||
HTTP3Stream stream = new HTTP3Stream(this, endPoint, local);
|
||||
long idleTimeout = getStreamIdleTimeout();
|
||||
if (idleTimeout > 0)
|
||||
stream.setIdleTimeout(idleTimeout);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created {} on {}", stream, this);
|
||||
return stream;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("could not create stream for {} on {}", endPoint, this);
|
||||
if (fail != null)
|
||||
fail.accept(failure);
|
||||
return null;
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created {} on {}", stream, this);
|
||||
return stream;
|
||||
}
|
||||
|
||||
protected HTTP3Stream getStream(long streamId)
|
||||
@ -158,10 +352,18 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("destroyed {} on {}", stream, this);
|
||||
|
||||
if (!stream.isLocal())
|
||||
{
|
||||
if (remoteStreamCount.decrementAndGet() == 0)
|
||||
tryRunZeroStreamsAction();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void writeFrame(long streamId, Frame frame, Callback callback);
|
||||
public abstract void writeControlFrame(Frame frame, Callback callback);
|
||||
|
||||
public abstract void writeMessageFrame(long streamId, Frame frame, Callback callback);
|
||||
|
||||
public Map<Long, Long> onPreface()
|
||||
{
|
||||
@ -204,6 +406,18 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyGoAway(GoAwayFrame frame)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onGoAway(this, frame);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("failure notifying listener {}", listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHeaders(long streamId, HeadersFrame frame)
|
||||
{
|
||||
@ -218,8 +432,7 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("received trailer {}#{} on {}", frame, streamId, this);
|
||||
stream.processTrailer(frame);
|
||||
removeStream(stream);
|
||||
stream.onTrailer(frame);
|
||||
}
|
||||
}
|
||||
|
||||
@ -230,7 +443,7 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
LOG.debug("received {}#{} on {}", frame, streamId, this);
|
||||
HTTP3Stream stream = getStream(streamId);
|
||||
if (stream != null)
|
||||
stream.processData(frame);
|
||||
stream.onData(frame);
|
||||
else
|
||||
closeAndNotifyFailure(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
|
||||
}
|
||||
@ -240,7 +453,7 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("notifying data available for stream #{} on {}", streamId, this);
|
||||
HTTP3Stream stream = getStream(streamId);
|
||||
stream.processDataAvailable();
|
||||
stream.onDataAvailable();
|
||||
}
|
||||
|
||||
void closeAndNotifyFailure(long error, String reason)
|
||||
@ -249,6 +462,233 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
notifySessionFailure(error, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(GoAwayFrame frame)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("received {} on {}", frame, this);
|
||||
|
||||
boolean failStreams = false;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
switch (closeState)
|
||||
{
|
||||
case NOT_CLOSED:
|
||||
{
|
||||
goAwayRecv = frame;
|
||||
if (frame.isGraceful())
|
||||
{
|
||||
closeState = CloseState.REMOTELY_CLOSED;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("waiting non-graceful GOAWAY on {}", this);
|
||||
}
|
||||
else
|
||||
{
|
||||
goAwaySent = newGoAwayFrame(false);
|
||||
closeState = CloseState.CLOSING;
|
||||
GoAwayFrame goAwayFrame = goAwaySent;
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(this::terminate));
|
||||
failStreams = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case LOCALLY_CLOSED:
|
||||
{
|
||||
goAwayRecv = frame;
|
||||
if (frame.isGraceful())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("waiting non-graceful GOAWAY on {}", this);
|
||||
}
|
||||
else
|
||||
{
|
||||
closeState = CloseState.CLOSING;
|
||||
if (goAwaySent.isGraceful())
|
||||
{
|
||||
goAwaySent = newGoAwayFrame(false);
|
||||
GoAwayFrame goAwayFrame = goAwaySent;
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(this::terminate));
|
||||
}
|
||||
else
|
||||
{
|
||||
zeroStreamsAction = this::terminate;
|
||||
failStreams = true;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case REMOTELY_CLOSED:
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("already received {} on {}", goAwayRecv, this);
|
||||
}
|
||||
else
|
||||
{
|
||||
goAwayRecv = frame;
|
||||
closeState = CloseState.CLOSING;
|
||||
if (goAwaySent == null || goAwaySent.isGraceful())
|
||||
{
|
||||
goAwaySent = newGoAwayFrame(false);
|
||||
GoAwayFrame goAwayFrame = goAwaySent;
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(this::terminate));
|
||||
}
|
||||
else
|
||||
{
|
||||
zeroStreamsAction = this::terminate;
|
||||
}
|
||||
failStreams = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CLOSING:
|
||||
case CLOSED:
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("already closed on {}", this);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
notifyGoAway(frame);
|
||||
|
||||
if (failStreams)
|
||||
{
|
||||
// The other peer sent us a GOAWAY with the last processed streamId,
|
||||
// so we must fail the streams that have a bigger streamId.
|
||||
Predicate<HTTP3Stream> predicate = stream -> stream.isLocal() && stream.getId() > frame.getLastId();
|
||||
failStreams(predicate, true);
|
||||
}
|
||||
|
||||
tryRunZeroStreamsAction();
|
||||
}
|
||||
|
||||
private void failStreams(Predicate<HTTP3Stream> predicate, boolean close)
|
||||
{
|
||||
long error = ErrorCode.REQUEST_CANCELLED_ERROR.code();
|
||||
Throwable failure = new IOException("request_cancelled");
|
||||
streams.values().stream()
|
||||
.filter(predicate)
|
||||
.forEach(stream ->
|
||||
{
|
||||
if (close)
|
||||
stream.close(error, failure);
|
||||
// Since the stream failure was generated
|
||||
// by a GOAWAY, notify the application.
|
||||
stream.onFailure(error, failure);
|
||||
removeStream(stream);
|
||||
});
|
||||
}
|
||||
|
||||
private void terminate()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("terminating {}", this);
|
||||
streamTimeouts.destroy();
|
||||
close(ErrorCode.NO_ERROR.code(), "terminate");
|
||||
// Since the close() above is called by the
|
||||
// implementation, notify the application.
|
||||
notifyTerminate();
|
||||
}
|
||||
|
||||
private void tryRunZeroStreamsAction()
|
||||
{
|
||||
Runnable action = null;
|
||||
CompletableFuture<Void> completable;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
long count = remoteStreamCount.get();
|
||||
if (count > 0)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("deferring closing action, {} pending streams on {}", count, this);
|
||||
return;
|
||||
}
|
||||
|
||||
completable = shutdown;
|
||||
|
||||
switch (closeState)
|
||||
{
|
||||
case LOCALLY_CLOSED:
|
||||
{
|
||||
if (goAwaySent.isGraceful())
|
||||
{
|
||||
action = zeroStreamsAction;
|
||||
zeroStreamsAction = null;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case REMOTELY_CLOSED:
|
||||
{
|
||||
if (goAwaySent != null && goAwaySent.isGraceful())
|
||||
{
|
||||
action = zeroStreamsAction;
|
||||
zeroStreamsAction = null;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CLOSING:
|
||||
{
|
||||
closeState = CloseState.CLOSED;
|
||||
action = zeroStreamsAction;
|
||||
zeroStreamsAction = null;
|
||||
break;
|
||||
}
|
||||
case NOT_CLOSED:
|
||||
case CLOSED:
|
||||
{
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (action != null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("executing zero streams action on {}", this);
|
||||
action.run();
|
||||
}
|
||||
|
||||
if (completable != null)
|
||||
completable.complete(null);
|
||||
}
|
||||
|
||||
public void onClose(int error, String reason)
|
||||
{
|
||||
// A close at the QUIC level does not allow
|
||||
// any data to be sent, just update the state.
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
closeState = CloseState.CLOSED;
|
||||
zeroStreamsAction = null;
|
||||
// TODO: what about field shutdown?
|
||||
}
|
||||
failStreams(stream -> true, false);
|
||||
notifyTerminate();
|
||||
}
|
||||
|
||||
private void notifyTerminate()
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onTerminate(this);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("failure notifying listener {}", listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStreamFailure(long streamId, long error, Throwable failure)
|
||||
{
|
||||
@ -257,7 +697,7 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
HTTP3Stream stream = getStream(streamId);
|
||||
if (stream != null)
|
||||
{
|
||||
stream.processFailure(error, failure);
|
||||
stream.onFailure(error, failure);
|
||||
removeStream(stream);
|
||||
}
|
||||
}
|
||||
@ -280,15 +720,20 @@ public abstract class HTTP3Session implements Session, ParserListener
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isClosed()
|
||||
{
|
||||
return closeState == CloseState.CLOSED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
|
||||
return String.format("%s@%x[streams=%d,%s]", getClass().getSimpleName(), hashCode(), remoteStreamCount.get(), closeState);
|
||||
}
|
||||
|
||||
private enum CloseState
|
||||
{
|
||||
NOT_CLOSED, CLOSED
|
||||
NOT_CLOSED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSING, CLOSED
|
||||
}
|
||||
|
||||
private class StreamTimeouts extends CyclicTimeouts<HTTP3Stream>
|
||||
|
@ -37,15 +37,18 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
|
||||
private final HTTP3Session session;
|
||||
private final QuicStreamEndPoint endPoint;
|
||||
private final boolean local;
|
||||
private CloseState closeState = CloseState.NOT_CLOSED;
|
||||
private Listener listener;
|
||||
private FrameState frameState = FrameState.INITIAL;
|
||||
private long idleTimeout;
|
||||
private long expireNanoTime;
|
||||
|
||||
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint)
|
||||
public HTTP3Stream(HTTP3Session session, QuicStreamEndPoint endPoint, boolean local)
|
||||
{
|
||||
this.session = session;
|
||||
this.endPoint = endPoint;
|
||||
this.local = local;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -60,6 +63,11 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
return session;
|
||||
}
|
||||
|
||||
public boolean isLocal()
|
||||
{
|
||||
return local;
|
||||
}
|
||||
|
||||
public Listener getListener()
|
||||
{
|
||||
return listener;
|
||||
@ -110,13 +118,17 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
@Override
|
||||
public CompletableFuture<Stream> respond(HeadersFrame frame)
|
||||
{
|
||||
return writeFrame(frame);
|
||||
Promise.Completable<Stream> completable = writeFrame(frame);
|
||||
updateClose(frame.isLast(), true);
|
||||
return completable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Stream> data(DataFrame frame)
|
||||
{
|
||||
return writeFrame(frame);
|
||||
Promise.Completable<Stream> completable = writeFrame(frame);
|
||||
updateClose(frame.isLast(), true);
|
||||
return completable;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -125,11 +137,13 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
try
|
||||
{
|
||||
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
|
||||
return connection.readData();
|
||||
Data data = connection.readData();
|
||||
updateClose(data.isLast(), false);
|
||||
return data;
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
session.removeStream(this);
|
||||
updateClose(true, false);
|
||||
throw x;
|
||||
}
|
||||
}
|
||||
@ -155,7 +169,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
return connection.hasDemand();
|
||||
}
|
||||
|
||||
public void processRequest(HeadersFrame frame)
|
||||
public void onRequest(HeadersFrame frame)
|
||||
{
|
||||
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
|
||||
{
|
||||
@ -165,8 +179,9 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
if (listener == null)
|
||||
{
|
||||
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> endPoint.shutdownInput(ErrorCode.NO_ERROR.code()));
|
||||
session.writeFrame(getId(), new HTTP3Flusher.FlushFrame(), callback);
|
||||
session.writeMessageFrame(getId(), new HTTP3Flusher.FlushFrame(), callback);
|
||||
}
|
||||
updateClose(frame.isLast(), false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -184,12 +199,13 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
}
|
||||
}
|
||||
|
||||
public void processResponse(HeadersFrame frame)
|
||||
public void onResponse(HeadersFrame frame)
|
||||
{
|
||||
if (validateAndUpdate(EnumSet.of(FrameState.INITIAL), FrameState.HEADER))
|
||||
{
|
||||
notIdle();
|
||||
notifyResponse(frame);
|
||||
updateClose(frame.isLast(), false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -207,13 +223,13 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
}
|
||||
}
|
||||
|
||||
public void processData(DataFrame frame)
|
||||
public void onData(DataFrame frame)
|
||||
{
|
||||
if (validateAndUpdate(EnumSet.of(FrameState.HEADER, FrameState.DATA), FrameState.DATA))
|
||||
notIdle();
|
||||
}
|
||||
|
||||
public void processDataAvailable()
|
||||
public void onDataAvailable()
|
||||
{
|
||||
notifyDataAvailable();
|
||||
}
|
||||
@ -232,12 +248,13 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
}
|
||||
}
|
||||
|
||||
public void processTrailer(HeadersFrame frame)
|
||||
public void onTrailer(HeadersFrame frame)
|
||||
{
|
||||
if (validateAndUpdate(EnumSet.of(FrameState.HEADER, FrameState.DATA), FrameState.TRAILER))
|
||||
{
|
||||
notIdle();
|
||||
notifyTrailer(frame);
|
||||
updateClose(frame.isLast(), false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -271,7 +288,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
}
|
||||
}
|
||||
|
||||
public void processFailure(long error, Throwable failure)
|
||||
public void onFailure(long error, Throwable failure)
|
||||
{
|
||||
notifyFailure(error, failure);
|
||||
}
|
||||
@ -282,7 +299,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
try
|
||||
{
|
||||
if (listener != null)
|
||||
listener.onFailure(error, failure);
|
||||
listener.onFailure(this, error, failure);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
@ -313,10 +330,58 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
{
|
||||
notIdle();
|
||||
Promise.Completable<Stream> completable = new Promise.Completable<>();
|
||||
session.writeFrame(endPoint.getStreamId(), frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed));
|
||||
session.writeMessageFrame(endPoint.getStreamId(), frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed));
|
||||
return completable;
|
||||
}
|
||||
|
||||
private void updateClose(boolean update, boolean local)
|
||||
{
|
||||
if (update)
|
||||
{
|
||||
switch (closeState)
|
||||
{
|
||||
case NOT_CLOSED:
|
||||
{
|
||||
closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
|
||||
break;
|
||||
}
|
||||
case LOCALLY_CLOSED:
|
||||
{
|
||||
if (!local)
|
||||
{
|
||||
closeState = CloseState.CLOSED;
|
||||
session.removeStream(this);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case REMOTELY_CLOSED:
|
||||
{
|
||||
if (local)
|
||||
{
|
||||
closeState = CloseState.CLOSED;
|
||||
session.removeStream(this);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case CLOSED:
|
||||
{
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void close(long error, Throwable failure)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("closing {} with error 0x{} {}", this, Long.toHexString(error), failure.toString());
|
||||
endPoint.close(error, failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
@ -333,4 +398,9 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
|
||||
{
|
||||
INITIAL, HEADER, DATA, TRAILER, FAILED
|
||||
}
|
||||
|
||||
private enum CloseState
|
||||
{
|
||||
NOT_CLOSED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
|
||||
}
|
||||
}
|
||||
|
@ -36,15 +36,15 @@ public class VarLenInt
|
||||
|
||||
public boolean parseInt(ByteBuffer buffer, IntConsumer consumer)
|
||||
{
|
||||
return parseInt(buffer, consumer, null);
|
||||
return parse(buffer, consumer, null);
|
||||
}
|
||||
|
||||
public boolean parseLong(ByteBuffer buffer, LongConsumer consumer)
|
||||
{
|
||||
return parseInt(buffer, null, consumer);
|
||||
return parse(buffer, null, consumer);
|
||||
}
|
||||
|
||||
private boolean parseInt(ByteBuffer buffer, IntConsumer intConsumer, LongConsumer longConsumer)
|
||||
private boolean parse(ByteBuffer buffer, IntConsumer intConsumer, LongConsumer longConsumer)
|
||||
{
|
||||
while (buffer.hasRemaining())
|
||||
{
|
||||
|
@ -21,11 +21,11 @@ public class ControlGenerator
|
||||
{
|
||||
private final FrameGenerator[] generators = new FrameGenerator[FrameType.maxType() + 1];
|
||||
|
||||
public ControlGenerator()
|
||||
public ControlGenerator(boolean useDirectByteBuffers)
|
||||
{
|
||||
generators[FrameType.CANCEL_PUSH.type()] = new CancelPushGenerator();
|
||||
generators[FrameType.SETTINGS.type()] = new SettingsGenerator();
|
||||
generators[FrameType.GOAWAY.type()] = new GoAwayGenerator();
|
||||
generators[FrameType.SETTINGS.type()] = new SettingsGenerator(useDirectByteBuffers);
|
||||
generators[FrameType.GOAWAY.type()] = new GoAwayGenerator(useDirectByteBuffers);
|
||||
generators[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdGenerator();
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,13 @@ import org.eclipse.jetty.io.ByteBufferPool;
|
||||
|
||||
public class DataGenerator extends FrameGenerator
|
||||
{
|
||||
private final boolean useDirectByteBuffers;
|
||||
|
||||
public DataGenerator(boolean useDirectByteBuffers)
|
||||
{
|
||||
this.useDirectByteBuffers = useDirectByteBuffers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
|
||||
{
|
||||
@ -35,11 +42,11 @@ public class DataGenerator extends FrameGenerator
|
||||
ByteBuffer data = frame.getByteBuffer();
|
||||
int dataLength = data.remaining();
|
||||
int headerLength = VarLenInt.length(FrameType.DATA.type()) + VarLenInt.length(dataLength);
|
||||
ByteBuffer header = ByteBuffer.allocate(headerLength);
|
||||
ByteBuffer header = lease.acquire(headerLength, useDirectByteBuffers);
|
||||
VarLenInt.generate(header, FrameType.DATA.type());
|
||||
VarLenInt.generate(header, dataLength);
|
||||
header.flip();
|
||||
lease.append(header, false);
|
||||
lease.append(header, true);
|
||||
lease.append(data, false);
|
||||
return headerLength + dataLength;
|
||||
}
|
||||
|
@ -13,14 +13,41 @@
|
||||
|
||||
package org.eclipse.jetty.http3.internal.generator;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.http3.frames.Frame;
|
||||
import org.eclipse.jetty.http3.frames.FrameType;
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.internal.VarLenInt;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
|
||||
public class GoAwayGenerator extends FrameGenerator
|
||||
{
|
||||
private final boolean useDirectByteBuffers;
|
||||
|
||||
public GoAwayGenerator(boolean useDirectByteBuffers)
|
||||
{
|
||||
this.useDirectByteBuffers = useDirectByteBuffers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
|
||||
{
|
||||
return 0;
|
||||
GoAwayFrame goAwayFrame = (GoAwayFrame)frame;
|
||||
return generateGoAwayFrame(lease, goAwayFrame);
|
||||
}
|
||||
|
||||
private int generateGoAwayFrame(ByteBufferPool.Lease lease, GoAwayFrame frame)
|
||||
{
|
||||
long lastId = frame.getLastId();
|
||||
int lastIdLength = VarLenInt.length(lastId);
|
||||
int length = VarLenInt.length(FrameType.GOAWAY.type()) + VarLenInt.length(lastIdLength) + lastIdLength;
|
||||
ByteBuffer buffer = lease.acquire(length, useDirectByteBuffers);
|
||||
VarLenInt.generate(buffer, FrameType.GOAWAY.type());
|
||||
VarLenInt.generate(buffer, lastIdLength);
|
||||
VarLenInt.generate(buffer, lastId);
|
||||
buffer.flip();
|
||||
lease.append(buffer, true);
|
||||
return length;
|
||||
}
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ public class MessageGenerator
|
||||
|
||||
public MessageGenerator(QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
|
||||
{
|
||||
generators[FrameType.DATA.type()] = new DataGenerator();
|
||||
generators[FrameType.DATA.type()] = new DataGenerator(useDirectByteBuffers);
|
||||
generators[FrameType.HEADERS.type()] = new HeadersGenerator(encoder, maxHeadersLength, useDirectByteBuffers);
|
||||
generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator();
|
||||
}
|
||||
|
@ -24,6 +24,13 @@ import org.eclipse.jetty.util.BufferUtil;
|
||||
|
||||
public class SettingsGenerator extends FrameGenerator
|
||||
{
|
||||
private final boolean useDirectByteBuffers;
|
||||
|
||||
public SettingsGenerator(boolean useDirectByteBuffers)
|
||||
{
|
||||
this.useDirectByteBuffers = useDirectByteBuffers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
|
||||
{
|
||||
@ -40,8 +47,7 @@ public class SettingsGenerator extends FrameGenerator
|
||||
length += VarLenInt.length(e.getKey()) + VarLenInt.length(e.getValue());
|
||||
}
|
||||
int capacity = VarLenInt.length(frame.getFrameType().type()) + VarLenInt.length(length) + length;
|
||||
// TODO: configure buffer directness.
|
||||
ByteBuffer buffer = lease.acquire(capacity, true);
|
||||
ByteBuffer buffer = lease.acquire(capacity, useDirectByteBuffers);
|
||||
VarLenInt.generate(buffer, frame.getFrameType().type());
|
||||
VarLenInt.generate(buffer, length);
|
||||
for (Map.Entry<Long, Long> e : settings.entrySet())
|
||||
|
@ -15,6 +15,7 @@ package org.eclipse.jetty.http3.internal.parser;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.http3.internal.ErrorCode;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
@ -109,6 +110,18 @@ public abstract class BodyParser
|
||||
}
|
||||
}
|
||||
|
||||
protected void notifyGoAway(GoAwayFrame frame)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onGoAway(frame);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("failure while notifying listener {}", listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
public enum Result
|
||||
{
|
||||
NO_FRAME, FRAGMENT_FRAME, WHOLE_FRAME
|
||||
|
@ -15,8 +15,13 @@ package org.eclipse.jetty.http3.internal.parser;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.internal.VarLenInt;
|
||||
|
||||
public class GoAwayBodyParser extends BodyParser
|
||||
{
|
||||
private final VarLenInt varLenInt = new VarLenInt();
|
||||
|
||||
public GoAwayBodyParser(HeaderParser headerParser, ParserListener listener)
|
||||
{
|
||||
super(headerParser, listener);
|
||||
@ -25,6 +30,14 @@ public class GoAwayBodyParser extends BodyParser
|
||||
@Override
|
||||
public Result parse(ByteBuffer buffer)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
if (varLenInt.parseLong(buffer, this::onGoAway))
|
||||
return Result.WHOLE_FRAME;
|
||||
return Result.NO_FRAME;
|
||||
}
|
||||
|
||||
private void onGoAway(long id)
|
||||
{
|
||||
GoAwayFrame frame = new GoAwayFrame(id);
|
||||
notifyGoAway(frame);
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,7 @@
|
||||
package org.eclipse.jetty.http3.internal.parser;
|
||||
|
||||
import org.eclipse.jetty.http3.frames.DataFrame;
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.frames.SettingsFrame;
|
||||
|
||||
@ -31,6 +32,10 @@ public interface ParserListener
|
||||
{
|
||||
}
|
||||
|
||||
public default void onGoAway(GoAwayFrame frame)
|
||||
{
|
||||
}
|
||||
|
||||
public default void onStreamFailure(long streamId, long error, Throwable failure)
|
||||
{
|
||||
}
|
||||
|
@ -0,0 +1,61 @@
|
||||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http3.internal;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.internal.generator.ControlGenerator;
|
||||
import org.eclipse.jetty.http3.internal.parser.ControlParser;
|
||||
import org.eclipse.jetty.http3.internal.parser.ParserListener;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.NullByteBufferPool;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
||||
public class GoAwayGenerateParseTest
|
||||
{
|
||||
@Test
|
||||
public void testGenerateParse()
|
||||
{
|
||||
GoAwayFrame input = GoAwayFrame.CLIENT_GRACEFUL;
|
||||
|
||||
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
|
||||
new ControlGenerator(true).generate(lease, 0, input);
|
||||
|
||||
List<GoAwayFrame> frames = new ArrayList<>();
|
||||
ControlParser parser = new ControlParser(new ParserListener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(GoAwayFrame frame)
|
||||
{
|
||||
frames.add(frame);
|
||||
}
|
||||
});
|
||||
for (ByteBuffer buffer : lease.getByteBuffers())
|
||||
{
|
||||
parser.parse(buffer);
|
||||
assertFalse(buffer.hasRemaining());
|
||||
}
|
||||
|
||||
assertEquals(1, frames.size());
|
||||
GoAwayFrame output = frames.get(0);
|
||||
|
||||
assertEquals(input.getLastId(), output.getLastId());
|
||||
}
|
||||
}
|
@ -48,7 +48,7 @@ public class SettingsGenerateParseTest
|
||||
SettingsFrame input = new SettingsFrame(settings);
|
||||
|
||||
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
|
||||
new ControlGenerator().generate(lease, 0, input);
|
||||
new ControlGenerator(true).generate(lease, 0, input);
|
||||
|
||||
List<SettingsFrame> frames = new ArrayList<>();
|
||||
ControlParser parser = new ControlParser(new ParserListener()
|
||||
|
@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.server.internal;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.frames.Frame;
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Session;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Stream;
|
||||
@ -62,9 +63,8 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("received request {}#{} on {}", frame, streamId, this);
|
||||
stream.processRequest(frame);
|
||||
if (frame.isLast())
|
||||
removeStream(stream);
|
||||
updateLastId(streamId);
|
||||
stream.onRequest(frame);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -73,9 +73,23 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFrame(long streamId, Frame frame, Callback callback)
|
||||
public void writeControlFrame(Frame frame, Callback callback)
|
||||
{
|
||||
getProtocolSession().writeFrame(streamId, frame, callback);
|
||||
getProtocolSession().writeControlFrame(frame, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMessageFrame(long streamId, Frame frame, Callback callback)
|
||||
{
|
||||
getProtocolSession().writeMessageFrame(streamId, frame, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GoAwayFrame newGoAwayFrame(boolean graceful)
|
||||
{
|
||||
if (graceful)
|
||||
return GoAwayFrame.SERVER_GRACEFUL;
|
||||
return super.newGoAwayFrame(graceful);
|
||||
}
|
||||
|
||||
private void notifyAccept()
|
||||
|
@ -71,7 +71,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
|
||||
|
||||
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
|
||||
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
|
||||
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
|
||||
this.controlFlusher = new ControlFlusher(session, controlEndPoint, true);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
|
||||
|
||||
@ -149,11 +149,11 @@ public class ServerHTTP3Session extends ServerProtocolSession
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onClosed(CloseInfo closeInfo)
|
||||
protected void onClose(CloseInfo closeInfo)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("session closed remotely {} {}", closeInfo, this);
|
||||
notifySessionFailure(closeInfo);
|
||||
session.onClose(closeInfo.error(), closeInfo.reason());
|
||||
}
|
||||
|
||||
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
|
||||
@ -164,7 +164,13 @@ public class ServerHTTP3Session extends ServerProtocolSession
|
||||
connection.onOpen();
|
||||
}
|
||||
|
||||
void writeFrame(long streamId, Frame frame, Callback callback)
|
||||
void writeControlFrame(Frame frame, Callback callback)
|
||||
{
|
||||
controlFlusher.offer(frame, callback);
|
||||
controlFlusher.iterate();
|
||||
}
|
||||
|
||||
void writeMessageFrame(long streamId, Frame frame, Callback callback)
|
||||
{
|
||||
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
|
||||
messageFlusher.offer(endPoint, frame, callback);
|
||||
@ -175,17 +181,4 @@ public class ServerHTTP3Session extends ServerProtocolSession
|
||||
{
|
||||
session.onDataAvailable(streamId);
|
||||
}
|
||||
|
||||
private void notifySessionFailure(CloseInfo closeInfo)
|
||||
{
|
||||
Session.Listener listener = session.getListener();
|
||||
try
|
||||
{
|
||||
listener.onSessionFailure(session, closeInfo.error(), closeInfo.reason());
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("failure notifying listener {}", listener, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,14 @@
|
||||
|
||||
package org.eclipse.jetty.http3.tests;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.client.HTTP3Client;
|
||||
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
|
||||
@ -25,7 +33,7 @@ import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
public class AbstractHTTP3ClientServerTest
|
||||
public class AbstractClientServerTest
|
||||
{
|
||||
@RegisterExtension
|
||||
final BeforeTestExecutionCallback printMethodName = context ->
|
||||
@ -34,6 +42,12 @@ public class AbstractHTTP3ClientServerTest
|
||||
protected HTTP3Client client;
|
||||
protected Server server;
|
||||
|
||||
protected void start(Session.Server.Listener listener) throws Exception
|
||||
{
|
||||
startServer(listener);
|
||||
startClient();
|
||||
}
|
||||
|
||||
protected void startServer(Session.Server.Listener listener) throws Exception
|
||||
{
|
||||
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
|
||||
@ -53,6 +67,28 @@ public class AbstractHTTP3ClientServerTest
|
||||
client.start();
|
||||
}
|
||||
|
||||
protected Session.Client newSession(Session.Client.Listener listener) throws Exception
|
||||
{
|
||||
InetSocketAddress address = new InetSocketAddress("localhost", connector.getLocalPort());
|
||||
return client.connect(address, listener).get(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
protected MetaData.Request newRequest(String path)
|
||||
{
|
||||
return newRequest(HttpMethod.GET, path);
|
||||
}
|
||||
|
||||
protected MetaData.Request newRequest(HttpMethod method, String path)
|
||||
{
|
||||
return newRequest(method, path, HttpFields.EMPTY);
|
||||
}
|
||||
|
||||
protected MetaData.Request newRequest(HttpMethod method, String path, HttpFields fields)
|
||||
{
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + (path == null ? "/" : path));
|
||||
return new MetaData.Request(method.asString(), uri, HttpVersion.HTTP_3, fields);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void dispose()
|
||||
{
|
@ -13,7 +13,6 @@
|
||||
|
||||
package org.eclipse.jetty.http3.tests;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
@ -24,7 +23,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
@ -40,14 +38,14 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
||||
public class ClientServerTest extends AbstractClientServerTest
|
||||
{
|
||||
@Test
|
||||
public void testConnectTriggersSettingsFrame() throws Exception
|
||||
{
|
||||
CountDownLatch serverPrefaceLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverSettingsLatch = new CountDownLatch(1);
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Map<Long, Long> onPreface(Session session)
|
||||
@ -62,26 +60,24 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
||||
serverSettingsLatch.countDown();
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
CountDownLatch clientPrefaceLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientSettingsLatch = new CountDownLatch(1);
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener()
|
||||
Session.Client session = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public Map<Long, Long> onPreface(Session session)
|
||||
{
|
||||
@Override
|
||||
public Map<Long, Long> onPreface(Session session)
|
||||
{
|
||||
clientPrefaceLatch.countDown();
|
||||
return Map.of();
|
||||
}
|
||||
clientPrefaceLatch.countDown();
|
||||
return Map.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
clientSettingsLatch.countDown();
|
||||
}
|
||||
})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
clientSettingsLatch.countDown();
|
||||
}
|
||||
});
|
||||
assertNotNull(session);
|
||||
|
||||
assertTrue(serverSettingsLatch.await(5, TimeUnit.SECONDS));
|
||||
@ -92,7 +88,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
||||
public void testGETThenResponseWithoutContent() throws Exception
|
||||
{
|
||||
CountDownLatch serverRequestLatch = new CountDownLatch(1);
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -104,15 +100,11 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
||||
return null;
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
CountDownLatch clientResponseLatch = new CountDownLatch(1);
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame frame = new HeadersFrame(metaData, true);
|
||||
HeadersFrame frame = new HeadersFrame(newRequest("/"), true);
|
||||
Stream stream = session.newRequest(frame, new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
@ -132,7 +124,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
||||
public void testDiscardRequestContent() throws Exception
|
||||
{
|
||||
AtomicReference<CountDownLatch> serverLatch = new AtomicReference<>(new CountDownLatch(1));
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -163,15 +155,11 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
AtomicReference<CountDownLatch> clientLatch = new AtomicReference<>(new CountDownLatch(1));
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.POST.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame frame = new HeadersFrame(metaData, false);
|
||||
HeadersFrame frame = new HeadersFrame(newRequest(HttpMethod.POST, "/"), false);
|
||||
Stream.Listener streamListener = new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
@ -204,7 +192,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
||||
@ValueSource(ints = {1024, 10 * 1024, 100 * 1024, 1000 * 1024})
|
||||
public void testEchoRequestContentAsResponseContent(int length) throws Exception
|
||||
{
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -234,15 +222,11 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
CountDownLatch clientResponseLatch = new CountDownLatch(1);
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame frame = new HeadersFrame(metaData, false);
|
||||
HeadersFrame frame = new HeadersFrame(newRequest("/"), false);
|
||||
byte[] bytesSent = new byte[length];
|
||||
new Random().nextBytes(bytesSent);
|
||||
byte[] bytesReceived = new byte[bytesSent.length];
|
@ -13,7 +13,6 @@
|
||||
|
||||
package org.eclipse.jetty.http3.tests;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -26,8 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
@ -46,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
public class DataDemandTest extends AbstractClientServerTest
|
||||
{
|
||||
@Test
|
||||
public void testOnDataAvailableThenExit() throws Exception
|
||||
@ -55,7 +52,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
CountDownLatch serverStreamLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverDataLatch = new CountDownLatch(1);
|
||||
AtomicLong onDataAvailableCalls = new AtomicLong();
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -85,14 +82,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame request = new HeadersFrame(metaData, false);
|
||||
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
|
||||
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
stream.data(new DataFrame(ByteBuffer.allocate(8192), true));
|
||||
|
||||
@ -114,7 +107,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
CountDownLatch serverStreamLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverDataLatch = new CountDownLatch(1);
|
||||
AtomicLong onDataAvailableCalls = new AtomicLong();
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -146,14 +139,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame request = new HeadersFrame(metaData, false);
|
||||
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
|
||||
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
stream.data(new DataFrame(ByteBuffer.allocate(16), false));
|
||||
|
||||
@ -180,7 +169,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
CountDownLatch serverStreamLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverDataLatch = new CountDownLatch(1);
|
||||
AtomicLong onDataAvailableCalls = new AtomicLong();
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -218,14 +207,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame request = new HeadersFrame(metaData, false);
|
||||
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
|
||||
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
stream.data(new DataFrame(ByteBuffer.allocate(16), false));
|
||||
|
||||
@ -249,7 +234,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
CountDownLatch serverDataLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTrailerLatch = new CountDownLatch(1);
|
||||
AtomicLong onDataAvailableCalls = new AtomicLong();
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -277,14 +262,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame request = new HeadersFrame(metaData, false);
|
||||
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
|
||||
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
stream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true)).get(5, TimeUnit.SECONDS);
|
||||
|
||||
@ -305,7 +286,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
CountDownLatch serverDataLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTrailerLatch = new CountDownLatch(1);
|
||||
AtomicLong onDataAvailableCalls = new AtomicLong();
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -334,14 +315,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame request = new HeadersFrame(metaData, false);
|
||||
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
|
||||
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
stream.data(new DataFrame(ByteBuffer.allocate(dataLength), false));
|
||||
@ -363,7 +340,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
{
|
||||
CountDownLatch serverDataLatch = new CountDownLatch(1);
|
||||
List<Stream.Data> datas = new ArrayList<>();
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -391,14 +368,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame request = new HeadersFrame(metaData, false);
|
||||
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
|
||||
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
byte[] bytesSent = new byte[16384];
|
||||
@ -421,7 +394,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
CountDownLatch serverRequestLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverDataLatch = new CountDownLatch(1);
|
||||
AtomicLong onDataAvailableCalls = new AtomicLong();
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -443,14 +416,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame request = new HeadersFrame(metaData, false);
|
||||
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
|
||||
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
stream.data(new DataFrame(ByteBuffer.allocate(4096), true));
|
||||
@ -472,7 +441,7 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
{
|
||||
CountDownLatch blockLatch = new CountDownLatch(1);
|
||||
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
@ -525,14 +494,10 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
|
||||
};
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client session = newSession(new Session.Client.Listener() {});
|
||||
|
||||
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
HeadersFrame request = new HeadersFrame(metaData, false);
|
||||
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
|
||||
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
|
||||
|
||||
// Send a first chunk of data.
|
@ -0,0 +1,984 @@
|
||||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http3.tests;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.frames.DataFrame;
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http3.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.http3.internal.ErrorCode;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Session;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class GoAwayTest extends AbstractClientServerTest
|
||||
{
|
||||
@Test
|
||||
public void testClientGoAwayServerReplies() throws Exception
|
||||
{
|
||||
CountDownLatch serverLatch = new CountDownLatch(1);
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverSessionRef.set(stream.getSession());
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
|
||||
stream.respond(new HeadersFrame(response, true));
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientLatch.countDown();
|
||||
}
|
||||
});
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
|
||||
clientSession.goAway(false);
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerGoAwayWithInFlightStreamClientFailsStream() throws Exception
|
||||
{
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverSessionRef.set(stream.getSession());
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
|
||||
stream.respond(new HeadersFrame(response, true));
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch streamFailureLatch = new CountDownLatch(1);
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
// Simulate the server sending a GOAWAY while the client sends a second request.
|
||||
// The server sends a lastStreamId for the first request, and discards the second.
|
||||
serverSessionRef.get().goAway(false);
|
||||
// The client sends the second request and should eventually fail it
|
||||
// locally since it has a larger streamId, and the server discarded it.
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
streamFailureLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerGracefulGoAway() throws Exception
|
||||
{
|
||||
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverSessionRef.set(stream.getSession());
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
|
||||
stream.respond(new HeadersFrame(response, true));
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
clientGracefulGoAwayLatch.countDown();
|
||||
else
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
|
||||
clientLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Send a graceful GOAWAY from the server.
|
||||
// Because the server had no pending streams, it will send also a non-graceful GOAWAY.
|
||||
serverSessionRef.get().goAway(true);
|
||||
|
||||
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerGracefulGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception
|
||||
{
|
||||
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverStreamRef.set(stream);
|
||||
Session session = stream.getSession();
|
||||
serverSessionRef.set(session);
|
||||
|
||||
// Send a graceful GOAWAY while processing a stream.
|
||||
session.goAway(true);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
clientGracefulGoAwayLatch.countDown();
|
||||
else
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
|
||||
clientLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for the graceful GOAWAY.
|
||||
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Now the client cannot create new streams.
|
||||
CompletableFuture<Stream> streamCompletable = clientSession.newRequest(new HeadersFrame(newRequest("/"), true), null);
|
||||
assertThrows(ExecutionException.class, () -> streamCompletable.get(5, TimeUnit.SECONDS));
|
||||
|
||||
// The client must not reply to a graceful GOAWAY.
|
||||
assertFalse(serverGoAwayLatch.await(1, TimeUnit.SECONDS));
|
||||
|
||||
// Previous streams must complete successfully.
|
||||
Stream serverStream = serverStreamRef.get();
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
|
||||
serverStream.respond(new HeadersFrame(response, true));
|
||||
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// The server should have sent the GOAWAY after the last stream completed.
|
||||
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception
|
||||
{
|
||||
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
|
||||
CountDownLatch serverStreamLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverStreamRef.set(stream);
|
||||
serverStreamLatch.countDown();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
|
||||
clientLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// The client sends a GOAWAY.
|
||||
clientSession.goAway(false);
|
||||
|
||||
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// The client must not receive a GOAWAY until the all streams are completed.
|
||||
assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
|
||||
|
||||
// Complete the stream.
|
||||
Stream serverStream = serverStreamRef.get();
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
|
||||
serverStream.respond(new HeadersFrame(response, true));
|
||||
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverStreamRef.get().getSession())::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerGracefulGoAwayWithStreamsClientGoAwayServerClosesWhenLastStreamCloses() throws Exception
|
||||
{
|
||||
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
|
||||
CountDownLatch serverStreamLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverStreamRef.set(stream);
|
||||
serverStreamLatch.countDown();
|
||||
|
||||
// Send a graceful GOAWAY while processing a stream.
|
||||
stream.getSession().goAway(true);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
{
|
||||
// Send a GOAWAY when receiving a graceful GOAWAY.
|
||||
session.goAway(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Response response = (MetaData.Response)frame.getMetaData();
|
||||
if (frame.isLast() && response.getStatus() == HttpStatus.OK_200)
|
||||
clientLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// The server has a pending stream, so it does not send the non-graceful GOAWAY yet.
|
||||
assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS));
|
||||
|
||||
// Complete the stream, the server should send the non-graceful GOAWAY.
|
||||
Stream serverStream = serverStreamRef.get();
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
|
||||
serverStream.respond(new HeadersFrame(response, true));
|
||||
|
||||
// The server already received the client GOAWAY,
|
||||
// so completing the last stream produces a close event.
|
||||
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||
// The client should receive the server non-graceful GOAWAY.
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverStreamRef.get().getSession())::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientGracefulGoAwayWithStreamsServerGracefulGoAwayServerClosesWhenLastStreamCloses() throws Exception
|
||||
{
|
||||
AtomicReference<Stream> serverStreamRef = new AtomicReference<>();
|
||||
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverStreamRef.set(stream);
|
||||
stream.demand();
|
||||
return new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onDataAvailable(Stream stream)
|
||||
{
|
||||
Stream.Data data = stream.readData();
|
||||
data.complete();
|
||||
if (data.isLast())
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY);
|
||||
stream.respond(new HeadersFrame(response, true));
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
{
|
||||
// Send a graceful GOAWAY.
|
||||
session.goAway(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
serverGoAwayLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
clientGracefulGoAwayLatch.countDown();
|
||||
else
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
|
||||
// Send a graceful GOAWAY from the client.
|
||||
clientSession.goAway(true);
|
||||
|
||||
// The server should send a graceful GOAWAY.
|
||||
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Complete the stream.
|
||||
clientStream.data(new DataFrame(BufferUtil.EMPTY_BUFFER, true));
|
||||
|
||||
// Both client and server should send a non-graceful GOAWAY.
|
||||
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverStreamRef.get().getSession())::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientShutdownServerCloses() throws Exception
|
||||
{
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
CountDownLatch settingsLatch = new CountDownLatch(2);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
serverSessionRef.set(session);
|
||||
settingsLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
settingsLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Issue a network close.
|
||||
((HTTP3Session)clientSession).close(ErrorCode.NO_ERROR.code(), "close");
|
||||
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerGracefulGoAwayClientShutdownServerCloses() throws Exception
|
||||
{
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
CountDownLatch settingsLatch = new CountDownLatch(2);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
serverSessionRef.set(session);
|
||||
settingsLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTerminate(Session session)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
settingsLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
// Reply to the graceful GOAWAY from the server with a network close.
|
||||
((HTTP3Session)session).close(ErrorCode.NO_ERROR.code(), "close");
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Send a graceful GOAWAY to the client.
|
||||
serverSessionRef.get().goAway(true);
|
||||
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)serverSessionRef.get())::isClosed);
|
||||
await().atMost(1, TimeUnit.SECONDS).until(((HTTP3Session)clientSession)::isClosed);
|
||||
}
|
||||
|
||||
/*
|
||||
@Test
|
||||
public void testServerIdleTimeout() throws Exception
|
||||
{
|
||||
long idleTimeout = 1000;
|
||||
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onAccept(Session session)
|
||||
{
|
||||
serverSessionRef.set(session);
|
||||
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleTimeout(Session session)
|
||||
{
|
||||
serverIdleTimeoutLatch.countDown();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (!frame.isGraceful())
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(serverIdleTimeoutLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
// Server should send a GOAWAY to the client.
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
// The client replied to server's GOAWAY, but the server already closed.
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerGracefulGoAwayWithStreamsServerIdleTimeout() throws Exception
|
||||
{
|
||||
long idleTimeout = 1000;
|
||||
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onAccept(Session session)
|
||||
{
|
||||
serverSessionRef.set(session);
|
||||
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
stream.setIdleTimeout(10 * idleTimeout);
|
||||
// Send a graceful GOAWAY.
|
||||
((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
clientGracefulGoAwayLatch.countDown();
|
||||
else
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
CountDownLatch clientResetLatch = new CountDownLatch(1);
|
||||
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
|
||||
// Send request headers but not data.
|
||||
clientSession.newRequest(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onReset(Stream stream, ResetFrame frame)
|
||||
{
|
||||
clientResetLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
// Server idle timeout sends a non-graceful GOAWAY.
|
||||
assertTrue(clientResetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientGracefulGoAwayWithStreamsServerIdleTimeout() throws Exception
|
||||
{
|
||||
long idleTimeout = 1000;
|
||||
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
CountDownLatch serverGracefulGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onAccept(Session session)
|
||||
{
|
||||
serverSessionRef.set(session);
|
||||
((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
stream.setIdleTimeout(10 * idleTimeout);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
if (frame.isGraceful())
|
||||
serverGracefulGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
|
||||
CountDownLatch streamResetLatch = new CountDownLatch(1);
|
||||
clientSession.newRequest(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onReset(Stream stream, ResetFrame frame)
|
||||
{
|
||||
streamResetLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// Client sends a graceful GOAWAY.
|
||||
((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);
|
||||
|
||||
assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(streamResetLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerGoAwayWithStreamsThenStop() throws Exception
|
||||
{
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
CountDownLatch serverTerminateLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
serverSessionRef.set(stream.getSession());
|
||||
// Don't reply, don't reset the stream, just send the GOAWAY.
|
||||
stream.getSession().close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
serverTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientTerminateLatch = new CountDownLatch(1);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientTerminateLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY);
|
||||
CountDownLatch clientResetLatch = new CountDownLatch(1);
|
||||
clientSession.newRequest(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onReset(Stream stream, ResetFrame frame)
|
||||
{
|
||||
clientResetLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Neither the client nor the server are finishing
|
||||
// the pending stream, so force the stop on the server.
|
||||
LifeCycle.stop(serverSessionRef.get());
|
||||
|
||||
// The server should reset all the pending streams.
|
||||
assertTrue(clientResetLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientTerminateLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
|
||||
assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
|
||||
}
|
||||
*/
|
||||
}
|
@ -19,9 +19,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpURI;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
@ -36,14 +34,14 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
|
||||
public class StreamIdleTimeoutTest extends AbstractClientServerTest
|
||||
{
|
||||
@Test
|
||||
public void testClientStreamIdleTimeout() throws Exception
|
||||
{
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
CountDownLatch serverLatch = new CountDownLatch(1);
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onAccept(Session session)
|
||||
@ -97,18 +95,14 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
|
||||
}
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
long streamIdleTimeout = 1000;
|
||||
client.setStreamIdleTimeout(streamIdleTimeout);
|
||||
|
||||
Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
Session.Client clientSession = newSession(new Session.Client.Listener() {});
|
||||
|
||||
CountDownLatch clientIdleLatch = new CountDownLatch(1);
|
||||
HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle");
|
||||
MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
clientSession.newRequest(new HeadersFrame(request1, false), new Stream.Listener()
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/idle"), false), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public boolean onIdleTimeout(Stream stream, Throwable failure)
|
||||
@ -128,9 +122,7 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
|
||||
|
||||
// The session should still be open, verify by sending another request.
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
clientSession.newRequest(new HeadersFrame(request2, true), new Stream.Listener()
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
@ -151,7 +143,7 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
|
||||
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
|
||||
long idleTimeout = 1000;
|
||||
CountDownLatch serverIdleLatch = new CountDownLatch(1);
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onAccept(Session session)
|
||||
@ -186,18 +178,15 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
|
||||
AbstractHTTP3ServerConnectionFactory h3 = server.getConnectors()[0].getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
|
||||
assertNotNull(h3);
|
||||
h3.setStreamIdleTimeout(idleTimeout);
|
||||
startClient();
|
||||
|
||||
Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
|
||||
CountDownLatch clientFailureLatch = new CountDownLatch(1);
|
||||
HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle");
|
||||
MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
clientSession.newRequest(new HeadersFrame(request1, false), new Stream.Listener()
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/idle"), false), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(long error, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
// The server idle times out, but did not send any data back.
|
||||
// However, the stream is readable, but an attempt to read it
|
||||
@ -214,9 +203,7 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
|
||||
|
||||
// The session should still be open, verify by sending another request.
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
|
||||
MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY);
|
||||
clientSession.newRequest(new HeadersFrame(request2, true), new Stream.Listener()
|
||||
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onResponse(Stream stream, HeadersFrame frame)
|
||||
|
@ -13,7 +13,6 @@
|
||||
|
||||
package org.eclipse.jetty.http3.tests;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -29,13 +28,13 @@ import static org.awaitility.Awaitility.await;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class HTTP3UnexpectedFrameTest extends AbstractHTTP3ClientServerTest
|
||||
public class UnexpectedFrameTest extends AbstractClientServerTest
|
||||
{
|
||||
@Test
|
||||
public void testDataBeforeHeaders() throws Exception
|
||||
{
|
||||
CountDownLatch serverLatch = new CountDownLatch(1);
|
||||
startServer(new Session.Server.Listener()
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onSessionFailure(Session session, long error, String reason)
|
||||
@ -44,10 +43,9 @@ public class HTTP3UnexpectedFrameTest extends AbstractHTTP3ClientServerTest
|
||||
serverLatch.countDown();
|
||||
}
|
||||
});
|
||||
startClient();
|
||||
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener()
|
||||
HTTP3Session session = (HTTP3Session)newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onSessionFailure(Session session, long error, String reason)
|
||||
@ -55,10 +53,9 @@ public class HTTP3UnexpectedFrameTest extends AbstractHTTP3ClientServerTest
|
||||
assertEquals(ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
|
||||
clientLatch.countDown();
|
||||
}
|
||||
})
|
||||
.get(5, TimeUnit.SECONDS);
|
||||
});
|
||||
|
||||
((HTTP3Session)session).writeFrame(0, new DataFrame(ByteBuffer.allocate(128), false), Callback.NOOP);
|
||||
session.writeMessageFrame(0, new DataFrame(ByteBuffer.allocate(128), false), Callback.NOOP);
|
||||
|
||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
@ -57,7 +57,7 @@ public class ClientProtocolSession extends ProtocolSession
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onClosed(CloseInfo closeInfo)
|
||||
protected void onClose(CloseInfo closeInfo)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("session closed remotely {} {}", closeInfo, this);
|
||||
|
@ -37,6 +37,6 @@ public class CloseInfo
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[error=%d,reason=%s]", getClass().getSimpleName(), hashCode(), error(), reason());
|
||||
return String.format("%s@%x[error=0x%s,reason=%s]", getClass().getSimpleName(), hashCode(), Long.toHexString(error()), reason());
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ package org.eclipse.jetty.quic.common;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
@ -26,7 +26,7 @@ public abstract class ProtocolSession
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ProtocolSession.class);
|
||||
|
||||
private final AtomicLong active = new AtomicLong();
|
||||
private final AtomicInteger active = new AtomicInteger();
|
||||
private final QuicSession session;
|
||||
|
||||
public ProtocolSession(QuicSession session)
|
||||
@ -43,25 +43,35 @@ public abstract class ProtocolSession
|
||||
|
||||
public void process()
|
||||
{
|
||||
if (active.getAndIncrement() == 0)
|
||||
// This method is called by the network thread and
|
||||
// dispatches to one, per-session, processing thread.
|
||||
|
||||
// The active counter counts up to 2, with the meanings:
|
||||
// 0=idle, 1=process, 2=re-process, where re-process is
|
||||
// necessary to close race between the processing thread
|
||||
// seeing active=1 and about to exit, and the network
|
||||
// thread also seeing active=1 and not dispatching,
|
||||
// leaving unprocessed data in the session.
|
||||
if (active.getAndUpdate(count -> count <= 1 ? count + 1 : count) == 0)
|
||||
session.getExecutor().execute(this::processSession);
|
||||
}
|
||||
|
||||
private void processSession()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
session.getExecutor().execute(() ->
|
||||
processWritableStreams();
|
||||
if (processReadableStreams())
|
||||
continue;
|
||||
|
||||
// Exit if did not process any stream and we are idle.
|
||||
if (active.decrementAndGet() == 0)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
processWritableStreams();
|
||||
if (processReadableStreams())
|
||||
continue;
|
||||
|
||||
CloseInfo closeInfo = session.getRemoteCloseInfo();
|
||||
if (closeInfo != null)
|
||||
onClosed(closeInfo);
|
||||
|
||||
// Exit if did not process any stream and we are idle.
|
||||
if (active.decrementAndGet() == 0)
|
||||
break;
|
||||
}
|
||||
});
|
||||
CloseInfo closeInfo = session.getRemoteCloseInfo();
|
||||
if (closeInfo != null)
|
||||
onClose(closeInfo);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,7 +135,7 @@ public abstract class ProtocolSession
|
||||
return getQuicSession().close(error, reason);
|
||||
}
|
||||
|
||||
protected abstract void onClosed(CloseInfo closeInfo);
|
||||
protected abstract void onClose(CloseInfo closeInfo);
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
|
@ -349,7 +349,9 @@ public abstract class QuicSession
|
||||
|
||||
public boolean close(long error, String reason)
|
||||
{
|
||||
return quicheConnection.close(error, reason);
|
||||
boolean closed = quicheConnection.close(error, reason);
|
||||
flush();
|
||||
return closed;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -125,6 +125,8 @@ public class QuicStreamEndPoint extends AbstractEndPoint
|
||||
else
|
||||
writeFlusher.onFail(failure);
|
||||
|
||||
session.onClose(streamId);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("closed stream #{} with error 0x{}", streamId, Long.toHexString(error), failure);
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ public class ServerProtocolSession extends ProtocolSession
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onClosed(CloseInfo closeInfo)
|
||||
protected void onClose(CloseInfo closeInfo)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("session closed remotely {} {}", closeInfo, this);
|
||||
|
Loading…
x
Reference in New Issue
Block a user