Issue #6728 - QUIC and HTTP/3

- Fixed session failure notification.
Now the failure is propagated to listeners.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-11-08 11:05:27 +01:00
parent 95d835685f
commit 4d778b1aff
20 changed files with 120 additions and 55 deletions

View File

@ -23,6 +23,7 @@ import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderStreamConnection;
import org.eclipse.jetty.http3.internal.EncoderStreamConnection;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.MessageFlusher;
@ -105,10 +106,16 @@ public class ClientHTTP3Session extends ClientProtocolSession
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail)))
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::failControlStream)))
controlFlusher.iterate();
}
private void failControlStream(Throwable failure)
{
long error = HTTP3ErrorCode.CLOSED_CRITICAL_STREAM_ERROR.code();
onFailure(error, "control_stream_failure", failure);
}
@Override
protected void onStop()
{
@ -116,12 +123,6 @@ public class ClientHTTP3Session extends ClientProtocolSession
// as onStart() does not call super either.
}
private void fail(Throwable failure)
{
// TODO
throw new UnsupportedOperationException();
}
private QuicStreamEndPoint openInstructionEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
@ -161,6 +162,12 @@ public class ClientHTTP3Session extends ClientProtocolSession
return session.onIdleTimeout();
}
@Override
protected void onFailure(long error, String reason, Throwable failure)
{
session.onSessionFailure(error, reason, failure);
}
@Override
public void inwardClose(long error, String reason)
{

View File

@ -240,8 +240,9 @@ public interface Session
* @param session the session
* @param error the failure error
* @param reason the failure reason
* @param failure the failure
*/
public default void onFailure(Session session, long error, String reason)
public default void onFailure(Session session, long error, String reason, Throwable failure)
{
}
}

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.http3.internal;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@ -476,7 +477,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
if (stream != null)
stream.onData(frame);
else
onSessionFailure(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
onSessionFailure(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence", new IllegalStateException("invalid frame sequence"));
}
public void onDataAvailable(long streamId)
@ -860,7 +861,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
failStreams(stream -> true, error, reason, false);
if (notifyFailure)
onSessionFailure(error, reason);
onSessionFailure(error, reason, new ClosedChannelException());
notifyDisconnect(error, reason);
}
@ -888,17 +889,17 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
}
@Override
public void onSessionFailure(long error, String reason)
public void onSessionFailure(long error, String reason, Throwable failure)
{
notifyFailure(error, reason);
notifyFailure(error, reason, failure);
inwardClose(error, reason);
}
private void notifyFailure(long error, String reason)
private void notifyFailure(long error, String reason, Throwable failure)
{
try
{
listener.onFailure(this, error, reason);
listener.onFailure(this, error, reason, failure);
}
catch (Throwable x)
{

View File

@ -361,7 +361,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
if (frameState == FrameState.FAILED)
return false;
frameState = FrameState.FAILED;
session.onSessionFailure(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence");
session.onSessionFailure(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_sequence", new IllegalStateException("invalid frame sequence"));
return false;
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.internal.parser;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
@ -65,20 +66,20 @@ public abstract class BodyParser
protected void emptyBody(ByteBuffer buffer)
{
sessionFailure(buffer, HTTP3ErrorCode.PROTOCOL_ERROR.code(), "invalid_frame");
sessionFailure(buffer, HTTP3ErrorCode.PROTOCOL_ERROR.code(), "invalid_frame", new IOException("invalid empty body frame"));
}
protected void sessionFailure(ByteBuffer buffer, long error, String reason)
protected void sessionFailure(ByteBuffer buffer, long error, String reason, Throwable failure)
{
BufferUtil.clear(buffer);
notifySessionFailure(error, reason);
notifySessionFailure(error, reason, failure);
}
protected void notifySessionFailure(long error, String reason)
protected void notifySessionFailure(long error, String reason, Throwable failure)
{
try
{
listener.onSessionFailure(error, reason);
listener.onSessionFailure(error, reason, failure);
}
catch (Throwable x)
{

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.internal.parser;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.FrameType;
@ -86,7 +87,7 @@ public class ControlParser
// SPEC: message frames on the control stream are invalid.
if (LOG.isDebugEnabled())
LOG.debug("invalid message frame type {} on control stream", Long.toHexString(frameType));
sessionFailure(buffer, HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_type");
sessionFailure(buffer, HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_type", new IOException("invalid message frame on control stream"));
return;
}
@ -132,13 +133,13 @@ public class ControlParser
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
sessionFailure(buffer, HTTP3ErrorCode.INTERNAL_ERROR.code(), "parser_error");
sessionFailure(buffer, HTTP3ErrorCode.INTERNAL_ERROR.code(), "parser_error", x);
}
}
private void sessionFailure(ByteBuffer buffer, long error, String reason)
private void sessionFailure(ByteBuffer buffer, long error, String reason, Throwable failure)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
unknownBodyParser.sessionFailure(buffer, error, reason, failure);
}
private enum State

View File

@ -133,13 +133,13 @@ public class HeadersBodyParser extends BodyParser
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
notifySessionFailure(x.getErrorCode(), x.getMessage());
notifySessionFailure(x.getErrorCode(), x.getMessage(), x);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("decode failure", x);
notifySessionFailure(HTTP3ErrorCode.INTERNAL_ERROR.code(), "internal_error");
notifySessionFailure(HTTP3ErrorCode.INTERNAL_ERROR.code(), "internal_error", x);
}
return false;
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.internal.parser;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.BooleanSupplier;
import java.util.function.UnaryOperator;
@ -118,7 +119,7 @@ public class MessageParser
// SPEC: control frames on a message stream are invalid.
if (LOG.isDebugEnabled())
LOG.debug("invalid control frame type {} on message stream", Long.toHexString(frameType));
sessionFailure(buffer, HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_type");
sessionFailure(buffer, HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), "invalid_frame_type", new IOException("invalid control frame in message stream"));
return Result.NO_FRAME;
}
@ -167,14 +168,14 @@ public class MessageParser
{
if (LOG.isDebugEnabled())
LOG.debug("parse failed", x);
sessionFailure(buffer, HTTP3ErrorCode.INTERNAL_ERROR.code(), "parser_error");
sessionFailure(buffer, HTTP3ErrorCode.INTERNAL_ERROR.code(), "parser_error", x);
return Result.NO_FRAME;
}
}
private void sessionFailure(ByteBuffer buffer, long error, String reason)
private void sessionFailure(ByteBuffer buffer, long error, String reason, Throwable failure)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
unknownBodyParser.sessionFailure(buffer, error, reason, failure);
}
public enum Result

View File

@ -40,7 +40,7 @@ public interface ParserListener
{
}
public default void onSessionFailure(long error, String reason)
public default void onSessionFailure(long error, String reason, Throwable failure)
{
}
@ -78,9 +78,9 @@ public interface ParserListener
}
@Override
public void onSessionFailure(long error, String reason)
public void onSessionFailure(long error, String reason, Throwable failure)
{
listener.onSessionFailure(error, reason);
listener.onSessionFailure(error, reason, failure);
}
}
}

View File

@ -73,12 +73,12 @@ public class SettingsBodyParser extends BodyParser
{
if (settings.containsKey(key))
{
sessionFailure(buffer, HTTP3ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate");
sessionFailure(buffer, HTTP3ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate", new IllegalArgumentException("invalid duplicate setting"));
return Result.NO_FRAME;
}
if (SettingsFrame.isReserved(key))
{
sessionFailure(buffer, HTTP3ErrorCode.SETTINGS_ERROR.code(), "settings_reserved");
sessionFailure(buffer, HTTP3ErrorCode.SETTINGS_ERROR.code(), "settings_reserved", new IllegalArgumentException("invalid reserved setting"));
return Result.NO_FRAME;
}
if (length > 0)
@ -87,7 +87,7 @@ public class SettingsBodyParser extends BodyParser
}
else
{
sessionFailure(buffer, HTTP3ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
sessionFailure(buffer, HTTP3ErrorCode.FRAME_ERROR.code(), "settings_invalid_format", new IllegalArgumentException("invalid setting"));
return Result.NO_FRAME;
}
break;
@ -116,7 +116,7 @@ public class SettingsBodyParser extends BodyParser
}
else
{
sessionFailure(buffer, HTTP3ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
sessionFailure(buffer, HTTP3ErrorCode.FRAME_ERROR.code(), "settings_invalid_format", new IllegalArgumentException("invalid setting"));
return Result.NO_FRAME;
}
break;

View File

@ -13,7 +13,7 @@
package org.eclipse.jetty.http3.client.http.internal;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicMarkableReference;
@ -68,13 +68,12 @@ public class SessionClientListener implements Session.Client.Listener
@Override
public void onDisconnect(Session session, long error, String reason)
{
onFailure(session, error, reason);
onFailure(session, error, reason, new ClosedChannelException());
}
@Override
public void onFailure(Session session, long error, String reason)
public void onFailure(Session session, long error, String reason, Throwable failure)
{
IOException failure = new IOException(String.format("%#x/%s", error, reason));
if (failConnectionPromise(failure))
return;
HttpConnectionOverHTTP3 connection = this.connection.getReference();

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.http3.server;
import java.io.IOException;
import java.util.Objects;
import org.eclipse.jetty.http3.api.Session;
@ -68,9 +67,8 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
}
@Override
public void onFailure(Session session, long error, String reason)
public void onFailure(Session session, long error, String reason, Throwable failure)
{
IOException failure = new IOException(reason);
session.getStreams().stream()
.map(stream -> (HTTP3Stream)stream)
.forEach(stream -> stream.onFailure(error, failure));

View File

@ -23,6 +23,7 @@ import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderStreamConnection;
import org.eclipse.jetty.http3.internal.EncoderStreamConnection;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.MessageFlusher;
@ -104,14 +105,14 @@ public class ServerHTTP3Session extends ServerProtocolSession
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail)))
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::failControlStream)))
controlFlusher.iterate();
}
private void fail(Throwable failure)
private void failControlStream(Throwable failure)
{
// TODO
throw new UnsupportedOperationException();
long error = HTTP3ErrorCode.CLOSED_CRITICAL_STREAM_ERROR.code();
onFailure(error, "control_stream_failure", failure);
}
private QuicStreamEndPoint openInstructionEndPoint(long streamId)
@ -153,6 +154,12 @@ public class ServerHTTP3Session extends ServerProtocolSession
return session.onIdleTimeout();
}
@Override
protected void onFailure(long error, String reason, Throwable failure)
{
session.onSessionFailure(HTTP3ErrorCode.NO_ERROR.code(), "failure", failure);
}
@Override
public void inwardClose(long error, String reason)
{

View File

@ -38,7 +38,7 @@ public class UnexpectedFrameTest extends AbstractClientServerTest
start(new Session.Server.Listener()
{
@Override
public void onFailure(Session session, long error, String reason)
public void onFailure(Session session, long error, String reason, Throwable failure)
{
assertEquals(HTTP3ErrorCode.FRAME_UNEXPECTED_ERROR.code(), error);
serverFailureLatch.countDown();

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.quic.client;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.quic.common.internal.QuicErrorCode;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -86,6 +87,12 @@ public class ClientProtocolSession extends ProtocolSession
return false;
}
@Override
protected void onFailure(long error, String reason, Throwable failure)
{
outwardClose(QuicErrorCode.NO_ERROR.code(), "failure");
}
@Override
protected void onClose(long error, String reason)
{

View File

@ -19,4 +19,8 @@ module org.eclipse.jetty.quic.common
requires transitive org.eclipse.jetty.quic.quiche;
exports org.eclipse.jetty.quic.common;
exports org.eclipse.jetty.quic.common.internal to
org.eclipse.jetty.quic.client,
org.eclipse.jetty.quic.server;
}

View File

@ -136,6 +136,10 @@ public abstract class ProtocolSession extends ContainerLifeCycle
return true;
}
protected void onFailure(long error, String reason, Throwable failure)
{
}
public void inwardClose(long error, String reason)
{
outwardClose(error, reason);

View File

@ -279,11 +279,7 @@ public abstract class QuicConnection extends AbstractConnection
continue;
}
if (LOG.isDebugEnabled())
LOG.debug("packet is for existing session {}, processing {} bytes", session, cipherBuffer.remaining());
Runnable task = session.process(remoteAddress, cipherBuffer);
if (LOG.isDebugEnabled())
LOG.debug("produced session task {} on {}", task, this);
Runnable task = process(session, remoteAddress, cipherBuffer);
if (task != null)
{
byteBufferPool.release(cipherBuffer);
@ -296,11 +292,37 @@ public abstract class QuicConnection extends AbstractConnection
if (LOG.isDebugEnabled())
LOG.debug("receiveAndProcess() failure", x);
byteBufferPool.release(cipherBuffer);
// TODO: close?
onFailure(x);
return null;
}
}
private Runnable process(QuicSession session, SocketAddress remoteAddress, ByteBuffer cipherBuffer)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("packet is for existing session {}, processing {} bytes", session, cipherBuffer.remaining());
Runnable task = session.process(remoteAddress, cipherBuffer);
if (LOG.isDebugEnabled())
LOG.debug("produced session task {} on {}", task, this);
return task;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("process failure for {}", session, x);
byteBufferPool.release(cipherBuffer);
session.onFailure(x);
return null;
}
}
protected void onFailure(Throwable failure)
{
sessions.values().forEach(session -> outwardClose(session, failure));
}
private class QuicProducer implements ExecutionStrategy.Producer
{
@Override

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.internal.QuicErrorCode;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
import org.eclipse.jetty.util.BufferUtil;
@ -200,6 +201,11 @@ public abstract class QuicSession extends ContainerLifeCycle
return protocolSession.onIdleTimeout();
}
public void onFailure(Throwable failure)
{
protocolSession.onFailure(QuicErrorCode.NO_ERROR.code(), "failure", failure);
}
/**
* @param streamType the stream type
* @return a new stream ID for the given type

View File

@ -77,6 +77,12 @@ public class ServerProtocolSession extends ProtocolSession
return streamEndPoint.onReadable();
}
@Override
protected void onFailure(long error, String reason, Throwable failure)
{
// TODO: should probably reset the stream if it exists.
}
@Override
protected void onClose(long error, String reason)
{