Issue #6728 - QUIC and HTTP/3
- Improved close mechanism. Now error and reason are propagated at the HTTP/3 level, in case e.g. applications want to take statistics about the error codes. - Improved buffer handling to be sure they are properly released back to the pool. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
85a13cfc20
commit
521a0adf0e
|
@ -166,7 +166,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
|
||||
session.disconnect(reason);
|
||||
session.inwardClose(error, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -188,8 +188,10 @@ public interface Session
|
|||
* <p>Callback method invoked when the underlying transport has been disconnected.</p>
|
||||
*
|
||||
* @param session the session
|
||||
* @param error the disconnect error
|
||||
* @param reason the disconnect reason
|
||||
*/
|
||||
public default void onDisconnect(Session session)
|
||||
public default void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -236,9 +238,10 @@ public interface Session
|
|||
* <p>Callback method invoked when a failure has been detected for this session.</p>
|
||||
*
|
||||
* @param session the session
|
||||
* @param failure the cause of the failure
|
||||
* @param error the failure error
|
||||
* @param reason the failure reason
|
||||
*/
|
||||
public default void onFailure(Session session, Throwable failure)
|
||||
public default void onFailure(Session session, long error, String reason)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
|
|
@ -237,9 +237,10 @@ public interface Stream
|
|||
* the stream has been reset.</p>
|
||||
*
|
||||
* @param stream the stream
|
||||
* @param error the failure error
|
||||
* @param failure the cause of the failure
|
||||
*/
|
||||
public default void onFailure(Stream stream, Throwable failure)
|
||||
public default void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
else
|
||||
{
|
||||
closeState = CloseState.CLOSING;
|
||||
zeroStreamsAction = () -> terminate("go_away");
|
||||
zeroStreamsAction = this::terminate;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
@ -212,7 +212,13 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
else
|
||||
{
|
||||
if (failStreams)
|
||||
failStreams(stream -> true, "go_away", true);
|
||||
{
|
||||
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
|
||||
String reason = "go_away";
|
||||
failStreams(stream -> true, error, reason, true);
|
||||
terminate();
|
||||
outwardDisconnect(error, reason);
|
||||
}
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
}
|
||||
|
@ -240,13 +246,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
Atomics.updateMax(lastId, id);
|
||||
}
|
||||
|
||||
public void outwardClose(long error, String reason)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
|
||||
getProtocolSession().outwardClose(error, reason);
|
||||
}
|
||||
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return getProtocolSession().getIdleTimeout();
|
||||
|
@ -475,7 +474,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
if (stream != null)
|
||||
stream.onData(frame);
|
||||
else
|
||||
fail(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
|
||||
onSessionFailure(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
|
||||
}
|
||||
|
||||
public void onDataAvailable(long streamId)
|
||||
|
@ -486,17 +485,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
stream.onDataAvailable();
|
||||
}
|
||||
|
||||
void fail(long error, String reason)
|
||||
{
|
||||
// Hard failure, no need to send a GOAWAY.
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
closeState = CloseState.CLOSED;
|
||||
}
|
||||
outwardClose(error, reason);
|
||||
notifyFailure(new IOException(String.format("%d/%s", error, reason)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onGoAway(GoAwayFrame frame)
|
||||
{
|
||||
|
@ -522,7 +510,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
goAwaySent = newGoAwayFrame(false);
|
||||
closeState = CloseState.CLOSING;
|
||||
GoAwayFrame goAwayFrame = goAwaySent;
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() -> terminate("go_away")));
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(this::terminate));
|
||||
failStreams = true;
|
||||
}
|
||||
break;
|
||||
|
@ -542,11 +530,19 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
{
|
||||
goAwaySent = newGoAwayFrame(false);
|
||||
GoAwayFrame goAwayFrame = goAwaySent;
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() -> terminate("go_away")));
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() ->
|
||||
{
|
||||
terminate();
|
||||
outwardDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away");
|
||||
}));
|
||||
}
|
||||
else
|
||||
{
|
||||
zeroStreamsAction = () -> terminate("go_away");
|
||||
zeroStreamsAction = () ->
|
||||
{
|
||||
terminate();
|
||||
outwardDisconnect(HTTP3ErrorCode.NO_ERROR.code(), "go_away");
|
||||
};
|
||||
failStreams = true;
|
||||
}
|
||||
}
|
||||
|
@ -567,11 +563,11 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
{
|
||||
goAwaySent = newGoAwayFrame(false);
|
||||
GoAwayFrame goAwayFrame = goAwaySent;
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(() -> terminate("go_away")));
|
||||
zeroStreamsAction = () -> writeControlFrame(goAwayFrame, Callback.from(this::terminate));
|
||||
}
|
||||
else
|
||||
{
|
||||
zeroStreamsAction = () -> terminate("go_away");
|
||||
zeroStreamsAction = this::terminate;
|
||||
}
|
||||
failStreams = true;
|
||||
}
|
||||
|
@ -598,7 +594,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
// The other peer sent us a GOAWAY with the last processed streamId,
|
||||
// so we must fail the streams that have a bigger streamId.
|
||||
Predicate<HTTP3Stream> predicate = stream -> stream.isLocal() && stream.getId() > frame.getLastId();
|
||||
failStreams(predicate, "go_away", true);
|
||||
failStreams(predicate, HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), "go_away", true);
|
||||
}
|
||||
|
||||
tryRunZeroStreamsAction();
|
||||
|
@ -645,12 +641,22 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
if (!confirmed)
|
||||
return false;
|
||||
|
||||
disconnect("idle_timeout");
|
||||
inwardClose(HTTP3ErrorCode.NO_ERROR.code(), "idle_timeout");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public void disconnect(String reason)
|
||||
/**
|
||||
* <p>Called when a an external event wants to initiate the close of this session locally,
|
||||
* for example a close at the network level (due to e.g. stopping a component) or a timeout.</p>
|
||||
* <p>The correspondent passive event, where it's the remote peer that initiates the close,
|
||||
* is delivered via {@link #onClose(long, String)}.</p>
|
||||
*
|
||||
* @param error the close error
|
||||
* @param reason the close reason
|
||||
* @see #onClose(long, String)
|
||||
*/
|
||||
public void inwardClose(long error, String reason)
|
||||
{
|
||||
GoAwayFrame goAwayFrame = null;
|
||||
try (AutoLock l = lock.lock())
|
||||
|
@ -678,17 +684,58 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
}
|
||||
}
|
||||
|
||||
failStreams(stream -> true, reason, true);
|
||||
failStreams(stream -> true, error, reason, true);
|
||||
|
||||
if (goAwayFrame != null)
|
||||
writeControlFrame(goAwayFrame, Callback.from(() -> terminate(reason)));
|
||||
{
|
||||
writeControlFrame(goAwayFrame, Callback.from(() ->
|
||||
{
|
||||
terminate();
|
||||
outwardDisconnect(error, reason);
|
||||
}));
|
||||
}
|
||||
else
|
||||
terminate(reason);
|
||||
{
|
||||
terminate();
|
||||
outwardDisconnect(error, reason);
|
||||
}
|
||||
}
|
||||
|
||||
private void failStreams(Predicate<HTTP3Stream> predicate, String reason, boolean close)
|
||||
/**
|
||||
* <p>Calls {@link #outwardClose(long, String)}, then notifies
|
||||
* {@link Session.Listener#onDisconnect(Session, long, String)}.</p>
|
||||
*
|
||||
* @param error the close error
|
||||
* @param reason the close reason.
|
||||
* @see #outwardClose(long, String)
|
||||
*/
|
||||
private void outwardDisconnect(long error, String reason)
|
||||
{
|
||||
outwardClose(error, reason);
|
||||
// Since the outwardClose() above is called by
|
||||
// the implementation, notify the application.
|
||||
notifyDisconnect(error, reason);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Propagates a close outwards, i.e. towards the network.</p>
|
||||
* <p>This method does not notify {@link Session.Listener#onDisconnect(Session, long, String)}
|
||||
* so calling {@link #outwardDisconnect(long, String)} is preferred.</p>
|
||||
*
|
||||
* @param error the close error
|
||||
* @param reason the close reason
|
||||
* @see #outwardDisconnect(long, String)
|
||||
* @see #inwardClose(long, String)
|
||||
*/
|
||||
private void outwardClose(long error, String reason)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
|
||||
getProtocolSession().outwardClose(error, reason);
|
||||
}
|
||||
|
||||
private void failStreams(Predicate<HTTP3Stream> predicate, long error, String reason, boolean close)
|
||||
{
|
||||
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
|
||||
Throwable failure = new IOException(reason);
|
||||
streams.values().stream()
|
||||
.filter(predicate)
|
||||
|
@ -698,19 +745,22 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
stream.reset(error, failure);
|
||||
// Since the stream failure was generated
|
||||
// by a GOAWAY, notify the application.
|
||||
stream.onFailure(failure);
|
||||
stream.onFailure(error, failure);
|
||||
});
|
||||
}
|
||||
|
||||
private void terminate(String reason)
|
||||
/**
|
||||
* Terminates this session at the HTTP/3 level, and possibly notifies the shutdown callback.
|
||||
* Termination at the QUIC level may still be in progress.
|
||||
*
|
||||
* @see #onClose(long, String)
|
||||
* @see #inwardClose(long, String)
|
||||
*/
|
||||
private void terminate()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("terminating reason={} for {}", reason, this);
|
||||
LOG.debug("terminating {}", this);
|
||||
streamTimeouts.destroy();
|
||||
outwardClose(HTTP3ErrorCode.NO_ERROR.code(), reason);
|
||||
// Since the close() above is called by the
|
||||
// implementation, notify the application.
|
||||
notifyDisconnect();
|
||||
// Notify the shutdown completable.
|
||||
CompletableFuture<Void> shutdown;
|
||||
try (AutoLock l = lock.lock())
|
||||
|
@ -781,6 +831,14 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Called when the local peer receives a close initiated by the remote peer.</p>
|
||||
* <p>The correspondent active event, where it's the local peer that initiates the close,
|
||||
* it's delivered via {@link #inwardClose(long, String)}.</p>
|
||||
*
|
||||
* @param error the close error
|
||||
* @param reason the close reason
|
||||
*/
|
||||
public void onClose(long error, String reason)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -797,19 +855,19 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
}
|
||||
|
||||
// No point in closing the streams, as QUIC frames cannot be sent.
|
||||
failStreams(stream -> true, "remote_close", false);
|
||||
failStreams(stream -> true, error, reason, false);
|
||||
|
||||
if (notifyFailure)
|
||||
fail(error, reason);
|
||||
onSessionFailure(error, reason);
|
||||
|
||||
notifyDisconnect();
|
||||
notifyDisconnect(error, reason);
|
||||
}
|
||||
|
||||
private void notifyDisconnect()
|
||||
private void notifyDisconnect(long error, String reason)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onDisconnect(this);
|
||||
listener.onDisconnect(this, error, reason);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
@ -824,24 +882,21 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
|
|||
LOG.debug("stream failure 0x{}/{} for stream #{} on {}", Long.toHexString(error), failure.getMessage(), streamId, this);
|
||||
HTTP3Stream stream = getStream(streamId);
|
||||
if (stream != null)
|
||||
{
|
||||
stream.onFailure(failure);
|
||||
removeStream(stream, failure);
|
||||
}
|
||||
stream.onFailure(error, failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSessionFailure(long error, String reason)
|
||||
{
|
||||
// TODO
|
||||
throw new UnsupportedOperationException();
|
||||
notifyFailure(error, reason);
|
||||
inwardClose(error, reason);
|
||||
}
|
||||
|
||||
public void notifyFailure(Throwable failure)
|
||||
private void notifyFailure(long error, String reason)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onFailure(this, failure);
|
||||
listener.onFailure(this, error, reason);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
|
|
@ -327,19 +327,19 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
|
|||
}
|
||||
}
|
||||
|
||||
public void onFailure(Throwable failure)
|
||||
public void onFailure(long error, Throwable failure)
|
||||
{
|
||||
notifyFailure(failure);
|
||||
notifyFailure(error, failure);
|
||||
session.removeStream(this, failure);
|
||||
}
|
||||
|
||||
private void notifyFailure(Throwable failure)
|
||||
private void notifyFailure(long error, Throwable failure)
|
||||
{
|
||||
Listener listener = getListener();
|
||||
try
|
||||
{
|
||||
if (listener != null)
|
||||
listener.onFailure(this, failure);
|
||||
listener.onFailure(this, error, failure);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
@ -361,7 +361,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
|
|||
if (frameState == FrameState.FAILED)
|
||||
return false;
|
||||
frameState = FrameState.FAILED;
|
||||
session.fail(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
|
||||
session.onSessionFailure(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -202,7 +202,12 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("read data {} on {}", frame, this);
|
||||
buffer.retain();
|
||||
return new Stream.Data(frame, this::completeReadData);
|
||||
// Store in a local variable so that the lambda captures the right buffer.
|
||||
RetainableByteBuffer current = buffer;
|
||||
// Release the network buffer here (if empty), since the application may
|
||||
// not be reading more bytes, to avoid to keep around a consumed buffer.
|
||||
tryReleaseBuffer(false);
|
||||
return new Stream.Data(frame, () -> completeReadData(current));
|
||||
}
|
||||
case MODE_SWITCH:
|
||||
{
|
||||
|
@ -237,11 +242,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
}
|
||||
}
|
||||
|
||||
private void completeReadData()
|
||||
private void completeReadData(RetainableByteBuffer buffer)
|
||||
{
|
||||
buffer.release();
|
||||
if (!buffer.isRetained())
|
||||
tryReleaseBuffer(false);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("retained released {}", buffer);
|
||||
}
|
||||
|
||||
public void demand()
|
||||
|
@ -377,7 +382,10 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
|
|||
if (buffer.isRetained())
|
||||
{
|
||||
buffer.release();
|
||||
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
|
||||
RetainableByteBuffer newBuffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("reacquired {} for retained {}", newBuffer, buffer);
|
||||
buffer = newBuffer;
|
||||
byteBuffer = buffer.getBuffer();
|
||||
}
|
||||
|
||||
|
|
|
@ -186,7 +186,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Stream stream, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
responseFailure(failure);
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.http3.client.http.internal;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicMarkableReference;
|
||||
|
@ -66,14 +66,15 @@ public class SessionClientListener implements Session.Client.Listener
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
onFailure(session, new ClosedChannelException());
|
||||
onFailure(session, error, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Session session, Throwable failure)
|
||||
public void onFailure(Session session, long error, String reason)
|
||||
{
|
||||
IOException failure = new IOException(String.format("%#x/%s", error, reason));
|
||||
if (failConnectionPromise(failure))
|
||||
return;
|
||||
HttpConnectionOverHTTP3 connection = this.connection.getReference();
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.http3.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
|
@ -67,10 +68,12 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Session session, Throwable failure)
|
||||
public void onFailure(Session session, long error, String reason)
|
||||
{
|
||||
// TODO
|
||||
throw new UnsupportedOperationException();
|
||||
IOException failure = new IOException(reason);
|
||||
session.getStreams().stream()
|
||||
.map(stream -> (HTTP3Stream)stream)
|
||||
.forEach(stream -> stream.onFailure(error, failure));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -138,9 +141,15 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Stream stream, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
getConnection().onFailure((HTTP3Stream)stream, failure);
|
||||
HTTP3Stream http3Stream = (HTTP3Stream)stream;
|
||||
Runnable task = getConnection().onFailure((HTTP3Stream)stream, failure);
|
||||
if (task != null)
|
||||
{
|
||||
ServerHTTP3Session protocolSession = (ServerHTTP3Session)http3Stream.getSession().getProtocolSession();
|
||||
protocolSession.offer(task, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -244,7 +244,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
if (reset)
|
||||
consumeInput();
|
||||
|
||||
getHttpTransport().onStreamIdleTimeout(failure);
|
||||
getHttpTransport().onIdleTimeout(failure);
|
||||
|
||||
failure.addSuppressed(new Throwable("idle timeout"));
|
||||
|
||||
|
@ -267,14 +267,21 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
handle();
|
||||
}
|
||||
|
||||
public void onFailure(Throwable failure)
|
||||
public Runnable onFailure(Throwable failure)
|
||||
{
|
||||
//TODO
|
||||
throw new UnsupportedOperationException(failure);
|
||||
// getHttpTransport().onStreamFailure(failure);
|
||||
// boolean handle = failed(failure);
|
||||
// consumeInput();
|
||||
// return new FailureTask(failure, callback, handle);
|
||||
consumeInput();
|
||||
|
||||
getHttpTransport().onFailure(failure);
|
||||
|
||||
boolean handle = failed(failure);
|
||||
|
||||
return () ->
|
||||
{
|
||||
if (handle)
|
||||
handleWithContext();
|
||||
else if (getHttpConfiguration().isNotifyRemoteAsyncErrors())
|
||||
getState().asyncError(failure);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -390,8 +397,31 @@ public class HttpChannelOverHTTP3 extends HttpChannel
|
|||
@Override
|
||||
public boolean failed(Throwable failure)
|
||||
{
|
||||
// TODO
|
||||
throw new UnsupportedOperationException(failure);
|
||||
HttpInput.Content contentToFail = null;
|
||||
try (AutoLock l = lock.lock())
|
||||
{
|
||||
if (content == null)
|
||||
{
|
||||
content = new HttpInput.ErrorContent(failure);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (content.isSpecial())
|
||||
{
|
||||
// Either EOF or error already, no nothing.
|
||||
}
|
||||
else
|
||||
{
|
||||
contentToFail = content;
|
||||
content = new HttpInput.ErrorContent(failure);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (contentToFail != null)
|
||||
contentToFail.failed(failure);
|
||||
|
||||
return getRequest().getHttpInput().onContentProducible();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -278,11 +278,16 @@ public class HttpTransportOverHTTP3 implements HttpTransport
|
|||
}
|
||||
}
|
||||
|
||||
boolean onStreamIdleTimeout(Throwable failure)
|
||||
boolean onIdleTimeout(Throwable failure)
|
||||
{
|
||||
return transportCallback.idleTimeout(failure);
|
||||
}
|
||||
|
||||
void onFailure(Throwable failure)
|
||||
{
|
||||
transportCallback.abort(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(Throwable failure)
|
||||
{
|
||||
|
|
|
@ -158,7 +158,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
|
||||
session.disconnect(reason);
|
||||
session.inwardClose(error, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -69,9 +69,9 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
|
|||
return channel.onIdleTimeout(failure, consumer);
|
||||
}
|
||||
|
||||
public void onFailure(HTTP3Stream stream, Throwable failure)
|
||||
public Runnable onFailure(HTTP3Stream stream, Throwable failure)
|
||||
{
|
||||
HttpChannelOverHTTP3 channel = (HttpChannelOverHTTP3)stream.getAttachment();
|
||||
channel.onFailure(failure);
|
||||
return channel.onFailure(failure);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -393,7 +393,7 @@ public class ClientServerTest extends AbstractClientServerTest
|
|||
clientSession.newRequest(new HeadersFrame(newRequest("/large"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Stream stream, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
streamFailureLatch.countDown();
|
||||
}
|
||||
|
|
|
@ -73,7 +73,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -90,7 +90,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -153,7 +153,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -190,7 +190,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
clientSession.newRequest(new HeadersFrame(newRequest("/2"), true), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Stream stream, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
streamFailureLatch.countDown();
|
||||
}
|
||||
|
@ -232,7 +232,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -253,7 +253,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -315,7 +315,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -336,7 +336,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -405,7 +405,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -422,7 +422,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -492,7 +492,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -517,7 +517,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -607,7 +607,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -628,7 +628,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -673,7 +673,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -689,7 +689,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -723,7 +723,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -746,7 +746,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -787,7 +787,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -806,7 +806,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -849,7 +849,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -868,7 +868,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -916,7 +916,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -938,7 +938,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -948,7 +948,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Stream stream, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
clientFailureLatch.countDown();
|
||||
}
|
||||
|
@ -1011,7 +1011,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -1029,7 +1029,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -1038,7 +1038,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Stream stream, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
streamFailureLatch.countDown();
|
||||
}
|
||||
|
@ -1083,7 +1083,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -1100,7 +1100,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -1110,7 +1110,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
clientSession.newRequest(new HeadersFrame(newRequest("/"), false), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Stream stream, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
clientFailureLatch.countDown();
|
||||
}
|
||||
|
@ -1153,7 +1153,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -1169,7 +1169,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -1203,7 +1203,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
serverDisconnectLatch.countDown();
|
||||
}
|
||||
|
@ -1226,7 +1226,7 @@ public class GoAwayTest extends AbstractClientServerTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session)
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
|
|||
clientSession.newRequest(new HeadersFrame(newRequest("/idle"), false), new Stream.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Stream stream, Throwable failure)
|
||||
public void onFailure(Stream stream, long error, Throwable failure)
|
||||
{
|
||||
// The server idle times out, but did not send any data back.
|
||||
// However, the stream is readable and the implementation
|
||||
|
|
|
@ -19,11 +19,14 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.frames.DataFrame;
|
||||
import org.eclipse.jetty.http3.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
|
||||
import org.eclipse.jetty.http3.internal.HTTP3Session;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class UnexpectedFrameTest extends AbstractClientServerTest
|
||||
|
@ -31,30 +34,40 @@ public class UnexpectedFrameTest extends AbstractClientServerTest
|
|||
@Test
|
||||
public void testDataBeforeHeaders() throws Exception
|
||||
{
|
||||
CountDownLatch serverLatch = new CountDownLatch(1);
|
||||
CountDownLatch serverFailureLatch = new CountDownLatch(1);
|
||||
start(new Session.Server.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Session session, Throwable failure)
|
||||
public void onFailure(Session session, long error, String reason)
|
||||
{
|
||||
serverLatch.countDown();
|
||||
assertEquals(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
|
||||
serverFailureLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
CountDownLatch clientLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
|
||||
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
|
||||
HTTP3Session clientSession = (HTTP3Session)newSession(new Session.Client.Listener()
|
||||
{
|
||||
@Override
|
||||
public void onFailure(Session session, Throwable failure)
|
||||
public void onGoAway(Session session, GoAwayFrame frame)
|
||||
{
|
||||
clientLatch.countDown();
|
||||
clientGoAwayLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDisconnect(Session session, long error, String reason)
|
||||
{
|
||||
assertEquals(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
|
||||
clientDisconnectLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
clientSession.writeMessageFrame(0, new DataFrame(ByteBuffer.allocate(128), false), Callback.NOOP);
|
||||
|
||||
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
|
||||
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
await().atMost(1, TimeUnit.SECONDS).until(clientSession::isClosed);
|
||||
}
|
||||
|
|
|
@ -37,6 +37,6 @@ public class CloseInfo
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[error=0x%s,reason=%s]", getClass().getSimpleName(), hashCode(), Long.toHexString(error()), reason());
|
||||
return String.format("%s@%x[error=%#x,reason=%s]", getClass().getSimpleName(), hashCode(), error(), reason());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue