From b6c65404d5860fde1a14cc8f406019304e8527eb Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 13 Sep 2021 18:47:46 +0200 Subject: [PATCH] Issue #6728 - QUIC and HTTP/3 - Simple request/response (no content) working. Signed-off-by: Simone Bordet --- .../client/internal/ClientHTTP3Session.java | 97 +-------------- .../client/internal/HTTP3SessionClient.java | 16 ++- .../org/eclipse/jetty/http3/api/Session.java | 4 +- .../org/eclipse/jetty/http3/api/Stream.java | 17 ++- .../jetty/http3/internal/HTTP3Connection.java | 11 +- .../jetty/http3/internal/HTTP3Flusher.java | 113 ++++++++++++++++++ .../jetty/http3/internal/HTTP3Session.java | 76 +++++++++++- .../jetty/http3/internal/HTTP3Stream.java | 34 +++++- .../internal/generator/HeadersGenerator.java | 13 +- .../server/internal/HTTP3SessionServer.java | 14 +++ .../server/internal/ServerHTTP3Session.java | 13 ++ .../http3/tests/HTTP3ClientServerTest.java | 63 ++++++++-- 12 files changed, 343 insertions(+), 128 deletions(-) create mode 100644 jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index d09d2d7d842..b402d90ca15 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -13,31 +13,24 @@ package org.eclipse.jetty.http3.client.internal; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.List; import java.util.Map; -import java.util.Queue; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.ControlFlusher; +import org.eclipse.jetty.http3.internal.HTTP3Flusher; import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionHandler; import org.eclipse.jetty.http3.internal.StreamConnection; -import org.eclipse.jetty.http3.internal.generator.MessageGenerator; import org.eclipse.jetty.http3.qpack.QpackDecoder; import org.eclipse.jetty.http3.qpack.QpackEncoder; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.quic.client.ClientProtocolSession; import org.eclipse.jetty.quic.client.ClientQuicSession; import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.Promise; -import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +44,7 @@ public class ClientHTTP3Session extends ClientProtocolSession private final InstructionFlusher encoderInstructionFlusher; private final InstructionFlusher decoderInstructionFlusher; private final ControlFlusher controlFlusher; - private final MessageFlusher messageFlusher; + private final HTTP3Flusher messageFlusher; public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise promise, int maxBlockedStreams, int maxResponseHeadersSize) { @@ -73,7 +66,7 @@ public class ClientHTTP3Session extends ClientProtocolSession this.controlFlusher = new ControlFlusher(session, controlEndPoint); // TODO: make parameters configurable. - this.messageFlusher = new MessageFlusher(session.getByteBufferPool(), encoder, 4096, true); + this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true); } public QpackDecoder getQpackDecoder() @@ -138,8 +131,9 @@ public class ClientHTTP3Session extends ClientProtocolSession connection.onOpen(); } - void writeMessageFrame(QuicStreamEndPoint endPoint, Frame frame, Callback callback) + void writeFrame(long streamId, Frame frame, Callback callback) { + QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint); messageFlusher.offer(endPoint, frame, callback); messageFlusher.iterate(); } @@ -149,85 +143,4 @@ public class ClientHTTP3Session extends ClientProtocolSession { return String.format("%s@%x", getClass().getSimpleName(), hashCode()); } - - private static class MessageFlusher extends IteratingCallback - { - private final AutoLock lock = new AutoLock(); - private final Queue queue = new ArrayDeque<>(); - private final ByteBufferPool.Lease lease; - private final MessageGenerator generator; - private Entry entry; - - public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers) - { - this.lease = new ByteBufferPool.Lease(byteBufferPool); - this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers); - } - - public void offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback) - { - try (AutoLock l = lock.lock()) - { - queue.offer(new Entry(endPoint, frame, callback)); - } - } - - @Override - protected Action process() - { - try (AutoLock l = lock.lock()) - { - entry = queue.poll(); - if (entry == null) - return Action.IDLE; - } - - if (LOG.isDebugEnabled()) - LOG.debug("flushing {} on {}", entry, this); - - generator.generate(lease, entry.endPoint.getStreamId(), entry.frame); - - QuicStreamEndPoint endPoint = entry.endPoint; - List buffers = lease.getByteBuffers(); - if (LOG.isDebugEnabled()) - LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this); - endPoint.write(this, buffers.toArray(ByteBuffer[]::new)); - return Action.SCHEDULED; - } - - @Override - public void succeeded() - { - lease.recycle(); - entry.callback.succeeded(); - entry = null; - super.succeeded(); - } - - @Override - public InvocationType getInvocationType() - { - return entry.callback.getInvocationType(); - } - - private static class Entry - { - private final QuicStreamEndPoint endPoint; - private final Frame frame; - private final Callback callback; - - private Entry(QuicStreamEndPoint endPoint, Frame frame, Callback callback) - { - this.endPoint = endPoint; - this.frame = frame; - this.callback = callback; - } - - @Override - public String toString() - { - return String.format("%s#%d", frame, endPoint.getStreamId()); - } - } - } } diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java index 2ee108be50e..2bf8464040b 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java @@ -17,10 +17,10 @@ import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.http3.internal.HTTP3Stream; -import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; @@ -46,23 +46,29 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client return (ClientHTTP3Session)super.getProtocolSession(); } + @Override + protected void writeFrame(long streamId, Frame frame, Callback callback) + { + getProtocolSession().writeFrame(streamId, frame, callback); + } + public void onOpen() { promise.succeeded(this); } @Override - public CompletableFuture newStream(HeadersFrame frame, Stream.Listener listener) + public CompletableFuture newRequest(HeadersFrame frame, Stream.Listener listener) { ClientHTTP3Session session = getProtocolSession(); long streamId = session.getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL); - QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint); Promise.Completable promise = new Promise.Completable<>(); - HTTP3Stream stream = newStream(endPoint, listener); + HTTP3Stream stream = createStream(streamId); + stream.setListener(listener); Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), promise::failed); - session.writeMessageFrame(endPoint, frame, callback); + session.writeFrame(streamId, frame, callback); return promise; } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java index b7bb73b6732..0ea724d70b0 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java @@ -23,7 +23,7 @@ public interface Session { public interface Client { - public CompletableFuture newStream(HeadersFrame frame, Stream.Listener listener); + public CompletableFuture newRequest(HeadersFrame frame, Stream.Listener listener); public interface Listener extends Session.Listener { @@ -49,7 +49,7 @@ public interface Session { } - public default Stream.Listener onHeaders(Stream stream, HeadersFrame frame) + public default Stream.Listener onRequest(Stream stream, HeadersFrame frame) { return null; } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java index b70d44adff6..b897ca232e0 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java @@ -13,7 +13,22 @@ package org.eclipse.jetty.http3.api; +import java.util.concurrent.CompletableFuture; + +import org.eclipse.jetty.http3.frames.HeadersFrame; + public interface Stream { - public interface Listener {} + public CompletableFuture respond(HeadersFrame frame); + + public interface Listener + { + public default void onResponse(Stream stream, HeadersFrame frame) + { + } + + public default void onTrailer(Stream stream, HeadersFrame frame) + { + } + } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Connection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Connection.java index 1ac3b15a9f1..3ec02865e93 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Connection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Connection.java @@ -66,6 +66,9 @@ public class HTTP3Connection extends AbstractConnection while (true) { int filled = getEndPoint().fill(buffer); + if (LOG.isDebugEnabled()) + LOG.debug("filled {} on {}", filled, this); + if (filled > 0) { parser.parse(buffer); @@ -94,12 +97,4 @@ public class HTTP3Connection extends AbstractConnection getEndPoint().close(x); } } - - // TODO - // Output side. - // When responses want to send a HEADERS frame, - // they cannot generate the bytes and write them to the EP because otherwise they will be accessing the QpackEncoder concurrently. - // Therefore we need to have a reference from here back to ProtocolSession and do - // protocolSession.append(frames); - // Then ProtocolSession will have a Flusher that will generate the bytes in a single threaded way. } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java new file mode 100644 index 00000000000..ab0a4825a24 --- /dev/null +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java @@ -0,0 +1,113 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.internal; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; + +import org.eclipse.jetty.http3.frames.Frame; +import org.eclipse.jetty.http3.internal.generator.MessageGenerator; +import org.eclipse.jetty.http3.qpack.QpackEncoder; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.quic.common.QuicStreamEndPoint; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.thread.AutoLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HTTP3Flusher extends IteratingCallback +{ + private static final Logger LOG = LoggerFactory.getLogger(HTTP3Flusher.class); + + private final AutoLock lock = new AutoLock(); + private final Queue queue = new ArrayDeque<>(); + private final ByteBufferPool.Lease lease; + private final MessageGenerator generator; + private Entry entry; + + public HTTP3Flusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers) + { + this.lease = new ByteBufferPool.Lease(byteBufferPool); + this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers); + } + + public void offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback) + { + try (AutoLock l = lock.lock()) + { + queue.offer(new Entry(endPoint, frame, callback)); + } + } + + @Override + protected Action process() + { + try (AutoLock l = lock.lock()) + { + entry = queue.poll(); + if (entry == null) + return Action.IDLE; + } + + if (LOG.isDebugEnabled()) + LOG.debug("flushing {} on {}", entry, this); + + generator.generate(lease, entry.endPoint.getStreamId(), entry.frame); + + QuicStreamEndPoint endPoint = entry.endPoint; + List buffers = lease.getByteBuffers(); + if (LOG.isDebugEnabled()) + LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this); + endPoint.write(this, buffers.toArray(ByteBuffer[]::new)); + return Action.SCHEDULED; + } + + @Override + public void succeeded() + { + lease.recycle(); + entry.callback.succeeded(); + entry = null; + super.succeeded(); + } + + @Override + public InvocationType getInvocationType() + { + return entry.callback.getInvocationType(); + } + + private static class Entry + { + private final QuicStreamEndPoint endPoint; + private final Frame frame; + private final Callback callback; + + private Entry(QuicStreamEndPoint endPoint, Frame frame, Callback callback) + { + this.endPoint = endPoint; + this.frame = frame; + this.callback = callback; + } + + @Override + public String toString() + { + return String.format("%s#%d", frame, endPoint.getStreamId()); + } + } +} diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java index f1aee30eac9..a0337606fe1 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java @@ -16,17 +16,19 @@ package org.eclipse.jetty.http3.internal; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.parser.ParserListener; import org.eclipse.jetty.quic.common.ProtocolSession; -import org.eclipse.jetty.quic.common.QuicStreamEndPoint; +import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HTTP3Session implements Session, ParserListener +public abstract class HTTP3Session implements Session, ParserListener { private static final Logger LOG = LoggerFactory.getLogger(HTTP3Session.class); @@ -45,11 +47,21 @@ public class HTTP3Session implements Session, ParserListener return session; } - protected HTTP3Stream newStream(QuicStreamEndPoint endPoint, Stream.Listener listener) + protected HTTP3Stream createStream(long streamId) { - return streams.computeIfAbsent(endPoint.getStreamId(), id -> new HTTP3Stream(endPoint, listener)); + HTTP3Stream stream = new HTTP3Stream(this, streamId); + if (streams.put(streamId, stream) != null) + throw new IllegalStateException("duplicate stream id " + streamId); + return stream; } + protected HTTP3Stream getOrCreateStream(long streamId) + { + return streams.computeIfAbsent(streamId, id -> new HTTP3Stream(this, streamId)); + } + + protected abstract void writeFrame(long streamId, Frame frame, Callback callback); + public Map onPreface() { Map settings = notifyPreface(); @@ -95,6 +107,60 @@ public class HTTP3Session implements Session, ParserListener public void onHeaders(long streamId, HeadersFrame frame) { if (LOG.isDebugEnabled()) - LOG.debug("received {} on {}", frame, this); + LOG.debug("received {}#{} on {}", frame, streamId, this); + + HTTP3Stream stream = getOrCreateStream(streamId); + + MetaData metaData = frame.getMetaData(); + if (metaData.isRequest()) + { + Stream.Listener streamListener = notifyRequest(stream, frame); + stream.setListener(streamListener); + } + else if (metaData.isResponse()) + { + notifyResponse(stream, frame); + } + else + { + notifyTrailer(stream, frame); + } + } + + private Stream.Listener notifyRequest(HTTP3Stream stream, HeadersFrame frame) + { + try + { + return listener.onRequest(stream, frame); + } + catch (Throwable x) + { + LOG.info("failure notifying listener {}", listener, x); + return null; + } + } + + private void notifyResponse(HTTP3Stream stream, HeadersFrame frame) + { + try + { + stream.getListener().onResponse(stream, frame); + } + catch (Throwable x) + { + LOG.info("failure notifying listener {}", listener, x); + } + } + + private void notifyTrailer(HTTP3Stream stream, HeadersFrame frame) + { + try + { + stream.getListener().onTrailer(stream, frame); + } + catch (Throwable x) + { + LOG.info("failure notifying listener {}", listener, x); + } } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java index be271d29b52..1ded84e87d7 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java @@ -13,13 +13,41 @@ package org.eclipse.jetty.http3.internal; +import java.util.concurrent.CompletableFuture; + import org.eclipse.jetty.http3.api.Stream; -import org.eclipse.jetty.quic.common.QuicStreamEndPoint; +import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.Invocable; public class HTTP3Stream implements Stream { - public HTTP3Stream(QuicStreamEndPoint endPoint, Listener listener) - { + private final HTTP3Session session; + private final long streamId; + private Listener listener; + public HTTP3Stream(HTTP3Session session, long streamId) + { + this.session = session; + this.streamId = streamId; + } + + public Listener getListener() + { + return listener; + } + + public void setListener(Listener listener) + { + this.listener = listener; + } + + @Override + public CompletableFuture respond(HeadersFrame frame) + { + Promise.Completable completable = new Promise.Completable<>(); + session.writeFrame(streamId, frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed)); + return completable; } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/HeadersGenerator.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/HeadersGenerator.java index 5aeff59cc43..9ffd4d7ea6f 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/HeadersGenerator.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/HeadersGenerator.java @@ -16,7 +16,9 @@ package org.eclipse.jetty.http3.internal.generator; import java.nio.ByteBuffer; import org.eclipse.jetty.http3.frames.Frame; +import org.eclipse.jetty.http3.frames.FrameType; import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.internal.VarLenInt; import org.eclipse.jetty.http3.qpack.QpackEncoder; import org.eclipse.jetty.http3.qpack.QpackException; import org.eclipse.jetty.io.ByteBufferPool; @@ -48,12 +50,21 @@ public class HeadersGenerator extends FrameGenerator ByteBuffer buffer = lease.acquire(maxLength, useDirectByteBuffers); encoder.encode(buffer, streamId, frame.getMetaData()); buffer.flip(); + int length = buffer.remaining(); + int capacity = VarLenInt.length(FrameType.HEADERS.type()) + VarLenInt.length(length); + ByteBuffer header = ByteBuffer.allocate(capacity); + VarLenInt.generate(header, FrameType.HEADERS.type()); + VarLenInt.generate(header, length); + header.flip(); + lease.append(header, false); + lease.append(buffer, true); return buffer.remaining(); } catch (QpackException e) { + // TODO e.printStackTrace(); + return 0; } - return 0; } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java index c63cd38b94d..0058af3035b 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java @@ -14,7 +14,9 @@ package org.eclipse.jetty.http3.server.internal; import org.eclipse.jetty.http3.api.Session; +import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.internal.HTTP3Session; +import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,4 +28,16 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server { super(session, listener); } + + @Override + public ServerHTTP3Session getProtocolSession() + { + return (ServerHTTP3Session)super.getProtocolSession(); + } + + @Override + protected void writeFrame(long streamId, Frame frame, Callback callback) + { + getProtocolSession().writeFrame(streamId, frame, callback); + } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index 358fa0f4976..5b52b079933 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -16,8 +16,10 @@ package org.eclipse.jetty.http3.server.internal; import java.util.Map; import org.eclipse.jetty.http3.api.Session; +import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.ControlFlusher; +import org.eclipse.jetty.http3.internal.HTTP3Flusher; import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionHandler; import org.eclipse.jetty.http3.internal.StreamConnection; @@ -43,6 +45,7 @@ public class ServerHTTP3Session extends ServerProtocolSession private final InstructionFlusher decoderFlusher; private final ControlFlusher controlFlusher; private final MessageGenerator generator; + private final HTTP3Flusher messageFlusher; public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize) { @@ -64,6 +67,9 @@ public class ServerHTTP3Session extends ServerProtocolSession long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId); this.controlFlusher = new ControlFlusher(session, controlEndPoint); + + // TODO: make parameters configurable. + this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true); } public QpackDecoder getQpackDecoder() @@ -132,4 +138,11 @@ public class ServerHTTP3Session extends ServerProtocolSession endPoint.onOpen(); connection.onOpen(); } + + void writeFrame(long streamId, Frame frame, Callback callback) + { + QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint); + messageFlusher.offer(endPoint, frame, callback); + messageFlusher.iterate(); + } } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java index ab18272173d..0f94208683c 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3ClientServerTest.java @@ -14,11 +14,13 @@ package org.eclipse.jetty.http3.tests; import java.net.InetSocketAddress; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; @@ -50,20 +52,31 @@ public class HTTP3ClientServerTest serverThreads.setName("server"); Server server = new Server(serverThreads); - CountDownLatch settingsLatch = new CountDownLatch(1); - CountDownLatch serverLatch = new CountDownLatch(1); + CountDownLatch serverPrefaceLatch = new CountDownLatch(1); + CountDownLatch serverSettingsLatch = new CountDownLatch(1); + CountDownLatch serverRequestLatch = new CountDownLatch(1); ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(new Session.Server.Listener() { @Override - public void onSettings(Session session, SettingsFrame frame) + public Map onPreface(Session session) { - settingsLatch.countDown(); + serverPrefaceLatch.countDown(); + return Map.of(); } @Override - public Stream.Listener onHeaders(Stream stream, HeadersFrame frame) + public void onSettings(Session session, SettingsFrame frame) { - serverLatch.countDown(); + serverSettingsLatch.countDown(); + } + + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + serverRequestLatch.countDown(); + // Send the response. + stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY))); + // Not interested in request data. return null; } })); @@ -73,17 +86,45 @@ public class HTTP3ClientServerTest HTTP3Client client = new HTTP3Client(); client.start(); - Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + CountDownLatch clientPrefaceLatch = new CountDownLatch(1); + CountDownLatch clientSettingsLatch = new CountDownLatch(1); + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() + { + @Override + public Map onPreface(Session session) + { + clientPrefaceLatch.countDown(); + return Map.of(); + } + + @Override + public void onSettings(Session session, SettingsFrame frame) + { + clientSettingsLatch.countDown(); + } + }) .get(555, TimeUnit.SECONDS); - HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort()); + CountDownLatch clientResponseLatch = new CountDownLatch(1); + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); HeadersFrame frame = new HeadersFrame(metaData); - Stream stream = session.newStream(frame, new Stream.Listener() {}) + Stream stream = session.newRequest(frame, new Stream.Listener() + { + @Override + public void onResponse(Stream stream, HeadersFrame frame) + { + clientResponseLatch.countDown(); + } + }) .get(555, TimeUnit.SECONDS); assertNotNull(stream); - assertTrue(settingsLatch.await(555, TimeUnit.SECONDS)); - assertTrue(serverLatch.await(555, TimeUnit.SECONDS)); + assertTrue(clientPrefaceLatch.await(555, TimeUnit.SECONDS)); + assertTrue(serverPrefaceLatch.await(555, TimeUnit.SECONDS)); + assertTrue(serverSettingsLatch.await(555, TimeUnit.SECONDS)); + assertTrue(clientSettingsLatch.await(555, TimeUnit.SECONDS)); + assertTrue(serverRequestLatch.await(555, TimeUnit.SECONDS)); + assertTrue(clientResponseLatch.await(555, TimeUnit.SECONDS)); } }