diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 40b7d29d027..6dd77182d89 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.client; +import java.net.URI; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -400,6 +401,24 @@ public abstract class HttpSender return updated; } + protected String relativize(String path) + { + try + { + String result = path; + URI uri = URI.create(result); + if (uri.isAbsolute()) + result = uri.getPath(); + return result.isEmpty() ? "/" : result; + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Could not relativize {}", path); + return path; + } + } + @Override public String toString() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/dynamic/HttpClientTransportDynamic.java b/jetty-client/src/main/java/org/eclipse/jetty/client/dynamic/HttpClientTransportDynamic.java index 4673ee0797b..cafb3959b41 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/dynamic/HttpClientTransportDynamic.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/dynamic/HttpClientTransportDynamic.java @@ -20,7 +20,6 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -125,10 +124,9 @@ public class HttpClientTransportDynamic extends AbstractConnectorHttpClientTrans private static ClientConnector findClientConnector(ClientConnectionFactory.Info[] infos) { return Arrays.stream(infos) - .map(info -> info.getBean(ClientConnector.class)) - .filter(Objects::nonNull) + .flatMap(info -> info.getContainedBeans(ClientConnector.class).stream()) .findFirst() - .orElse(new ClientConnector()); + .orElseGet(ClientConnector::new); } @Override diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index f7135f00625..4a434a05a14 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -313,7 +313,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. } } - ByteBuffer buffer = dataInfo.frame.getData(); Callback callback = dataInfo.callback; if (buffer.hasRemaining()) diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index 8967c4874ac..ef683dfaf1c 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.http2.client.http; -import java.net.URI; import java.nio.ByteBuffer; import java.util.function.Supplier; @@ -34,13 +33,9 @@ import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class HttpSenderOverHTTP2 extends HttpSender { - private static final Logger LOG = LoggerFactory.getLogger(HttpSenderOverHTTP2.class); - public HttpSenderOverHTTP2(HttpChannelOverHTTP2 channel) { super(channel); @@ -132,24 +127,6 @@ public class HttpSenderOverHTTP2 extends HttpSender ((ISession)channel.getSession()).newStream(frameList, new HeadersPromise(request, callback), channel.getStreamListener()); } - private String relativize(String path) - { - try - { - String result = path; - URI uri = URI.create(result); - if (uri.isAbsolute()) - result = uri.getPath(); - return result.isEmpty() ? "/" : result; - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Could not relativize {}", path); - return path; - } - } - private HttpFields retrieveTrailers(HttpRequest request) { Supplier trailerSupplier = request.getTrailers(); diff --git a/jetty-http3/http3-client/src/main/java/module-info.java b/jetty-http3/http3-client/src/main/java/module-info.java index bfd394af2b1..0188c5f52bf 100644 --- a/jetty-http3/http3-client/src/main/java/module-info.java +++ b/jetty-http3/http3-client/src/main/java/module-info.java @@ -23,4 +23,7 @@ module org.eclipse.jetty.http3.client requires transitive org.eclipse.jetty.util; exports org.eclipse.jetty.http3.client; + + exports org.eclipse.jetty.http3.client.internal to + org.eclipse.jetty.http3.http.client.transport; } diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index 6f7a7546f4e..ae6848c5b14 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -23,9 +23,9 @@ import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.ControlFlusher; import org.eclipse.jetty.http3.internal.DecoderStreamConnection; import org.eclipse.jetty.http3.internal.EncoderStreamConnection; -import org.eclipse.jetty.http3.internal.HTTP3Flusher; import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionHandler; +import org.eclipse.jetty.http3.internal.MessageFlusher; import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection; import org.eclipse.jetty.http3.qpack.QpackDecoder; import org.eclipse.jetty.http3.qpack.QpackEncoder; @@ -47,7 +47,7 @@ public class ClientHTTP3Session extends ClientProtocolSession private final QpackDecoder decoder; private final HTTP3SessionClient session; private final ControlFlusher controlFlusher; - private final HTTP3Flusher messageFlusher; + private final MessageFlusher messageFlusher; public ClientHTTP3Session(HTTP3Configuration configuration, ClientQuicSession quicSession, Session.Client.Listener listener, Promise promise) { @@ -82,7 +82,7 @@ public class ClientHTTP3Session extends ClientProtocolSession if (LOG.isDebugEnabled()) LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint); - this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxRequestHeadersSize(), configuration.isUseOutputDirectByteBuffers()); + this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxRequestHeadersSize(), configuration.isUseOutputDirectByteBuffers()); addBean(messageFlusher); } @@ -97,9 +97,8 @@ public class ClientHTTP3Session extends ClientProtocolSession } @Override - protected void doStart() throws Exception + protected void initialize() { - super.doStart(); // Queue the mandatory SETTINGS frame. Map settings = session.onPreface(); if (settings == null) diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java index 29e2b9673a4..5cdbbd4efbf 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java @@ -51,7 +51,8 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client public void onOpen() { super.onOpen(); - promise.succeeded(this); + if (promise != null) + promise.succeeded(this); } @Override diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java index 10a2a0a2380..c5e3839a22d 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java @@ -200,7 +200,7 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable, Attachable if (listener == null) { Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code())); - session.writeMessageFrame(getId(), new HTTP3Flusher.FlushFrame(), callback); + session.writeMessageFrame(getId(), new MessageFlusher.FlushFrame(), callback); } updateClose(frame.isLast(), false); } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/MessageFlusher.java similarity index 94% rename from jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java rename to jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/MessageFlusher.java index 6e3134ca1ef..f1e50273a5f 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/MessageFlusher.java @@ -29,9 +29,9 @@ import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HTTP3Flusher extends IteratingCallback +public class MessageFlusher extends IteratingCallback { - private static final Logger LOG = LoggerFactory.getLogger(HTTP3Flusher.class); + private static final Logger LOG = LoggerFactory.getLogger(MessageFlusher.class); private final AutoLock lock = new AutoLock(); private final Queue queue = new ArrayDeque<>(); @@ -39,7 +39,7 @@ public class HTTP3Flusher extends IteratingCallback private final MessageGenerator generator; private Entry entry; - public HTTP3Flusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers) + public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers) { this.lease = new ByteBufferPool.Lease(byteBufferPool); this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers); diff --git a/jetty-http3/http3-http-client-transport/pom.xml b/jetty-http3/http3-http-client-transport/pom.xml index ee571b18af6..7404622dbd3 100644 --- a/jetty-http3/http3-http-client-transport/pom.xml +++ b/jetty-http3/http3-http-client-transport/pom.xml @@ -18,6 +18,12 @@ org.eclipse.jetty jetty-client + + + org.eclipse.jetty.http3 + http3-server + test + diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/ClientConnectionFactoryOverHTTP3.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/ClientConnectionFactoryOverHTTP3.java index 628e8308fd2..aac45c012c2 100644 --- a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/ClientConnectionFactoryOverHTTP3.java +++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/ClientConnectionFactoryOverHTTP3.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.http3.client.http; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -21,13 +20,16 @@ import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; import org.eclipse.jetty.client.http.HttpClientConnectionFactory; import org.eclipse.jetty.http3.client.HTTP3Client; import org.eclipse.jetty.http3.client.HTTP3ClientConnectionFactory; +import org.eclipse.jetty.http3.client.http.internal.SessionClientListener; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.quic.common.ProtocolSession; +import org.eclipse.jetty.quic.common.QuicSession; import org.eclipse.jetty.util.component.ContainerLifeCycle; public class ClientConnectionFactoryOverHTTP3 extends ContainerLifeCycle implements ClientConnectionFactory { - private final ClientConnectionFactory factory = new HTTP3ClientConnectionFactory(); + private final HTTP3ClientConnectionFactory factory = new HTTP3ClientConnectionFactory(); private final HTTP3Client client; public ClientConnectionFactoryOverHTTP3(HTTP3Client client) @@ -37,14 +39,9 @@ public class ClientConnectionFactoryOverHTTP3 extends ContainerLifeCycle impleme } @Override - public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException + public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) { -// HTTPSessionListenerPromise listenerPromise = new HTTPSessionListenerPromise(context); -// context.put(HTTP2ClientConnectionFactory.CLIENT_CONTEXT_KEY, client); -// context.put(HTTP2ClientConnectionFactory.SESSION_LISTENER_CONTEXT_KEY, listenerPromise); -// context.put(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY, listenerPromise); -// return factory.newConnection(endPoint, context); - return null; + return factory.newConnection(endPoint, context); } /** @@ -52,7 +49,7 @@ public class ClientConnectionFactoryOverHTTP3 extends ContainerLifeCycle impleme * * @see HttpClientConnectionFactory#HTTP11 */ - public static class HTTP3 extends Info + public static class HTTP3 extends Info implements ProtocolSession.Factory { public HTTP3(HTTP3Client client) { @@ -65,6 +62,16 @@ public class ClientConnectionFactoryOverHTTP3 extends ContainerLifeCycle impleme return List.of("h3"); } + @Override + public ProtocolSession newProtocolSession(QuicSession quicSession, Map context) + { + ClientConnectionFactoryOverHTTP3 http3 = (ClientConnectionFactoryOverHTTP3)getClientConnectionFactory(); + context.put(HTTP3Client.CLIENT_CONTEXT_KEY, http3.client); + SessionClientListener listener = new SessionClientListener(context); + context.put(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY, listener); + return http3.factory.newProtocolSession(quicSession, context); + } + @Override public String toString() { diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/HttpClientTransportOverHTTP3.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/HttpClientTransportOverHTTP3.java index d9a2477ebbe..47928d04c82 100644 --- a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/HttpClientTransportOverHTTP3.java +++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/HttpClientTransportOverHTTP3.java @@ -23,22 +23,17 @@ import java.util.Objects; import org.eclipse.jetty.client.AbstractHttpClientTransport; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpClientTransport; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpRequest; import org.eclipse.jetty.client.MultiplexConnectionPool; import org.eclipse.jetty.client.MultiplexHttpDestination; import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.http3.HTTP3Configuration; -import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.client.HTTP3Client; -import org.eclipse.jetty.http3.client.http.internal.HttpConnectionOverHTTP3; -import org.eclipse.jetty.http3.frames.SettingsFrame; -import org.eclipse.jetty.http3.internal.HTTP3Session; +import org.eclipse.jetty.http3.client.http.internal.SessionClientListener; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.util.Promise; public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport { @@ -121,28 +116,4 @@ public class HttpClientTransportOverHTTP3 extends AbstractHttpClientTransport { return null; } - - private class SessionClientListener implements Session.Client.Listener - { - private final Map context; - - private SessionClientListener(Map context) - { - this.context = context; - } - - @SuppressWarnings("unchecked") - private Promise httpConnectionPromise() - { - return (Promise)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY); - } - - @Override - public void onSettings(Session session, SettingsFrame frame) - { - HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY); - HttpConnectionOverHTTP3 connection = new HttpConnectionOverHTTP3(destination, (HTTP3Session)session); - httpConnectionPromise().succeeded(connection); - } - } } diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpChannelOverHTTP3.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpChannelOverHTTP3.java index b43ebea6291..da3b11180b6 100644 --- a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpChannelOverHTTP3.java +++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpChannelOverHTTP3.java @@ -18,35 +18,63 @@ import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpReceiver; import org.eclipse.jetty.client.HttpSender; +import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient; public class HttpChannelOverHTTP3 extends HttpChannel { - public HttpChannelOverHTTP3(HttpDestination destination) + private final HTTP3SessionClient session; + private final HttpSenderOverHTTP3 sender; + private final HttpReceiverOverHTTP3 receiver; + + public HttpChannelOverHTTP3(HttpDestination destination, HTTP3SessionClient session) { super(destination); + this.session = session; + sender = new HttpSenderOverHTTP3(this); + receiver = new HttpReceiverOverHTTP3(this); + } + + public HTTP3SessionClient getSession() + { + return session; + } + + public Stream.Listener getStreamListener() + { + return receiver; } @Override protected HttpSender getHttpSender() { - return null; + return sender; } @Override protected HttpReceiver getHttpReceiver() { - return null; + return receiver; } @Override public void send(HttpExchange exchange) { - + sender.send(exchange); } @Override public void release() { + // TODO + } + @Override + public String toString() + { + return String.format("%s[send=%s,recv=%s]", + super.toString(), + sender, + receiver); } } diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpConnectionOverHTTP3.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpConnectionOverHTTP3.java index f2340d1ea70..e9143322842 100644 --- a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpConnectionOverHTTP3.java +++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpConnectionOverHTTP3.java @@ -27,15 +27,15 @@ import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpRequest; import org.eclipse.jetty.client.SendFailure; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.http3.internal.HTTP3Session; +import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient; public class HttpConnectionOverHTTP3 extends HttpConnection implements ConnectionPool.Multiplexable { private final Set activeChannels = ConcurrentHashMap.newKeySet(); private final AtomicBoolean closed = new AtomicBoolean(); - private final HTTP3Session session; + private final HTTP3SessionClient session; - public HttpConnectionOverHTTP3(HttpDestination destination, HTTP3Session session) + public HttpConnectionOverHTTP3(HttpDestination destination, HTTP3SessionClient session) { super(destination); this.session = session; @@ -62,7 +62,7 @@ public class HttpConnectionOverHTTP3 extends HttpConnection implements Connectio normalizeRequest(request); // One connection maps to N channels, so one channel for each exchange. - HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(getHttpDestination()); + HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(getHttpDestination(), session); activeChannels.add(channel); return send(channel, exchange); diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java new file mode 100644 index 00000000000..470e5b76642 --- /dev/null +++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpReceiverOverHTTP3.java @@ -0,0 +1,148 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client.http.internal; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.HttpReceiver; +import org.eclipse.jetty.client.HttpResponse; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.Invocable; + +public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Listener +{ + protected HttpReceiverOverHTTP3(HttpChannelOverHTTP3 channel) + { + super(channel); + } + + @Override + protected HttpChannelOverHTTP3 getHttpChannel() + { + return (HttpChannelOverHTTP3)super.getHttpChannel(); + } + + @Override + public void onResponse(Stream stream, HeadersFrame frame) + { + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + return; + + HttpResponse httpResponse = exchange.getResponse(); + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); + + if (responseBegin(exchange)) + { + HttpFields headers = response.getFields(); + for (HttpField header : headers) + { + if (!responseHeader(exchange, header)) + return; + } + + // TODO: add support for HttpMethod.CONNECT. + + if (responseHeaders(exchange)) + { + int status = response.getStatus(); + boolean informational = HttpStatus.isInformational(status) && status != HttpStatus.SWITCHING_PROTOCOLS_101; + if (frame.isLast() || informational) + responseSuccess(exchange); + else + stream.demand(); + } + else + { + if (frame.isLast()) + { + // There is no demand to trigger response success, so add + // a poison pill to trigger it when there will be demand. + // TODO +// notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); + } + } + } + } + + @Override + protected void receive() + { + // Called when the application resumes demand of content. + // TODO: stream.demand() should be enough. + } + + @Override + public void onDataAvailable(Stream stream) + { + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + return; + + Stream.Data data = stream.readData(); + if (data != null) + { + ByteBuffer byteBuffer = data.getByteBuffer(); + if (byteBuffer.hasRemaining()) + { + // TODO: callback failure should invoke responseFailure(). + Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, data::complete); + boolean proceed = responseContent(exchange, byteBuffer, callback); + if (proceed) + { + if (data.isLast()) + responseSuccess(exchange); + else + stream.demand(); + } + } + else + { + data.complete(); + if (data.isLast()) + responseSuccess(exchange); + else + stream.demand(); + } + } + else + { + stream.demand(); + } + } + + @Override + public void onTrailer(Stream stream, HeadersFrame frame) + { + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + return; + + HttpFields trailers = frame.getMetaData().getFields(); + trailers.forEach(exchange.getResponse()::trailer); + // Previous DataFrames had endStream=false, so + // add a poison pill to trigger response success + // after all normal DataFrames have been consumed. + // TODO +// notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); + } +} diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpSenderOverHTTP3.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpSenderOverHTTP3.java new file mode 100644 index 00000000000..82f9dfcef22 --- /dev/null +++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/HttpSenderOverHTTP3.java @@ -0,0 +1,207 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client.http.internal; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.HttpRequest; +import org.eclipse.jetty.client.HttpSender; +import org.eclipse.jetty.http.HostPortHttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpURI; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient; +import org.eclipse.jetty.http3.frames.DataFrame; +import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.internal.HTTP3Stream; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; + +public class HttpSenderOverHTTP3 extends HttpSender +{ + private Stream stream; + + public HttpSenderOverHTTP3(HttpChannelOverHTTP3 channel) + { + super(channel); + } + + @Override + protected HttpChannelOverHTTP3 getHttpChannel() + { + return (HttpChannelOverHTTP3)super.getHttpChannel(); + } + + @Override + protected void sendHeaders(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback) + { + HttpRequest request = exchange.getRequest(); + boolean isTunnel = HttpMethod.CONNECT.is(request.getMethod()); + MetaData.Request metaData; + if (isTunnel) + { + String upgradeProtocol = request.getUpgradeProtocol(); + if (upgradeProtocol == null) + { + metaData = new MetaData.ConnectRequest((String)null, new HostPortHttpField(request.getPath()), null, request.getHeaders(), null); + } + else + { + HostPortHttpField authority = new HostPortHttpField(request.getHost(), request.getPort()); + metaData = new MetaData.ConnectRequest(request.getScheme(), authority, request.getPath(), request.getHeaders(), upgradeProtocol); + } + } + else + { + String path = relativize(request.getPath()); + HttpURI uri = HttpURI.build() + .scheme(request.getScheme()) + .host(request.getHost()) + .port(request.getPort()) + .path(path) + .query(request.getQuery()); + metaData = new MetaData.Request(request.getMethod(), uri, HttpVersion.HTTP_3, request.getHeaders(), -1, request.getTrailers()); + } + + HeadersFrame headersFrame; + DataFrame dataFrame = null; + HeadersFrame trailerFrame = null; + + if (isTunnel) + { + headersFrame = new HeadersFrame(metaData, false); + } + else + { + boolean hasContent = BufferUtil.hasContent(contentBuffer); + if (hasContent) + { + headersFrame = new HeadersFrame(metaData, false); + if (lastContent) + { + HttpFields trailers = retrieveTrailers(request); + boolean hasTrailers = trailers != null; + dataFrame = new DataFrame(contentBuffer, !hasTrailers); + if (hasTrailers) + trailerFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_3, trailers), true); + } + else + { + dataFrame = new DataFrame(contentBuffer, false); + } + } + else + { + if (lastContent) + { + HttpFields trailers = retrieveTrailers(request); + boolean hasTrailers = trailers != null; + headersFrame = new HeadersFrame(metaData, !hasTrailers); + if (hasTrailers) + trailerFrame = new HeadersFrame(new MetaData(HttpVersion.HTTP_3, trailers), true); + } + else + { + headersFrame = new HeadersFrame(metaData, false); + } + } + } + + HeadersFrame hf = headersFrame; + DataFrame df = dataFrame; + HeadersFrame tf = trailerFrame; + + HTTP3SessionClient session = getHttpChannel().getSession(); + CompletableFuture completable = session.newRequest(hf, getHttpChannel().getStreamListener()) + .thenApply(stream -> onNewStream(stream, request)); + if (df != null) + completable = completable.thenCompose(stream -> stream.data(df)); + if (tf != null) + completable = completable.thenCompose(stream -> stream.trailer(tf)); + callback.completeWith(completable); + } + + private Stream onNewStream(Stream stream, HttpRequest request) + { + this.stream = stream; + long idleTimeout = request.getIdleTimeout(); + if (idleTimeout > 0) + ((HTTP3Stream)stream).setIdleTimeout(idleTimeout); + return stream; + } + + private HttpFields retrieveTrailers(HttpRequest request) + { + Supplier trailerSupplier = request.getTrailers(); + HttpFields trailers = trailerSupplier == null ? null : trailerSupplier.get(); + return trailers == null || trailers.size() == 0 ? null : trailers; + } + + @Override + protected void sendContent(HttpExchange exchange, ByteBuffer contentBuffer, boolean lastContent, Callback callback) + { + boolean hasContent = contentBuffer.hasRemaining(); + if (lastContent) + { + // Call the trailers supplier as late as possible. + HttpFields trailers = retrieveTrailers(exchange.getRequest()); + boolean hasTrailers = trailers != null && trailers.size() > 0; + if (hasContent) + { + DataFrame dataFrame = new DataFrame(contentBuffer, !hasTrailers); + CompletableFuture completable; + if (hasTrailers) + completable = stream.data(dataFrame).thenCompose(s -> sendTrailer(s, trailers)); + else + completable = stream.data(dataFrame); + callback.completeWith(completable); + } + else + { + CompletableFuture completable; + if (hasTrailers) + completable = sendTrailer(stream, trailers); + else + completable = stream.data(new DataFrame(contentBuffer, true)); + callback.completeWith(completable); + } + } + else + { + if (hasContent) + { + CompletableFuture completable = stream.data(new DataFrame(contentBuffer, false)); + callback.completeWith(completable); + } + else + { + // Don't send empty non-last content. + callback.succeeded(); + } + } + } + + private CompletableFuture sendTrailer(Stream stream, HttpFields trailers) + { + MetaData metaData = new MetaData(HttpVersion.HTTP_3, trailers); + HeadersFrame trailerFrame = new HeadersFrame(metaData, true); + return stream.trailer(trailerFrame); + } +} diff --git a/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/SessionClientListener.java b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/SessionClientListener.java new file mode 100644 index 00000000000..1a4ddc6bc2e --- /dev/null +++ b/jetty-http3/http3-http-client-transport/src/main/java/org/eclipse/jetty/http3/client/http/internal/SessionClientListener.java @@ -0,0 +1,48 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.client.http.internal; + +import java.util.Map; + +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.HttpDestination; +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.http3.api.Session; +import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient; +import org.eclipse.jetty.http3.frames.SettingsFrame; +import org.eclipse.jetty.util.Promise; + +public class SessionClientListener implements Session.Client.Listener +{ + private final Map context; + + public SessionClientListener(Map context) + { + this.context = context; + } + + @SuppressWarnings("unchecked") + private Promise httpConnectionPromise() + { + return (Promise)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY); + } + + @Override + public void onSettings(Session session, SettingsFrame frame) + { + HttpDestination destination = (HttpDestination)context.get(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY); + HttpConnectionOverHTTP3 connection = new HttpConnectionOverHTTP3(destination, (HTTP3SessionClient)session); + httpConnectionPromise().succeeded(connection); + } +} diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java index 08e533fe17f..2feb8854fa6 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.http3.server.internal; -import java.nio.ByteBuffer; import java.util.function.Consumer; import org.eclipse.jetty.http.BadMessageException; @@ -25,10 +24,8 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.http3.api.Stream; -import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.internal.HTTP3Stream; -import org.eclipse.jetty.http3.internal.parser.MessageParser; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpChannel; @@ -230,12 +227,16 @@ public class HttpChannelOverHTTP3 extends HttpChannel if (content != null) { HttpInput.Content result = content; - if (!content.isSpecial()) + if (!result.isSpecial()) content = null; + if (LOG.isDebugEnabled()) + LOG.debug("produced content {} on {}", result, this); return result; } Stream.Data data = stream.readData(); + if (LOG.isDebugEnabled()) + LOG.debug("read {} on {}", data, this); if (data == null) return null; @@ -270,9 +271,11 @@ public class HttpChannelOverHTTP3 extends HttpChannel handle |= handleContent | handleRequest; } - HttpInput.Content result = this.content; - if (result != null && !result.isSpecial()) + HttpInput.Content result = content; + if (!result.isSpecial()) content = result.isEof() ? new HttpInput.EofContent() : null; + if (LOG.isDebugEnabled()) + LOG.debug("produced new content {} on {}", result, this); return result; } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index db32925ef0d..b1c16c550e7 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -23,9 +23,9 @@ import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.ControlFlusher; import org.eclipse.jetty.http3.internal.DecoderStreamConnection; import org.eclipse.jetty.http3.internal.EncoderStreamConnection; -import org.eclipse.jetty.http3.internal.HTTP3Flusher; import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionHandler; +import org.eclipse.jetty.http3.internal.MessageFlusher; import org.eclipse.jetty.http3.internal.UnidirectionalStreamConnection; import org.eclipse.jetty.http3.qpack.QpackDecoder; import org.eclipse.jetty.http3.qpack.QpackEncoder; @@ -46,7 +46,7 @@ public class ServerHTTP3Session extends ServerProtocolSession private final QpackDecoder decoder; private final HTTP3SessionServer session; private final ControlFlusher controlFlusher; - private final HTTP3Flusher messageFlusher; + private final MessageFlusher messageFlusher; public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession quicSession, Session.Server.Listener listener) { @@ -81,7 +81,7 @@ public class ServerHTTP3Session extends ServerProtocolSession if (LOG.isDebugEnabled()) LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint); - this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers()); + this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers()); addBean(messageFlusher); } diff --git a/jetty-http3/http3-tests/pom.xml b/jetty-http3/http3-tests/pom.xml index e9de6054500..6162e1f6fbd 100644 --- a/jetty-http3/http3-tests/pom.xml +++ b/jetty-http3/http3-tests/pom.xml @@ -14,19 +14,31 @@ org.eclipse.jetty.http3 http3-client - ${project.version} + test + + + org.eclipse.jetty.http3 + http3-http-client-transport test org.eclipse.jetty.http3 http3-server - ${project.version} test org.eclipse.jetty.quic quic-server - ${project.version} + test + + + org.eclipse.jetty + jetty-alpn-java-server + test + + + org.eclipse.jetty.http2 + http2-server test @@ -34,13 +46,10 @@ awaitility test - - org.junit.jupiter - junit-jupiter - org.eclipse.jetty jetty-slf4j-impl + test diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/AbstractClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/AbstractClientServerTest.java index a7d4e029924..e9604234d66 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/AbstractClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/AbstractClientServerTest.java @@ -16,6 +16,8 @@ package org.eclipse.jetty.http3.tests; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpURI; @@ -23,6 +25,7 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.client.HTTP3Client; +import org.eclipse.jetty.http3.client.http.ClientConnectionFactoryOverHTTP3; import org.eclipse.jetty.http3.server.HTTP3ServerConnectionFactory; import org.eclipse.jetty.http3.server.HTTP3ServerConnector; import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory; @@ -41,9 +44,10 @@ public class AbstractClientServerTest @RegisterExtension final BeforeTestExecutionCallback printMethodName = context -> System.err.printf("Running %s.%s() %s%n", context.getRequiredTestClass().getSimpleName(), context.getRequiredTestMethod().getName(), context.getDisplayName()); - protected HTTP3ServerConnector connector; - protected HTTP3Client client; protected Server server; + protected HTTP3ServerConnector connector; + protected HTTP3Client http3Client; + protected HttpClient httpClient; protected void start(Handler handler) throws Exception { @@ -79,14 +83,18 @@ public class AbstractClientServerTest protected void startClient() throws Exception { - client = new HTTP3Client(); - client.start(); + http3Client = new HTTP3Client(); + httpClient = new HttpClient(new HttpClientTransportDynamic(new ClientConnectionFactoryOverHTTP3.HTTP3(http3Client))); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + httpClient.setExecutor(clientThreads); + httpClient.start(); } protected Session.Client newSession(Session.Client.Listener listener) throws Exception { InetSocketAddress address = new InetSocketAddress("localhost", connector.getLocalPort()); - return client.connect(address, listener).get(5, TimeUnit.SECONDS); + return http3Client.connect(address, listener).get(5, TimeUnit.SECONDS); } protected MetaData.Request newRequest(String path) @@ -108,7 +116,7 @@ public class AbstractClientServerTest @AfterEach public void dispose() { - LifeCycle.stop(client); + LifeCycle.stop(httpClient); LifeCycle.stop(server); } } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ClientServerTest.java index 48493276097..2ac54f3fa96 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ClientServerTest.java @@ -282,7 +282,7 @@ public class ClientServerTest extends AbstractClientServerTest }); int maxRequestHeadersSize = 128; - client.getHTTP3Configuration().setMaxRequestHeadersSize(maxRequestHeadersSize); + http3Client.getHTTP3Configuration().setMaxRequestHeadersSize(maxRequestHeadersSize); Session.Client clientSession = newSession(new Session.Client.Listener() {}); CountDownLatch requestFailureLatch = new CountDownLatch(1); diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java index a9ac309f96b..bf0f4bbbeb3 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java @@ -792,7 +792,7 @@ public class GoAwayTest extends AbstractClientServerTest serverDisconnectLatch.countDown(); } }); - client.getClientConnector().setIdleTimeout(Duration.ofMillis(idleTimeout)); + http3Client.getClientConnector().setIdleTimeout(Duration.ofMillis(idleTimeout)); CountDownLatch clientIdleTimeoutLatch = new CountDownLatch(1); CountDownLatch clientDisconnectLatch = new CountDownLatch(1); @@ -1177,7 +1177,7 @@ public class GoAwayTest extends AbstractClientServerTest assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); - client.stop(); + http3Client.stop(); assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS)); @@ -1286,7 +1286,7 @@ public class GoAwayTest extends AbstractClientServerTest assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); - CompletableFuture shutdown = client.shutdown(); + CompletableFuture shutdown = http3Client.shutdown(); // Shutdown must not complete yet. assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS)); diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HandlerClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HandlerClientServerTest.java index 9f9f36ef45d..eb7b4b50c3d 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HandlerClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HandlerClientServerTest.java @@ -139,7 +139,7 @@ public class HandlerClientServerTest extends AbstractClientServerTest }) .get(555, TimeUnit.SECONDS); - byte[] bytes = new byte[16 * 1024 * 1024]; + byte[] bytes = new byte[1 * 1024]; new Random().nextBytes(bytes); stream.data(new DataFrame(ByteBuffer.wrap(bytes, 0, bytes.length / 2), false)) .thenCompose(s -> s.data(new DataFrame(ByteBuffer.wrap(bytes, bytes.length / 2, bytes.length / 2), true))) diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HttpClientTransportOverHTTP3Test.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HttpClientTransportOverHTTP3Test.java new file mode 100644 index 00000000000..a61675ac7a6 --- /dev/null +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HttpClientTransportOverHTTP3Test.java @@ -0,0 +1,49 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.tests; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class HttpClientTransportOverHTTP3Test extends AbstractClientServerTest +{ + @Test + public void testRequestResponse() throws Exception + { + String content = "Hello, World!"; + start(new AbstractHandler() + { + @Override + public void handle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + jettyRequest.setHandled(true); + response.getOutputStream().print(content); + } + }); + + ContentResponse response = httpClient.newRequest("https://localhost:" + connector.getLocalPort()) + .timeout(555, TimeUnit.SECONDS) + .send(); + assertEquals(content, response.getContentAsString()); + } +} diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/StreamIdleTimeoutTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/StreamIdleTimeoutTest.java index 140adba9e13..364edaa91df 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/StreamIdleTimeoutTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/StreamIdleTimeoutTest.java @@ -97,7 +97,7 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest }); long streamIdleTimeout = 1000; - client.getHTTP3Configuration().setStreamIdleTimeout(streamIdleTimeout); + http3Client.getHTTP3Configuration().setStreamIdleTimeout(streamIdleTimeout); Session.Client clientSession = newSession(new Session.Client.Listener() {}); @@ -179,7 +179,7 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest assertNotNull(h3); h3.getConfiguration().setStreamIdleTimeout(idleTimeout); - Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + Session.Client clientSession = http3Client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) .get(5, TimeUnit.SECONDS); CountDownLatch clientFailureLatch = new CountDownLatch(1); diff --git a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java index 809d49dc42c..a7f484d0844 100644 --- a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java +++ b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java @@ -38,6 +38,11 @@ public class ClientProtocolSession extends ProtocolSession protected void doStart() throws Exception { super.doStart(); + initialize(); + } + + protected void initialize() + { // Create a single bidirectional, client-initiated, // QUIC stream that plays the role of the TCP stream. long streamId = getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL); diff --git a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicConnection.java b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicConnection.java index 2840c3ab424..0618b80f691 100644 --- a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicConnection.java +++ b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicConnection.java @@ -115,18 +115,23 @@ public class ClientQuicConnection extends QuicConnection @Override public void onFillable() { - Runnable task = receiveAndProcess(); - if (task != null) + while (true) + { + Runnable task = receiveAndProcess(); + if (task == null) + break; task.run(); + } } @Override protected QuicSession createSession(SocketAddress remoteAddress, ByteBuffer cipherBuffer) throws IOException { - QuicSession session = pendingSessions.get(remoteAddress); + ClientQuicSession session = pendingSessions.get(remoteAddress); if (session != null) { - session.process(remoteAddress, cipherBuffer); + Runnable task = session.process(remoteAddress, cipherBuffer); + session.offerTask(task); if (session.isConnectionEstablished()) { pendingSessions.remove(remoteAddress); diff --git a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicSession.java b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicSession.java index 4e13a47e8e0..0dd88c477ff 100644 --- a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicSession.java +++ b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientQuicSession.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ClientConnectionFactory; @@ -28,6 +29,7 @@ import org.eclipse.jetty.quic.common.QuicConnection; import org.eclipse.jetty.quic.common.QuicSession; import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.quiche.QuicheConnection; +import org.eclipse.jetty.util.component.Container; import org.eclipse.jetty.util.thread.Scheduler; /** @@ -39,6 +41,7 @@ import org.eclipse.jetty.util.thread.Scheduler; public class ClientQuicSession extends QuicSession { private final Map context; + private final AtomicReference task = new AtomicReference<>(); protected ClientQuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress, Map context) { @@ -46,12 +49,28 @@ public class ClientQuicSession extends QuicSession this.context = context; } + void offerTask(Runnable task) + { + this.task.set(task); + } + + @Override + protected Runnable pollTask() + { + return task.getAndSet(null); + } + @Override protected ProtocolSession createProtocolSession() { ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY); + ProtocolSession.Factory factory = null; if (connectionFactory instanceof ProtocolSession.Factory) - return ((ProtocolSession.Factory)connectionFactory).newProtocolSession(this, context); + factory = (ProtocolSession.Factory)connectionFactory; + if (factory == null && connectionFactory instanceof Container) + factory = ((Container)connectionFactory).getContainedBeans(ProtocolSession.Factory.class).stream().findFirst().orElse(null); + if (factory != null) + return factory.newProtocolSession(this, context); return new ClientProtocolSession(this); } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java index 8a8afa726f2..d324536740c 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java @@ -172,6 +172,9 @@ public abstract class QuicConnection extends AbstractConnection { try { + if (isFillInterested()) + return null; + ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); while (true) { @@ -222,6 +225,13 @@ public abstract class QuicConnection extends AbstractConnection sessions.put(quicheConnectionId, session); listeners.forEach(session::addEventListener); LifeCycle.start(session); + + // Session creation may have generated a task. + Runnable task = session.pollTask(); + if (LOG.isDebugEnabled()) + LOG.debug("processing creation task {} on {}", task, session); + if (task != null) + return task; } else { @@ -235,7 +245,7 @@ public abstract class QuicConnection extends AbstractConnection LOG.debug("packet is for existing session {}, processing {} bytes", session, cipherBuffer.remaining()); Runnable task = session.process(remoteAddress, cipherBuffer); if (LOG.isDebugEnabled()) - LOG.debug("processing session {} produced task {}", session, task); + LOG.debug("produced task {} on {}", task, session); if (task != null) return task; } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index 0596790de11..5b0cf0e412a 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -330,9 +330,10 @@ public abstract class QuicSession extends ContainerLifeCycle addManaged(session); } - if (processing.compareAndSet(false, true)) - return session::process; - return null; + boolean process = processing.compareAndSet(false, true); + if (LOG.isDebugEnabled()) + LOG.debug("processing={} on {}", process, session); + return process ? session::process : null; } else { @@ -343,9 +344,16 @@ public abstract class QuicSession extends ContainerLifeCycle void processingComplete() { + if (LOG.isDebugEnabled()) + LOG.debug("processing complete on {}", protocolSession); processing.set(false); } + protected Runnable pollTask() + { + return null; + } + protected abstract ProtocolSession createProtocolSession(); List getWritableStreamIds() diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java index 4d51633c266..6f80955a555 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; import org.eclipse.jetty.io.AbstractEndPoint; @@ -26,7 +27,6 @@ import org.eclipse.jetty.io.FillInterest; import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,17 +41,15 @@ public class QuicStreamEndPoint extends AbstractEndPoint private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class); private static final ByteBuffer LAST_FLAG = ByteBuffer.allocate(0); - private final AutoLock lock = new AutoLock(); + private final AtomicBoolean readable = new AtomicBoolean(true); private final QuicSession session; private final long streamId; - private boolean readable; public QuicStreamEndPoint(Scheduler scheduler, QuicSession session, long streamId) { super(scheduler); this.session = session; this.streamId = streamId; - this.readable = true; } public QuicSession getQuicSession() @@ -215,35 +213,24 @@ public class QuicStreamEndPoint extends AbstractEndPoint public void onReadable() { - // TODO: use AtomicBoolean. - try (AutoLock l = lock.lock()) - { - if (LOG.isDebugEnabled()) - LOG.debug("stream {} is readable, processing: {}", streamId, readable); - if (!readable) - return; - readable = false; - } - getFillInterest().fillable(); + boolean expected = readable.compareAndExchange(true, false); + if (LOG.isDebugEnabled()) + LOG.debug("stream {} is readable, processing: {}", streamId, expected); + if (expected) + getFillInterest().fillable(); } @Override public void fillInterested(Callback callback) { - try (AutoLock l = lock.lock()) - { - readable = true; - } + readable.set(true); super.fillInterested(callback); } @Override public boolean tryFillInterested(Callback callback) { - try (AutoLock l = lock.lock()) - { - readable = true; - } + readable.set(true); return super.tryFillInterested(callback); }