Issue #6728 - QUIC and HTTP/3

- WIP on the client upper layer.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-19 00:25:16 +02:00
parent ec6ef66b5e
commit 0b5241df6b
32 changed files with 660 additions and 146 deletions

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.client;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
@ -400,6 +401,24 @@ public abstract class HttpSender
return updated;
}
protected String relativize(String path)
{
try
{
String result = path;
URI uri = URI.create(result);
if (uri.isAbsolute())
result = uri.getPath();
return result.isEmpty() ? "/" : result;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not relativize {}", path);
return path;
}
}
@Override
public String toString()
{

View File

@ -20,7 +20,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -125,10 +124,9 @@ public class HttpClientTransportDynamic extends AbstractConnectorHttpClientTrans
private static ClientConnector findClientConnector(ClientConnectionFactory.Info[] infos)
{
return Arrays.stream(infos)
.map(info -> info.getBean(ClientConnector.class))
.filter(Objects::nonNull)
.flatMap(info -> info.getContainedBeans(ClientConnector.class).stream())
.findFirst()
.orElse(new ClientConnector());
.orElseGet(ClientConnector::new);
}
@Override

View File

@ -313,7 +313,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
}
}
ByteBuffer buffer = dataInfo.frame.getData();
Callback callback = dataInfo.callback;
if (buffer.hasRemaining())

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.http2.client.http;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
@ -34,13 +33,9 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpSenderOverHTTP2 extends HttpSender
{
private static final Logger LOG = LoggerFactory.getLogger(HttpSenderOverHTTP2.class);
public HttpSenderOverHTTP2(HttpChannelOverHTTP2 channel)
{
super(channel);
@ -132,24 +127,6 @@ public class HttpSenderOverHTTP2 extends HttpSender
((ISession)channel.getSession()).newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener());
}
private String relativize(String path)
{
try
{
String result = path;
URI uri = URI.create(result);
if (uri.isAbsolute())
result = uri.getPath();
return result.isEmpty() ? "/" : result;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not relativize {}", path);
return path;
}
}
private HttpFields retrieveTrailers(HttpRequest request)
{
Supplier<HttpFields> trailerSupplier = request.getTrailers();

View File

@ -23,4 +23,7 @@ module org.eclipse.jetty.http3.client
requires transitive org.eclipse.jetty.util;
exports org.eclipse.jetty.http3.client;
exports org.eclipse.jetty.http3.client.internal to
org.eclipse.jetty.http3.http.client.transport;
}

View File

@ -23,9 +23,9 @@ import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderStreamConnection;
import org.eclipse.jetty.http3.internal.EncoderStreamConnection;
import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.MessageFlusher;
import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
@ -47,7 +47,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
private final QpackDecoder decoder;
private final HTTP3SessionClient session;
private final ControlFlusher controlFlusher;
private final HTTP3Flusher messageFlusher;
private final MessageFlusher messageFlusher;
public ClientHTTP3Session(HTTP3Configuration configuration, ClientQuicSession quicSession, Session.Client.Listener listener, Promise<Session.Client> promise)
{
@ -82,7 +82,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxRequestHeadersSize(), configuration.isUseOutputDirectByteBuffers());
this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxRequestHeadersSize(), configuration.isUseOutputDirectByteBuffers());
addBean(messageFlusher);
}
@ -97,9 +97,8 @@ public class ClientHTTP3Session extends ClientProtocolSession
}
@Override
protected void doStart() throws Exception
protected void initialize()
{
super.doStart();
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = session.onPreface();
if (settings == null)

View File

@ -51,7 +51,8 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
public void onOpen()
{
super.onOpen();
promise.succeeded(this);
if (promise != null)
promise.succeeded(this);
}
@Override

View File

@ -200,7 +200,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable
if (listener == null)
{
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code()));
session.writeMessageFrame(getId(), new HTTP3Flusher.FlushFrame(), callback);
session.writeMessageFrame(getId(), new MessageFlusher.FlushFrame(), callback);
}
updateClose(frame.isLast(), false);
}

View File

@ -29,9 +29,9 @@ import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3Flusher extends IteratingCallback
public class MessageFlusher extends IteratingCallback
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Flusher.class);
private static final Logger LOG = LoggerFactory.getLogger(MessageFlusher.class);
private final AutoLock lock = new AutoLock();
private final Queue<Entry> queue = new ArrayDeque<>();
@ -39,7 +39,7 @@ public class HTTP3Flusher extends IteratingCallback
private final MessageGenerator generator;
private Entry entry;
public HTTP3Flusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
{
this.lease = new ByteBufferPool.Lease(byteBufferPool);
this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers);

View File

@ -18,6 +18,12 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-server</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.http3.client.http;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -21,13 +20,16 @@ import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.client.http.HttpClientConnectionFactory;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.HTTP3ClientConnectionFactory;
import org.eclipse.jetty.http3.client.http.internal.SessionClientListener;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
public class ClientConnectionFactoryOverHTTP3 extends ContainerLifeCycle implements ClientConnectionFactory
{
private final ClientConnectionFactory factory = new HTTP3ClientConnectionFactory();
private final HTTP3ClientConnectionFactory factory = new HTTP3ClientConnectionFactory();
private final HTTP3Client client;
public ClientConnectionFactoryOverHTTP3(HTTP3Client client)
@ -37,14 +39,9 @@ public class ClientConnectionFactoryOverHTTP3 extends ContainerLifeCycle impleme
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
// HTTPSessionListenerPromise listenerPromise = new HTTPSessionListenerPromise(context);
// context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, client);
// context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listenerPromise);
// context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, listenerPromise);
// return factory.newConnection(endPoint, context);
return null;
return factory.newConnection(endPoint, context);
}
/**
@ -52,7 +49,7 @@ public class ClientConnectionFactoryOverHTTP3 extends ContainerLifeCycle impleme
*
* @see HttpClientConnectionFactory#HTTP11
*/
public static class HTTP3 extends Info
public static class HTTP3 extends Info implements ProtocolSession.Factory
{
public HTTP3(HTTP3Client client)
{
@ -65,6 +62,16 @@ public class ClientConnectionFactoryOverHTTP3 extends ContainerLifeCycle impleme
return List.of("h3");
}
@Override
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
{
ClientConnectionFactoryOverHTTP3 http3 = (ClientConnectionFactoryOverHTTP3)getClientConnectionFactory();
context.put(HTTP3Client.CLIENT_CONTEXT_KEY, http3.client);
SessionClientListener listener = new SessionClientListener(context);
context.put(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY, listener);
return http3.factory.newProtocolSession(quicSession, context);
}
@Override
public String toString()
{

View File

@ -23,22 +23,17 @@ import java.util.Objects;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.http.internal.HttpConnectionOverHTTP3;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.client.http.internal.SessionClientListener;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Promise;
public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport
{
@ -121,28 +116,4 @@ public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport
{
return null;
}
private class SessionClientListener implements Session.Client.Listener
{
private final Map<String, Object> context;
private SessionClientListener(Map<String, Object> context)
{
this.context = context;
}
@SuppressWarnings("unchecked")
private Promise<org.eclipse.jetty.client.api.Connection> httpConnectionPromise()
{
return (Promise<org.eclipse.jetty.client.api.Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
}
@Override
public void onSettings(Session session, SettingsFrame frame)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverHTTP3 connection = new HttpConnectionOverHTTP3(destination, (HTTP3Session)session);
httpConnectionPromise().succeeded(connection);
}
}
}

View File

@ -18,35 +18,63 @@ import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
public class HttpChannelOverHTTP3 extends HttpChannel
{
public HttpChannelOverHTTP3(HttpDestination destination)
private final HTTP3SessionClient session;
private final HttpSenderOverHTTP3 sender;
private final HttpReceiverOverHTTP3 receiver;
public HttpChannelOverHTTP3(HttpDestination destination, HTTP3SessionClient session)
{
super(destination);
this.session = session;
sender = new HttpSenderOverHTTP3(this);
receiver = new HttpReceiverOverHTTP3(this);
}
public HTTP3SessionClient getSession()
{
return session;
}
public Stream.Listener getStreamListener()
{
return receiver;
}
@Override
protected HttpSender getHttpSender()
{
return null;
return sender;
}
@Override
protected HttpReceiver getHttpReceiver()
{
return null;
return receiver;
}
@Override
public void send(HttpExchange exchange)
{
sender.send(exchange);
}
@Override
public void release()
{
// TODO
}
@Override
public String toString()
{
return String.format("%s[send=%s,recv=%s]",
super.toString(),
sender,
receiver);
}
}

View File

@ -27,15 +27,15 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
public class HttpConnectionOverHTTP3 extends HttpConnection implements ConnectionPool.Multiplexable
{
private final Set<HttpChannel> activeChannels = ConcurrentHashMap.newKeySet();
private final AtomicBoolean closed = new AtomicBoolean();
private final HTTP3Session session;
private final HTTP3SessionClient session;
public HttpConnectionOverHTTP3(HttpDestination destination, HTTP3Session session)
public HttpConnectionOverHTTP3(HttpDestination destination, HTTP3SessionClient session)
{
super(destination);
this.session = session;
@ -62,7 +62,7 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio
normalizeRequest(request);
// One connection maps to N channels, so one channel for each exchange.
HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(getHttpDestination());
HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(getHttpDestination(), session);
activeChannels.add(channel);
return send(channel, exchange);

View File

@ -0,0 +1,148 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client.http.internal;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;
public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listener
{
protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel)
{
super(channel);
}
@Override
protected HttpChannelOverHTTP3 getHttpChannel()
{
return (HttpChannelOverHTTP3)super.getHttpChannel();
}
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpResponse httpResponse = exchange.getResponse();
MetaData.Response response = (MetaData.Response)frame.getMetaData();
httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason());
if (responseBegin(exchange))
{
HttpFields headers = response.getFields();
for (HttpField header : headers)
{
if (!responseHeader(exchange, header))
return;
}
// TODO: add support for HttpMethod.CONNECT.
if (responseHeaders(exchange))
{
int status = response.getStatus();
boolean informational = HttpStatus.isInformational(status) && status != HttpStatus.SWITCHING_PROTOCOLS_101;
if (frame.isLast() || informational)
responseSuccess(exchange);
else
stream.demand();
}
else
{
if (frame.isLast())
{
// There is no demand to trigger response success, so add
// a poison pill to trigger it when there will be demand.
// TODO
// notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
}
}
}
}
@Override
protected void receive()
{
// Called when the application resumes demand of content.
// TODO: stream.demand() should be enough.
}
@Override
public void onDataAvailable(Stream stream)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
Stream.Data data = stream.readData();
if (data != null)
{
ByteBuffer byteBuffer = data.getByteBuffer();
if (byteBuffer.hasRemaining())
{
// TODO: callback failure should invoke responseFailure().
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete);
boolean proceed = responseContent(exchange, byteBuffer, callback);
if (proceed)
{
if (data.isLast())
responseSuccess(exchange);
else
stream.demand();
}
}
else
{
data.complete();
if (data.isLast())
responseSuccess(exchange);
else
stream.demand();
}
}
else
{
stream.demand();
}
}
@Override
public void onTrailer(Stream stream, HeadersFrame frame)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpFields trailers = frame.getMetaData().getFields();
trailers.forEach(exchange.getResponse()::trailer);
// Previous DataFrames had endStream=false, so
// add a poison pill to trigger response success
// after all normal DataFrames have been consumed.
// TODO
// notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
}
}

View File

@ -0,0 +1,207 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client.http.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class HttpSenderOverHTTP3 extends HttpSender
{
private Stream stream;
public HttpSenderOverHTTP3(HttpChannelOverHTTP3 channel)
{
super(channel);
}
@Override
protected HttpChannelOverHTTP3 getHttpChannel()
{
return (HttpChannelOverHTTP3)super.getHttpChannel();
}
@Override
protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback)
{
HttpRequest request = exchange.getRequest();
boolean isTunnel = HttpMethod.CONNECT.is(request.getMethod());
MetaData.Request metaData;
if (isTunnel)
{
String upgradeProtocol = request.getUpgradeProtocol();
if (upgradeProtocol == null)
{
metaData = new MetaData.ConnectRequest((String)null, new HostPortHttpField(request.getPath()), null, request.getHeaders(), null);
}
else
{
HostPortHttpField authority = new HostPortHttpField(request.getHost(), request.getPort());
metaData = new MetaData.ConnectRequest(request.getScheme(), authority, request.getPath(), request.getHeaders(), upgradeProtocol);
}
}
else
{
String path = relativize(request.getPath());
HttpURI uri = HttpURI.build()
.scheme(request.getScheme())
.host(request.getHost())
.port(request.getPort())
.path(path)
.query(request.getQuery());
metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_3, request.getHeaders(), -1, request.getTrailers());
}
HeadersFrame headersFrame;
DataFrame dataFrame = null;
HeadersFrame trailerFrame = null;
if (isTunnel)
{
headersFrame = new HeadersFrame(metaData, false);
}
else
{
boolean hasContent = BufferUtil.hasContent(contentBuffer);
if (hasContent)
{
headersFrame = new HeadersFrame(metaData, false);
if (lastContent)
{
HttpFields trailers = retrieveTrailers(request);
boolean hasTrailers = trailers != null;
dataFrame = new DataFrame(contentBuffer, !hasTrailers);
if (hasTrailers)
trailerFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_3, trailers), true);
}
else
{
dataFrame = new DataFrame(contentBuffer, false);
}
}
else
{
if (lastContent)
{
HttpFields trailers = retrieveTrailers(request);
boolean hasTrailers = trailers != null;
headersFrame = new HeadersFrame(metaData, !hasTrailers);
if (hasTrailers)
trailerFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_3, trailers), true);
}
else
{
headersFrame = new HeadersFrame(metaData, false);
}
}
}
HeadersFrame hf = headersFrame;
DataFrame df = dataFrame;
HeadersFrame tf = trailerFrame;
HTTP3SessionClient session = getHttpChannel().getSession();
CompletableFuture<Stream> completable = session.newRequest(hf, getHttpChannel().getStreamListener())
.thenApply(stream -> onNewStream(stream, request));
if (df != null)
completable = completable.thenCompose(stream -> stream.data(df));
if (tf != null)
completable = completable.thenCompose(stream -> stream.trailer(tf));
callback.completeWith(completable);
}
private Stream onNewStream(Stream stream, HttpRequest request)
{
this.stream = stream;
long idleTimeout = request.getIdleTimeout();
if (idleTimeout > 0)
((HTTP3Stream)stream).setIdleTimeout(idleTimeout);
return stream;
}
private HttpFields retrieveTrailers(HttpRequest request)
{
Supplier<HttpFields> trailerSupplier = request.getTrailers();
HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get();
return trailers == null || trailers.size() == 0 ? null : trailers;
}
@Override
protected void sendContent(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback)
{
boolean hasContent = contentBuffer.hasRemaining();
if (lastContent)
{
// Call the trailers supplier as late as possible.
HttpFields trailers = retrieveTrailers(exchange.getRequest());
boolean hasTrailers = trailers != null && trailers.size() > 0;
if (hasContent)
{
DataFrame dataFrame = new DataFrame(contentBuffer, !hasTrailers);
CompletableFuture<Stream> completable;
if (hasTrailers)
completable = stream.data(dataFrame).thenCompose(s -> sendTrailer(s, trailers));
else
completable = stream.data(dataFrame);
callback.completeWith(completable);
}
else
{
CompletableFuture<Stream> completable;
if (hasTrailers)
completable = sendTrailer(stream, trailers);
else
completable = stream.data(new DataFrame(contentBuffer, true));
callback.completeWith(completable);
}
}
else
{
if (hasContent)
{
CompletableFuture<Stream> completable = stream.data(new DataFrame(contentBuffer, false));
callback.completeWith(completable);
}
else
{
// Don't send empty non-last content.
callback.succeeded();
}
}
}
private CompletableFuture<Stream> sendTrailer(Stream stream, HttpFields trailers)
{
MetaData metaData = new MetaData(HttpVersion.HTTP_3, trailers);
HeadersFrame trailerFrame = new HeadersFrame(metaData, true);
return stream.trailer(trailerFrame);
}
}

View File

@ -0,0 +1,48 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client.http.internal;
import java.util.Map;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.util.Promise;
public class SessionClientListener implements Session.Client.Listener
{
private final Map<String, Object> context;
public SessionClientListener(Map<String, Object> context)
{
this.context = context;
}
@SuppressWarnings("unchecked")
private Promise<Connection> httpConnectionPromise()
{
return (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
}
@Override
public void onSettings(Session session, SettingsFrame frame)
{
HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY);
HttpConnectionOverHTTP3 connection = new HttpConnectionOverHTTP3(destination, (HTTP3SessionClient)session);
httpConnectionPromise().succeeded(connection);
}
}

View File

@ -13,7 +13,6 @@
package org.eclipse.jetty.http3.server.internal;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.eclipse.jetty.http.BadMessageException;
@ -25,10 +24,8 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
@ -230,12 +227,16 @@ public class HttpChannelOverHTTP3 extends HttpChannel
if (content != null)
{
HttpInput.Content result = content;
if (!content.isSpecial())
if (!result.isSpecial())
content = null;
if (LOG.isDebugEnabled())
LOG.debug("produced content {} on {}", result, this);
return result;
}
Stream.Data data = stream.readData();
if (LOG.isDebugEnabled())
LOG.debug("read {} on {}", data, this);
if (data == null)
return null;
@ -270,9 +271,11 @@ public class HttpChannelOverHTTP3 extends HttpChannel
handle |= handleContent | handleRequest;
}
HttpInput.Content result = this.content;
if (result != null && !result.isSpecial())
HttpInput.Content result = content;
if (!result.isSpecial())
content = result.isEof() ? new HttpInput.EofContent() : null;
if (LOG.isDebugEnabled())
LOG.debug("produced new content {} on {}", result, this);
return result;
}

View File

@ -23,9 +23,9 @@ import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderStreamConnection;
import org.eclipse.jetty.http3.internal.EncoderStreamConnection;
import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.MessageFlusher;
import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
@ -46,7 +46,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
private final QpackDecoder decoder;
private final HTTP3SessionServer session;
private final ControlFlusher controlFlusher;
private final HTTP3Flusher messageFlusher;
private final MessageFlusher messageFlusher;
public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession quicSession, Session.Server.Listener listener)
{
@ -81,7 +81,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers());
this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers());
addBean(messageFlusher);
}

View File

@ -14,19 +14,31 @@
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-http-client-transport</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>http3-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>quic-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-java-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@ -34,13 +46,10 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -16,6 +16,8 @@ package org.eclipse.jetty.http3.tests;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpURI;
@ -23,6 +25,7 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.client.http.ClientConnectionFactoryOverHTTP3;
import org.eclipse.jetty.http3.server.HTTP3ServerConnectionFactory;
import org.eclipse.jetty.http3.server.HTTP3ServerConnector;
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
@ -41,9 +44,10 @@ public class AbstractClientServerTest
@RegisterExtension
final BeforeTestExecutionCallback printMethodName = context ->
System.err.printf("Running %s.%s() %s%n", context.getRequiredTestClass().getSimpleName(), context.getRequiredTestMethod().getName(), context.getDisplayName());
protected HTTP3ServerConnector connector;
protected HTTP3Client client;
protected Server server;
protected HTTP3ServerConnector connector;
protected HTTP3Client http3Client;
protected HttpClient httpClient;
protected void start(Handler handler) throws Exception
{
@ -79,14 +83,18 @@ public class AbstractClientServerTest
protected void startClient() throws Exception
{
client = new HTTP3Client();
client.start();
http3Client = new HTTP3Client();
httpClient = new HttpClient(new HttpClientTransportDynamic(new ClientConnectionFactoryOverHTTP3.HTTP3(http3Client)));
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
httpClient.setExecutor(clientThreads);
httpClient.start();
}
protected Session.Client newSession(Session.Client.Listener listener) throws Exception
{
InetSocketAddress address = new InetSocketAddress("localhost", connector.getLocalPort());
return client.connect(address, listener).get(5, TimeUnit.SECONDS);
return http3Client.connect(address, listener).get(5, TimeUnit.SECONDS);
}
protected MetaData.Request newRequest(String path)
@ -108,7 +116,7 @@ public class AbstractClientServerTest
@AfterEach
public void dispose()
{
LifeCycle.stop(client);
LifeCycle.stop(httpClient);
LifeCycle.stop(server);
}
}

View File

@ -282,7 +282,7 @@ public class ClientServerTest extends AbstractClientServerTest
});
int maxRequestHeadersSize = 128;
client.getHTTP3Configuration().setMaxRequestHeadersSize(maxRequestHeadersSize);
http3Client.getHTTP3Configuration().setMaxRequestHeadersSize(maxRequestHeadersSize);
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch requestFailureLatch = new CountDownLatch(1);

View File

@ -792,7 +792,7 @@ public class GoAwayTest extends AbstractClientServerTest
serverDisconnectLatch.countDown();
}
});
client.getClientConnector().setIdleTimeout(Duration.ofMillis(idleTimeout));
http3Client.getClientConnector().setIdleTimeout(Duration.ofMillis(idleTimeout));
CountDownLatch clientIdleTimeoutLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
@ -1177,7 +1177,7 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
client.stop();
http3Client.stop();
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
@ -1286,7 +1286,7 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
CompletableFuture<Void> shutdown = client.shutdown();
CompletableFuture<Void> shutdown = http3Client.shutdown();
// Shutdown must not complete yet.
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));

View File

@ -139,7 +139,7 @@ public class HandlerClientServerTest extends AbstractClientServerTest
})
.get(555, TimeUnit.SECONDS);
byte[] bytes = new byte[16 * 1024 * 1024];
byte[] bytes = new byte[1 * 1024];
new Random().nextBytes(bytes);
stream.data(new DataFrame(ByteBuffer.wrap(bytes, 0, bytes.length / 2), false))
.thenCompose(s -> s.data(new DataFrame(ByteBuffer.wrap(bytes, bytes.length / 2, bytes.length / 2), true)))

View File

@ -0,0 +1,49 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.tests;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class HttpClientTransportOverHTTP3Test extends AbstractClientServerTest
{
@Test
public void testRequestResponse() throws Exception
{
String content = "Hello, World!";
start(new AbstractHandler()
{
@Override
public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
jettyRequest.setHandled(true);
response.getOutputStream().print(content);
}
});
ContentResponse response = httpClient.newRequest("https://localhost:" + connector.getLocalPort())
.timeout(555, TimeUnit.SECONDS)
.send();
assertEquals(content, response.getContentAsString());
}
}

View File

@ -97,7 +97,7 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
});
long streamIdleTimeout = 1000;
client.getHTTP3Configuration().setStreamIdleTimeout(streamIdleTimeout);
http3Client.getHTTP3Configuration().setStreamIdleTimeout(streamIdleTimeout);
Session.Client clientSession = newSession(new Session.Client.Listener() {});
@ -179,7 +179,7 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
assertNotNull(h3);
h3.getConfiguration().setStreamIdleTimeout(idleTimeout);
Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
Session.Client clientSession = http3Client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
CountDownLatch clientFailureLatch = new CountDownLatch(1);

View File

@ -38,6 +38,11 @@ public class ClientProtocolSession extends ProtocolSession
protected void doStart() throws Exception
{
super.doStart();
initialize();
}
protected void initialize()
{
// Create a single bidirectional, client-initiated,
// QUIC stream that plays the role of the TCP stream.
long streamId = getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);

View File

@ -115,18 +115,23 @@ public class ClientQuicConnection extends QuicConnection
@Override
public void onFillable()
{
Runnable task = receiveAndProcess();
if (task != null)
while (true)
{
Runnable task = receiveAndProcess();
if (task == null)
break;
task.run();
}
}
@Override
protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException
{
QuicSession session = pendingSessions.get(remoteAddress);
ClientQuicSession session = pendingSessions.get(remoteAddress);
if (session != null)
{
session.process(remoteAddress, cipherBuffer);
Runnable task = session.process(remoteAddress, cipherBuffer);
session.offerTask(task);
if (session.isConnectionEstablished())
{
pendingSessions.remove(remoteAddress);

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
@ -28,6 +29,7 @@ import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.thread.Scheduler;
/**
@ -39,6 +41,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class ClientQuicSession extends QuicSession
{
private final Map<String, Object> context;
private final AtomicReference<Runnable> task = new AtomicReference<>();
protected ClientQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress, Map<String, Object> context)
{
@ -46,12 +49,28 @@ public class ClientQuicSession extends QuicSession
this.context = context;
}
void offerTask(Runnable task)
{
this.task.set(task);
}
@Override
protected Runnable pollTask()
{
return task.getAndSet(null);
}
@Override
protected ProtocolSession createProtocolSession()
{
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
ProtocolSession.Factory factory = null;
if (connectionFactory instanceof ProtocolSession.Factory)
return ((ProtocolSession.Factory)connectionFactory).newProtocolSession(this, context);
factory = (ProtocolSession.Factory)connectionFactory;
if (factory == null && connectionFactory instanceof Container)
factory = ((Container)connectionFactory).getContainedBeans(ProtocolSession.Factory.class).stream().findFirst().orElse(null);
if (factory != null)
return factory.newProtocolSession(this, context);
return new ClientProtocolSession(this);
}

View File

@ -172,6 +172,9 @@ public abstract class QuicConnection extends AbstractConnection
{
try
{
if (isFillInterested())
return null;
ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
@ -222,6 +225,13 @@ public abstract class QuicConnection extends AbstractConnection
sessions.put(quicheConnectionId, session);
listeners.forEach(session::addEventListener);
LifeCycle.start(session);
// Session creation may have generated a task.
Runnable task = session.pollTask();
if (LOG.isDebugEnabled())
LOG.debug("processing creation task {} on {}", task, session);
if (task != null)
return task;
}
else
{
@ -235,7 +245,7 @@ public abstract class QuicConnection extends AbstractConnection
LOG.debug("packet is for existing session {}, processing {} bytes", session, cipherBuffer.remaining());
Runnable task = session.process(remoteAddress, cipherBuffer);
if (LOG.isDebugEnabled())
LOG.debug("processing session {} produced task {}", session, task);
LOG.debug("produced task {} on {}", task, session);
if (task != null)
return task;
}

View File

@ -330,9 +330,10 @@ public abstract class QuicSession extends ContainerLifeCycle
addManaged(session);
}
if (processing.compareAndSet(false, true))
return session::process;
return null;
boolean process = processing.compareAndSet(false, true);
if (LOG.isDebugEnabled())
LOG.debug("processing={} on {}", process, session);
return process ? session::process : null;
}
else
{
@ -343,9 +344,16 @@ public abstract class QuicSession extends ContainerLifeCycle
void processingComplete()
{
if (LOG.isDebugEnabled())
LOG.debug("processing complete on {}", protocolSession);
processing.set(false);
}
protected Runnable pollTask()
{
return null;
}
protected abstract ProtocolSession createProtocolSession();
List<Long> getWritableStreamIds()

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import org.eclipse.jetty.io.AbstractEndPoint;
@ -26,7 +27,6 @@ import org.eclipse.jetty.io.FillInterest;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,17 +41,15 @@ public class QuicStreamEndPoint extends AbstractEndPoint
private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class);
private static final ByteBuffer LAST_FLAG = ByteBuffer.allocate(0);
private final AutoLock lock = new AutoLock();
private final AtomicBoolean readable = new AtomicBoolean(true);
private final QuicSession session;
private final long streamId;
private boolean readable;
public QuicStreamEndPoint(Scheduler scheduler, QuicSession session, long streamId)
{
super(scheduler);
this.session = session;
this.streamId = streamId;
this.readable = true;
}
public QuicSession getQuicSession()
@ -215,35 +213,24 @@ public class QuicStreamEndPoint extends AbstractEndPoint
public void onReadable()
{
// TODO: use AtomicBoolean.
try (AutoLock l = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable, processing: {}", streamId, readable);
if (!readable)
return;
readable = false;
}
getFillInterest().fillable();
boolean expected = readable.compareAndExchange(true, false);
if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable, processing: {}", streamId, expected);
if (expected)
getFillInterest().fillable();
}
@Override
public void fillInterested(Callback callback)
{
try (AutoLock l = lock.lock())
{
readable = true;
}
readable.set(true);
super.fillInterested(callback);
}
@Override
public boolean tryFillInterested(Callback callback)
{
try (AutoLock l = lock.lock())
{
readable = true;
}
readable.set(true);
return super.tryFillInterested(callback);
}