Issue #6728 - QUIC and HTTP/3

- More work on making HTTP semantic layer work on top of HTTP/3.
- Various fixes and improvement to HTTP client transport tests.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-27 00:19:29 +02:00
parent 05158a5bc0
commit 40aba07e33
34 changed files with 391 additions and 111 deletions

View File

@ -261,7 +261,7 @@ public abstract class HttpConnection implements IConnection, Attachable
}
}
public boolean onIdleTimeout(long idleTimeout)
public boolean onIdleTimeout(long idleTimeout, Throwable failure)
{
try (AutoLock l = lock.lock())
{

View File

@ -184,7 +184,8 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
protected boolean onIdleTimeout(long idleTimeout)
{
return delegate.onIdleTimeout(idleTimeout);
TimeoutException failure = new TimeoutException("Idle timeout " + idleTimeout + " ms");
return delegate.onIdleTimeout(idleTimeout, failure);
}
@Override

View File

@ -211,9 +211,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
public boolean onIdleExpired()
{
long idleTimeout = getEndPoint().getIdleTimeout();
boolean close = delegate.onIdleTimeout(idleTimeout);
TimeoutException failure = new TimeoutException("Idle timeout " + idleTimeout + " ms");
boolean close = delegate.onIdleTimeout(idleTimeout, failure);
if (close)
close(new TimeoutException("Idle timeout " + idleTimeout + " ms"));
close(failure);
return false;
}

View File

@ -100,11 +100,12 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro
public boolean onIdleTimeout(Session session)
{
long idleTimeout = ((HTTP2Session)session).getEndPoint().getIdleTimeout();
if (failConnectionPromise(new TimeoutException("Idle timeout expired: " + idleTimeout + " ms")))
TimeoutException failure = new TimeoutException("Idle timeout expired: " + idleTimeout + " ms");
if (failConnectionPromise(failure))
return true;
HttpConnectionOverHTTP2 connection = this.connection.getReference();
if (connection != null)
return connection.onIdleTimeout(idleTimeout);
return connection.onIdleTimeout(idleTimeout, failure);
return true;
}

View File

@ -21,7 +21,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -49,7 +48,7 @@ import org.slf4j.LoggerFactory;
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP2.class);
private final Set<HttpChannel> activeChannels = ConcurrentHashMap.newKeySet();
private final Queue<HttpChannelOverHTTP2> idleChannels = new ConcurrentLinkedQueue<>();
@ -186,11 +185,11 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
}
@Override
public boolean onIdleTimeout(long idleTimeout)
public boolean onIdleTimeout(long idleTimeout, Throwable failure)
{
boolean close = super.onIdleTimeout(idleTimeout);
boolean close = super.onIdleTimeout(idleTimeout, failure);
if (close)
close(new TimeoutException("idle_timeout"));
close(failure);
return false;
}

View File

@ -118,7 +118,8 @@ public class ClientHTTP3Session extends ClientProtocolSession
private void fail(Throwable failure)
{
// TODO: must close the connection.
// TODO
throw new UnsupportedOperationException();
}
private QuicStreamEndPoint openInstructionEndPoint(long streamId)

View File

@ -19,17 +19,17 @@ import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
public class ClientHTTP3StreamConnection extends HTTP3StreamConnection
{
private final ClientHTTP3Session http3Session;
private final ClientHTTP3Session session;
public ClientHTTP3StreamConnection(QuicStreamEndPoint endPoint, ClientHTTP3Session http3Session, MessageParser parser)
public ClientHTTP3StreamConnection(QuicStreamEndPoint endPoint, ClientHTTP3Session session, MessageParser parser)
{
super(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
this.http3Session = http3Session;
super(endPoint, session.getQuicSession().getExecutor(), session.getQuicSession().getByteBufferPool(), parser);
this.session = session;
}
@Override
protected void onDataAvailable(long streamId)
{
http3Session.onDataAvailable(streamId);
session.onDataAvailable(streamId);
}
}

View File

@ -107,6 +107,11 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
return List.copyOf(streams.values());
}
public int getMaxLocalStreams()
{
return session.getMaxLocalStreams();
}
@Override
public CompletableFuture<Void> goAway(boolean graceful)
{
@ -242,6 +247,11 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
getProtocolSession().outwardClose(error, reason);
}
public long getIdleTimeout()
{
return getProtocolSession().getIdleTimeout();
}
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
@ -784,7 +794,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
notifyFailure = closeState == CloseState.NOT_CLOSED;
closeState = CloseState.CLOSED;
zeroStreamsAction = null;
// TODO: what about field shutdown?
}
// No point in closing the streams, as QUIC frames cannot be sent.
@ -825,6 +834,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public void onSessionFailure(long error, String reason)
{
// TODO
throw new UnsupportedOperationException();
}
public void notifyFailure(Throwable failure)

View File

@ -171,6 +171,8 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not read {}", this, x);
reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
// Rethrow to the application, so don't notify onFailure().
throw x;

View File

@ -213,23 +213,23 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public void demand()
{
boolean interested;
boolean hasData;
boolean process = false;
try (AutoLock l = lock.lock())
{
interested = noData;
hasData = !noData;
dataDemand = true;
if (dataStalled)
if (dataStalled && hasData)
{
dataStalled = false;
process = true;
}
}
if (LOG.isDebugEnabled())
LOG.debug("demand, wasStalled={} on {}", process, this);
LOG.debug("demand, wasStalled={} hasData={} on {}", process, hasData, this);
if (process)
processDataDemand();
else if (interested)
processDataFrames();
else if (!hasData)
fillInterested();
}

View File

@ -22,6 +22,7 @@ import java.util.Objects;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.MultiplexConnectionPool;
@ -30,7 +31,9 @@ import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.HTTP3ClientConnectionFactory;
import org.eclipse.jetty.http3.client.http.internal.HttpConnectionOverHTTP3;
import org.eclipse.jetty.http3.client.http.internal.SessionClientListener;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -111,7 +114,9 @@ public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport im
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, destination.getClientConnectionFactory());
getHTTP3Client().connect(address, new SessionClientListener(context), context);
SessionClientListener listener = new TransportSessionClientListener(context);
getHTTP3Client().connect(address, listener, context)
.whenComplete(listener::onConnect);
}
@Override
@ -125,4 +130,23 @@ public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport im
{
return factory.newConnection(endPoint, context);
}
protected HttpConnection newHttpConnection(HttpDestination destination, HTTP3SessionClient session)
{
return new HttpConnectionOverHTTP3(destination, session);
}
private class TransportSessionClientListener extends SessionClientListener
{
private TransportSessionClientListener(Map<String, Object> context)
{
super(context);
}
@Override
protected HttpConnectionOverHTTP3 newHttpConnection(HttpDestination destination, HTTP3SessionClient session)
{
return (HttpConnectionOverHTTP3)HttpClientTransportOverHTTP3.this.newHttpConnection(destination, session);
}
}
}

View File

@ -18,19 +18,23 @@ import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
public class HttpChannelOverHTTP3 extends HttpChannel
{
private final HttpConnectionOverHTTP3 connection;
private final HTTP3SessionClient session;
private final HttpSenderOverHTTP3 sender;
private final HttpReceiverOverHTTP3 receiver;
private Stream stream;
public HttpChannelOverHTTP3(HttpDestination destination, HTTP3SessionClient session)
public HttpChannelOverHTTP3(HttpDestination destination, HttpConnectionOverHTTP3 connection, HTTP3SessionClient session)
{
super(destination);
this.connection = connection;
this.session = session;
sender = new HttpSenderOverHTTP3(this);
receiver = new HttpReceiverOverHTTP3(this);
@ -74,10 +78,23 @@ public class HttpChannelOverHTTP3 extends HttpChannel
sender.send(exchange);
}
@Override
public void exchangeTerminated(HttpExchange exchange, Result result)
{
super.exchangeTerminated(exchange, result);
Stream stream = getStream();
if (stream != null && result.isFailed())
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), result.getFailure());
else
release();
}
@Override
public void release()
{
// TODO
setStream(null);
connection.release(this);
}
@Override

View File

@ -28,9 +28,13 @@ import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpConnectionOverHTTP3 extends HttpConnection implements ConnectionPool.Multiplexable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP3.class);
private final Set<HttpChannel> activeChannels = ConcurrentHashMap.newKeySet();
private final AtomicBoolean closed = new AtomicBoolean();
private final HTTP3SessionClient session;
@ -41,11 +45,15 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio
this.session = session;
}
public HTTP3SessionClient getSession()
{
return session;
}
@Override
public int getMaxMultiplex()
{
// TODO: need to retrieve this via stats, and it's a fixed value.
return 1;
return session.getMaxLocalStreams();
}
@Override
@ -62,12 +70,27 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio
normalizeRequest(request);
// One connection maps to N channels, so one channel for each exchange.
HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(getHttpDestination(), session);
HttpChannelOverHTTP3 channel = newHttpChannel();
activeChannels.add(channel);
return send(channel, exchange);
}
protected HttpChannelOverHTTP3 newHttpChannel()
{
return new HttpChannelOverHTTP3(getHttpDestination(), this, getSession());
}
public void release(HttpChannelOverHTTP3 channel)
{
if (LOG.isDebugEnabled())
LOG.debug("released {}", channel);
if (activeChannels.remove(channel))
getHttpDestination().release(this);
else
channel.destroy();
}
@Override
public boolean isClosed()
{
@ -80,7 +103,7 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio
close(new AsynchronousCloseException());
}
private void close(Throwable failure)
public void close(Throwable failure)
{
if (closed.compareAndSet(false, true))
{
@ -101,4 +124,12 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio
}
activeChannels.clear();
}
@Override
public boolean onIdleTimeout(long idleTimeout, Throwable failure)
{
if (super.onIdleTimeout(idleTimeout, failure))
close(failure);
return false;
}
}

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.client.http.internal;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.HttpExchange;
@ -110,45 +111,55 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
if (exchange == null)
return;
Stream.Data data = stream.readData();
if (data != null)
try
{
ByteBuffer byteBuffer = data.getByteBuffer();
if (byteBuffer.hasRemaining())
Stream.Data data = stream.readData();
if (data != null)
{
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete, x ->
ByteBuffer byteBuffer = data.getByteBuffer();
if (byteBuffer.hasRemaining())
{
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete, x ->
{
data.complete();
if (responseFailure(x))
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
});
boolean proceed = responseContent(exchange, byteBuffer, callback);
if (proceed)
{
if (data.isLast())
responseSuccess(exchange);
else
stream.demand();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after {} on {}", data, this);
notifySuccess = data.isLast();
}
}
else
{
data.complete();
if (responseFailure(x))
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
});
boolean proceed = responseContent(exchange, byteBuffer, callback);
if (proceed)
{
if (data.isLast())
responseSuccess(exchange);
else
stream.demand();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("stalling response processing, no demand after {} on {}", data, this);
notifySuccess = data.isLast();
}
}
else
{
data.complete();
if (data.isLast())
responseSuccess(exchange);
else
stream.demand();
stream.demand();
}
}
else
catch (Throwable x)
{
stream.demand();
Throwable failure = x;
if (x instanceof UncheckedIOException)
failure = x.getCause();
exchange.getRequest().abort(failure);
}
}
@ -163,4 +174,20 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listen
trailers.forEach(exchange.getResponse()::trailer);
responseSuccess(exchange);
}
@Override
public boolean onIdleTimeout(Stream stream, Throwable failure)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
return !exchange.abort(failure);
}
@Override
public void onFailure(Stream stream, Throwable failure)
{
responseFailure(failure);
}
}

View File

@ -13,7 +13,10 @@
package org.eclipse.jetty.http3.client.http.internal;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicMarkableReference;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
@ -21,10 +24,12 @@ import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.util.Promise;
public class SessionClientListener implements Session.Client.Listener
{
private final AtomicMarkableReference<HttpConnectionOverHTTP3> connection = new AtomicMarkableReference<>(null, false);
private final Map<String, Object> context;
public SessionClientListener(Map<String, Object> context)
@ -32,17 +37,66 @@ public class SessionClientListener implements Session.Client.Listener
this.context = context;
}
public void onConnect(Session.Client session, Throwable failure)
{
if (failure != null)
failConnectionPromise(failure);
}
@Override
public void onSettings(Session session, SettingsFrame frame)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverHTTP3 connection = newHttpConnection(destination, (HTTP3SessionClient)session);
if (this.connection.compareAndSet(null, connection, false, true))
httpConnectionPromise().succeeded(connection);
}
@Override
public boolean onIdleTimeout(Session session)
{
long idleTimeout = ((HTTP3Session)session).getIdleTimeout();
TimeoutException timeout = new TimeoutException("idle timeout expired: " + idleTimeout + " ms");
if (failConnectionPromise(timeout))
return true;
HttpConnectionOverHTTP3 connection = this.connection.getReference();
if (connection != null)
return connection.onIdleTimeout(idleTimeout, timeout);
return true;
}
@Override
public void onDisconnect(Session session)
{
onFailure(session, new ClosedChannelException());
}
@Override
public void onFailure(Session session, Throwable failure)
{
if (failConnectionPromise(failure))
return;
HttpConnectionOverHTTP3 connection = this.connection.getReference();
if (connection != null)
connection.close(failure);
}
protected HttpConnectionOverHTTP3 newHttpConnection(HttpDestination destination, HTTP3SessionClient session)
{
return new HttpConnectionOverHTTP3(destination, session);
}
@SuppressWarnings("unchecked")
private Promise<Connection> httpConnectionPromise()
{
return (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
}
@Override
public void onSettings(Session session, SettingsFrame frame)
private boolean failConnectionPromise(Throwable failure)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverHTTP3 connection = new HttpConnectionOverHTTP3(destination, (HTTP3SessionClient)session);
httpConnectionPromise().succeeded(connection);
boolean result = connection.compareAndSet(null, null, false, true);
if (result)
httpConnectionPromise().failed(failure);
return result;
}
}

View File

@ -13,15 +13,19 @@
package org.eclipse.jetty.http3.server;
import java.util.Objects;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.server.internal.HttpChannelOverHTTP3;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.HttpConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionFactory
{
@ -37,6 +41,8 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
private static class HTTP3SessionListener implements Session.Server.Listener
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3SessionListener.class);
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
@ -45,6 +51,27 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
listener.onRequest(stream, frame);
return listener;
}
@Override
public boolean onIdleTimeout(Session session)
{
boolean result = session.getStreams().stream()
.map(stream -> (HTTP3Stream)stream)
.map(stream -> (HttpChannelOverHTTP3)stream.getAttachment())
.filter(Objects::nonNull)
.map(channel -> channel.getState().isIdle())
.reduce(true, Boolean::logicalAnd);
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}", result ? "confirmed" : "ignored", session);
return result;
}
@Override
public void onFailure(Session session, Throwable failure)
{
// TODO
throw new UnsupportedOperationException();
}
}
private static class HTTP3StreamListener implements Stream.Listener

View File

@ -104,7 +104,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 Request #{}/{}, delayed={}:{}{} {} {}{}{}",
LOG.debug("HTTP3 request #{}/{}, delayed={}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
delayedUntilContent, System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
@ -116,7 +116,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
catch (BadMessageException x)
{
if (LOG.isDebugEnabled())
LOG.debug("onRequest", x);
LOG.debug("onRequest() failure", x);
onBadMessage(x);
return null;
}
@ -132,7 +132,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
boolean woken = getRequest().getHttpInput().onContentProducible();
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 Request #{}/{} woken: {}",
LOG.debug("HTTP3 request data available #{}/{} woken: {}",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
woken);
@ -140,7 +140,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
return wasDelayed ? this : null;
return wasDelayed || woken ? this : null;
}
public Runnable onTrailer(HeadersFrame frame)
@ -163,7 +163,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
return handle || wasDelayed ? this : null;
return wasDelayed || handle ? this : null;
}
public boolean onIdleTimeout(Throwable failure, Consumer<Runnable> consumer)

View File

@ -122,7 +122,7 @@ public class HttpTransportOverHTTP3 implements HttpTransport
else
{
dataFrame = new DataFrame(content, false);
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), true);
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_3, trailers), true);
}
}
else
@ -148,7 +148,7 @@ public class HttpTransportOverHTTP3 implements HttpTransport
else
{
headersFrame = new HeadersFrame(metaData, false);
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_2, trailers), true);
trailersFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_3, trailers), true);
}
}
}
@ -252,7 +252,8 @@ public class HttpTransportOverHTTP3 implements HttpTransport
@Override
public void push(MetaData.Request request)
{
// TODO implement
// TODO
throw new UnsupportedOperationException();
}
@Override
@ -497,7 +498,7 @@ public class HttpTransportOverHTTP3 implements HttpTransport
public void succeeded()
{
transportCallback.send(getCallback(), false, c ->
sendTrailerFrame(new MetaData(HttpVersion.HTTP_2, trailers), c));
sendTrailerFrame(new MetaData(HttpVersion.HTTP_3, trailers), c));
}
}
}

View File

@ -110,7 +110,8 @@ public class ServerHTTP3Session extends ServerProtocolSession
private void fail(Throwable failure)
{
// TODO: must close the connection.
// TODO
throw new UnsupportedOperationException();
}
private QuicStreamEndPoint openInstructionEndPoint(long streamId)

View File

@ -19,7 +19,6 @@ import org.eclipse.jetty.http3.internal.HTTP3StreamConnection;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
@ -27,20 +26,20 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
{
private final Connector connector;
private final HttpConfiguration httpConfiguration;
private final ServerHTTP3Session http3Session;
private final ServerHTTP3Session session;
public ServerHTTP3StreamConnection(Connector connector, HttpConfiguration httpConfiguration, QuicStreamEndPoint endPoint, ServerHTTP3Session http3Session, MessageParser parser)
public ServerHTTP3StreamConnection(Connector connector, HttpConfiguration httpConfiguration, QuicStreamEndPoint endPoint, ServerHTTP3Session session, MessageParser parser)
{
super(endPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
this.connector = connector;
this.httpConfiguration = httpConfiguration;
this.http3Session = http3Session;
this.session = session;
}
@Override
protected void onDataAvailable(long streamId)
{
http3Session.onDataAvailable(streamId);
session.onDataAvailable(streamId);
}
public Runnable onRequest(HTTP3Stream stream, HeadersFrame frame)

View File

@ -16,14 +16,14 @@ package org.eclipse.jetty.quic.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
@ -49,11 +49,14 @@ public class ClientQuicConnection extends QuicConnection
private static final Logger LOG = LoggerFactory.getLogger(ClientQuicConnection.class);
private final Map<SocketAddress, ClientQuicSession> pendingSessions = new ConcurrentHashMap<>();
private final ClientConnector connector;
private final Map<String, Object> context;
private Scheduler.Task connectTask;
public ClientQuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint, Map<String, Object> context)
public ClientQuicConnection(ClientConnector connector, EndPoint endPoint, Map<String, Object> context)
{
super(executor, scheduler, byteBufferPool, endPoint);
super(connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), endPoint);
this.connector = connector;
this.context = context;
}
@ -96,9 +99,13 @@ public class ClientQuicConnection extends QuicConnection
QuicheConnection quicheConnection = QuicheConnection.connect(quicheConfig, remoteAddress);
ClientQuicSession session = new ClientQuicSession(getExecutor(), getScheduler(), getByteBufferPool(), quicheConnection, this, remoteAddress, context);
pendingSessions.put(remoteAddress, session);
session.flush(); // send the response packet(s) that connect generated.
if (LOG.isDebugEnabled())
LOG.debug("created QUIC session {}", session);
LOG.debug("created {}", session);
connectTask = getScheduler().schedule(() -> connectTimeout(remoteAddress), connector.getConnectTimeout().toMillis(), TimeUnit.MILLISECONDS);
// Send the packets generated by the connect.
session.flush();
fillInterested();
}
@ -108,6 +115,21 @@ public class ClientQuicConnection extends QuicConnection
}
}
@Override
public void onFillable()
{
connectTask.cancel();
super.onFillable();
}
private void connectTimeout(SocketAddress remoteAddress)
{
if (LOG.isDebugEnabled())
LOG.debug("connect timeout {} ms to {} on {}", connector.getConnectTimeout(), remoteAddress, this);
close();
outwardClose(remoteAddress, new SocketTimeoutException("connect timeout"));
}
@Override
protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
{
@ -145,6 +167,11 @@ public class ClientQuicConnection extends QuicConnection
{
super.outwardClose(session, failure);
SocketAddress remoteAddress = session.getRemoteAddress();
outwardClose(remoteAddress, failure);
}
private void outwardClose(SocketAddress remoteAddress, Throwable failure)
{
if (remoteAddress != null)
{
if (pendingSessions.remove(remoteAddress) != null)

View File

@ -77,6 +77,6 @@ public class QuicClientConnectorConfigurator extends ClientConnector.Configurato
@Override
public Connection newConnection(ClientConnector clientConnector, SocketAddress address, EndPoint endPoint, Map<String, Object> context)
{
return configurator.apply(new ClientQuicConnection(clientConnector.getExecutor(), clientConnector.getScheduler(), clientConnector.getByteBufferPool(), endPoint, context));
return configurator.apply(new ClientQuicConnection(clientConnector, endPoint, context));
}
}

View File

@ -49,6 +49,16 @@ public abstract class ProtocolSession extends ContainerLifeCycle
return session;
}
public long getIdleTimeout()
{
return session.getIdleTimeout();
}
public int getMaxLocalStreams()
{
return session.getMaxLocalStreams();
}
public abstract Runnable getProducerTask();
protected void produce()

View File

@ -161,7 +161,6 @@ public abstract class QuicConnection extends AbstractConnection
@Override
public void close()
{
// This method should only be called when the client or the server are stopped.
if (closed.compareAndSet(false, true))
{
if (LOG.isDebugEnabled())
@ -171,7 +170,7 @@ public abstract class QuicConnection extends AbstractConnection
{
try
{
session.inwardClose(QuicErrorCode.NO_ERROR.code(), "stop");
session.inwardClose(QuicErrorCode.NO_ERROR.code(), "close");
}
catch (Throwable x)
{

View File

@ -154,6 +154,11 @@ public abstract class QuicSession extends ContainerLifeCycle
return protocolSession;
}
public int getMaxLocalStreams()
{
return quicheConnection.maxLocalStreams();
}
public String getNegotiatedProtocol()
{
return quicheConnection.getNegotiatedProtocol();

View File

@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
public class QuicheConnection
{
private static final Logger LOG = LoggerFactory.getLogger(QuicheConnection.class);
// TODO: cannot be static!
private static final SecureRandom SECURE_RANDOM = new SecureRandom();
static
@ -527,6 +526,18 @@ public class QuicheConnection
}
}
public int maxLocalStreams()
{
try (AutoLock ignore = lock.lock())
{
if (quicheConn == null)
throw new IllegalStateException("connection was released");
LibQuiche.quiche_stats stats = new LibQuiche.quiche_stats();
LibQuiche.INSTANCE.quiche_conn_stats(quicheConn, stats);
return stats.peer_initial_max_streams_bidi.intValue();
}
}
public long windowCapacity()
{
try (AutoLock ignore = lock.lock())
@ -547,7 +558,10 @@ public class QuicheConnection
throw new IOException("connection was released");
long value = LibQuiche.INSTANCE.quiche_conn_stream_capacity(quicheConn, new uint64_t(streamId)).longValue();
if (value < 0)
throw new IOException(" failed to read capacity of stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(value));
{
if (LOG.isDebugEnabled())
LOG.debug("could not read window capacity for stream {} err={}", streamId, LibQuiche.quiche_error.errToString(value));
}
return value;
}
}

View File

@ -83,6 +83,7 @@ import static java.nio.ByteBuffer.wrap;
import static org.awaitility.Awaitility.await;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.H3;
import static org.eclipse.jetty.http.client.Transport.HTTP;
import static org.eclipse.jetty.util.BufferUtil.toArray;
import static org.hamcrest.MatcherAssert.assertThat;
@ -419,7 +420,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
out.setWriteListener(new WriteListener()
{
@Override
public void onWritePossible() throws IOException
public void onWritePossible()
{
scenario.assertScope();
@ -567,8 +568,6 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
{
failed.set(result.isFailed());
clientLatch.countDown();
clientLatch.countDown();
clientLatch.countDown();
});
assertTrue(complete.await(10, TimeUnit.SECONDS));
@ -861,7 +860,6 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
{
init(transport);
String success = "SUCCESS";
AtomicBoolean allDataRead = new AtomicBoolean(false);
scenario.start(new HttpServlet()
{
@ -898,7 +896,6 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
{
scenario.assertScope();
output.write("FAILURE".getBytes(StandardCharsets.UTF_8));
allDataRead.set(true);
throw new IllegalStateException();
}
@ -1367,13 +1364,16 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
@ArgumentsSource(TransportProvider.class)
public void testAsyncEcho(Transport transport) throws Exception
{
// TODO: investigate why H3 does not work.
Assumptions.assumeTrue(transport != H3);
init(transport);
scenario.start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
System.err.println("Service " + request);
System.err.println("service " + request);
AsyncContext asyncContext = request.startAsync();
ServletInputStream input = request.getInputStream();
@ -1396,7 +1396,7 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
}
@Override
public void onAllDataRead() throws IOException
public void onAllDataRead()
{
asyncContext.complete();
}

View File

@ -21,7 +21,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.AsyncRequestContent;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
@ -107,11 +107,11 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
}
});
DeferredContentProvider contentProvider = new DeferredContentProvider();
AsyncRequestContent requestContent = new AsyncRequestContent();
CountDownLatch ok = new CountDownLatch(2);
scenario.client.newRequest(scenario.newURI())
.method("POST")
.content(contentProvider)
.body(requestContent)
.onResponseContent((response, content) ->
{
assertThat(BufferUtil.toString(content), containsString("OK"));
@ -131,7 +131,7 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
}
})
.send(null);
contentProvider.offer(BufferUtil.toBuffer("1"));
requestContent.offer(BufferUtil.toBuffer("1"));
assertTrue(ok.await(10, TimeUnit.SECONDS));
assertThat(readException.get(), instanceOf(IOException.class));

View File

@ -21,6 +21,7 @@ import java.util.function.Predicate;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.api.Connection;
@ -36,6 +37,11 @@ import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpChannelOverHTTP2;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.http.HttpClientTransportOverHTTP3;
import org.eclipse.jetty.http3.client.http.internal.HttpChannelOverHTTP3;
import org.eclipse.jetty.http3.client.http.internal.HttpConnectionOverHTTP3;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
@ -175,8 +181,32 @@ public class HttpChannelAssociationTest extends AbstractTest<TransportScenario>
}
case H3:
{
// TODO:
throw new UnsupportedOperationException();
HTTP3Client http3Client = new HTTP3Client();
http3Client.getClientConnector().setSelectors(1);
http3Client.getClientConnector().setSslContextFactory(scenario.newClientSslContextFactory());
http3Client.getQuicConfiguration().setVerifyPeerCertificates(false);
return new HttpClientTransportOverHTTP3(http3Client)
{
@Override
protected HttpConnection newHttpConnection(HttpDestination destination, HTTP3SessionClient session)
{
return new HttpConnectionOverHTTP3(destination, session)
{
@Override
protected HttpChannelOverHTTP3 newHttpChannel()
{
return new HttpChannelOverHTTP3(getHttpDestination(), this, getSession())
{
@Override
public boolean associate(HttpExchange exchange)
{
return code.test(exchange) && super.associate(exchange);
}
};
}
};
}
};
}
case FCGI:
{

View File

@ -17,7 +17,6 @@ import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -33,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
{
private long idleTimeout = 1000;
private final long idleTimeout = 1000;
@Override
public void init(Transport transport) throws IOException
@ -49,7 +48,7 @@ public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
scenario.startServer(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
if (target.equals("/timeout"))
@ -85,7 +84,7 @@ public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
scenario.start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
{
baseRequest.setHandled(true);
if (target.equals("/timeout"))
@ -139,7 +138,7 @@ public class HttpClientIdleTimeoutTest extends AbstractTest<TransportScenario>
{
init(transport);
scenario.start(new EmptyServerHandler());
scenario.setServerIdleTimeout(idleTimeout);
scenario.setConnectionIdleTimeout(idleTimeout);
ContentResponse response1 = scenario.client.newRequest(scenario.newURI()).send();
assertEquals(HttpStatus.OK_200, response1.getStatus());

View File

@ -112,7 +112,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
{
init(transport);
int multiplex = 1;
if (scenario.transport.isHttp2Based())
if (scenario.transport.isMultiplexed())
multiplex = 4;
int maxMultiplex = multiplex;
@ -207,7 +207,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
init(transport);
int multiplex = 1;
if (scenario.transport.isHttp2Based())
if (scenario.transport.isMultiplexed())
multiplex = 2;
int maxMultiplex = multiplex;

View File

@ -599,7 +599,7 @@ public class ServerTimeoutsTest extends AbstractTest<TransportScenario>
// In HTTP/2, we force the flow control window to be small, so that the server
// stalls almost immediately without having written many bytes, so that the test
// completes quickly.
Assumptions.assumeTrue(transport.isHttp2Based());
Assumptions.assumeTrue(transport.isMultiplexed());
init(transport);

View File

@ -22,9 +22,9 @@ public enum Transport
return this == HTTP || this == HTTPS;
}
public boolean isHttp2Based()
public boolean isMultiplexed()
{
return this == H2C || this == H2;
return this == H2C || this == H2 || this == H3;
}
public boolean isTlsBased()

View File

@ -1,4 +1,5 @@
#org.eclipse.jetty.LEVEL=DEBUG
org.eclipse.jetty.jmx.LEVEL=INFO
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.fcgi.LEVEL=DEBUG
#org.eclipse.jetty.proxy.LEVEL=DEBUG
@ -9,4 +10,3 @@ org.eclipse.jetty.http2.hpack.LEVEL=INFO
org.eclipse.jetty.http3.qpack.LEVEL=INFO
#org.eclipse.jetty.quic.LEVEL=DEBUG
org.eclipse.jetty.quic.quiche.LEVEL=INFO
#org.eclipse.jetty.io.LEVEL=DEBUG