Issue #6728 - QUIC and HTTP/3

- Simple request/response (no content) working.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-13 18:47:46 +02:00
parent f6958de4b2
commit b6c65404d5
12 changed files with 343 additions and 128 deletions

View File

@ -13,31 +13,24 @@
package org.eclipse.jetty.http3.client.internal; package org.eclipse.jetty.http3.client.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher; import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler; import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.StreamConnection; import org.eclipse.jetty.http3.internal.StreamConnection;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.qpack.QpackDecoder; import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder; import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.quic.client.ClientProtocolSession; import org.eclipse.jetty.quic.client.ClientProtocolSession;
import org.eclipse.jetty.quic.client.ClientQuicSession; import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -51,7 +44,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
private final InstructionFlusher encoderInstructionFlusher; private final InstructionFlusher encoderInstructionFlusher;
private final InstructionFlusher decoderInstructionFlusher; private final InstructionFlusher decoderInstructionFlusher;
private final ControlFlusher controlFlusher; private final ControlFlusher controlFlusher;
private final MessageFlusher messageFlusher; private final HTTP3Flusher messageFlusher;
public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise<Session.Client> promise, int maxBlockedStreams, int maxResponseHeadersSize) public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise<Session.Client> promise, int maxBlockedStreams, int maxResponseHeadersSize)
{ {
@ -73,7 +66,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
this.controlFlusher = new ControlFlusher(session, controlEndPoint); this.controlFlusher = new ControlFlusher(session, controlEndPoint);
// TODO: make parameters configurable. // TODO: make parameters configurable.
this.messageFlusher = new MessageFlusher(session.getByteBufferPool(), encoder, 4096, true); this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true);
} }
public QpackDecoder getQpackDecoder() public QpackDecoder getQpackDecoder()
@ -138,8 +131,9 @@ public class ClientHTTP3Session extends ClientProtocolSession
connection.onOpen(); connection.onOpen();
} }
void writeMessageFrame(QuicStreamEndPoint endPoint, Frame frame, Callback callback) void writeFrame(long streamId, Frame frame, Callback callback)
{ {
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
messageFlusher.offer(endPoint, frame, callback); messageFlusher.offer(endPoint, frame, callback);
messageFlusher.iterate(); messageFlusher.iterate();
} }
@ -149,85 +143,4 @@ public class ClientHTTP3Session extends ClientProtocolSession
{ {
return String.format("%s@%x", getClass().getSimpleName(), hashCode()); return String.format("%s@%x", getClass().getSimpleName(), hashCode());
} }
private static class MessageFlusher extends IteratingCallback
{
private final AutoLock lock = new AutoLock();
private final Queue<Entry> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final MessageGenerator generator;
private Entry entry;
public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
{
this.lease = new ByteBufferPool.Lease(byteBufferPool);
this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers);
}
public void offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
try (AutoLock l = lock.lock())
{
queue.offer(new Entry(endPoint, frame, callback));
}
}
@Override
protected Action process()
{
try (AutoLock l = lock.lock())
{
entry = queue.poll();
if (entry == null)
return Action.IDLE;
}
if (LOG.isDebugEnabled())
LOG.debug("flushing {} on {}", entry, this);
generator.generate(lease, entry.endPoint.getStreamId(), entry.frame);
QuicStreamEndPoint endPoint = entry.endPoint;
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this);
endPoint.write(this, buffers.toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
lease.recycle();
entry.callback.succeeded();
entry = null;
super.succeeded();
}
@Override
public InvocationType getInvocationType()
{
return entry.callback.getInvocationType();
}
private static class Entry
{
private final QuicStreamEndPoint endPoint;
private final Frame frame;
private final Callback callback;
private Entry(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
this.endPoint = endPoint;
this.frame = frame;
this.callback = callback;
}
@Override
public String toString()
{
return String.format("%s#%d", frame, endPoint.getStreamId());
}
}
}
} }

View File

@ -17,10 +17,10 @@ import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream; import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType; import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.Promise;
@ -46,23 +46,29 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
return (ClientHTTP3Session)super.getProtocolSession(); return (ClientHTTP3Session)super.getProtocolSession();
} }
@Override
protected void writeFrame(long streamId, Frame frame, Callback callback)
{
getProtocolSession().writeFrame(streamId, frame, callback);
}
public void onOpen() public void onOpen()
{ {
promise.succeeded(this); promise.succeeded(this);
} }
@Override @Override
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener) public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener)
{ {
ClientHTTP3Session session = getProtocolSession(); ClientHTTP3Session session = getProtocolSession();
long streamId = session.getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL); long streamId = session.getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>(); Promise.Completable<Stream> promise = new Promise.Completable<>();
HTTP3Stream stream = newStream(endPoint, listener); HTTP3Stream stream = createStream(streamId);
stream.setListener(listener);
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), promise::failed); Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), promise::failed);
session.writeMessageFrame(endPoint, frame, callback); session.writeFrame(streamId, frame, callback);
return promise; return promise;
} }
} }

View File

@ -23,7 +23,7 @@ public interface Session
{ {
public interface Client public interface Client
{ {
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener); public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Listener listener);
public interface Listener extends Session.Listener public interface Listener extends Session.Listener
{ {
@ -49,7 +49,7 @@ public interface Session
{ {
} }
public default Stream.Listener onHeaders(Stream stream, HeadersFrame frame) public default Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{ {
return null; return null;
} }

View File

@ -13,7 +13,22 @@
package org.eclipse.jetty.http3.api; package org.eclipse.jetty.http3.api;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.frames.HeadersFrame;
public interface Stream public interface Stream
{ {
public interface Listener {} public CompletableFuture<Stream> respond(HeadersFrame frame);
public interface Listener
{
public default void onResponse(Stream stream, HeadersFrame frame)
{
}
public default void onTrailer(Stream stream, HeadersFrame frame)
{
}
}
} }

View File

@ -66,6 +66,9 @@ public class HTTP3Connection extends AbstractConnection
while (true) while (true)
{ {
int filled = getEndPoint().fill(buffer); int filled = getEndPoint().fill(buffer);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {}", filled, this);
if (filled > 0) if (filled > 0)
{ {
parser.parse(buffer); parser.parse(buffer);
@ -94,12 +97,4 @@ public class HTTP3Connection extends AbstractConnection
getEndPoint().close(x); getEndPoint().close(x);
} }
} }
// TODO
// Output side.
// When responses want to send a HEADERS frame,
// they cannot generate the bytes and write them to the EP because otherwise they will be accessing the QpackEncoder concurrently.
// Therefore we need to have a reference from here back to ProtocolSession and do
// protocolSession.append(frames);
// Then ProtocolSession will have a Flusher that will generate the bytes in a single threaded way.
} }

View File

@ -0,0 +1,113 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3Flusher extends IteratingCallback
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Flusher.class);
private final AutoLock lock = new AutoLock();
private final Queue<Entry> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final MessageGenerator generator;
private Entry entry;
public HTTP3Flusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
{
this.lease = new ByteBufferPool.Lease(byteBufferPool);
this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers);
}
public void offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
try (AutoLock l = lock.lock())
{
queue.offer(new Entry(endPoint, frame, callback));
}
}
@Override
protected Action process()
{
try (AutoLock l = lock.lock())
{
entry = queue.poll();
if (entry == null)
return Action.IDLE;
}
if (LOG.isDebugEnabled())
LOG.debug("flushing {} on {}", entry, this);
generator.generate(lease, entry.endPoint.getStreamId(), entry.frame);
QuicStreamEndPoint endPoint = entry.endPoint;
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this);
endPoint.write(this, buffers.toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
lease.recycle();
entry.callback.succeeded();
entry = null;
super.succeeded();
}
@Override
public InvocationType getInvocationType()
{
return entry.callback.getInvocationType();
}
private static class Entry
{
private final QuicStreamEndPoint endPoint;
private final Frame frame;
private final Callback callback;
private Entry(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
this.endPoint = endPoint;
this.frame = frame;
this.callback = callback;
}
@Override
public String toString()
{
return String.format("%s#%d", frame, endPoint.getStreamId());
}
}
}

View File

@ -16,17 +16,19 @@ package org.eclipse.jetty.http3.internal;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.parser.ParserListener; import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.quic.common.ProtocolSession; import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class HTTP3Session implements Session, ParserListener public abstract class HTTP3Session implements Session, ParserListener
{ {
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Session.class); private static final Logger LOG = LoggerFactory.getLogger(HTTP3Session.class);
@ -45,11 +47,21 @@ public class HTTP3Session implements Session, ParserListener
return session; return session;
} }
protected HTTP3Stream newStream(QuicStreamEndPoint endPoint, Stream.Listener listener) protected HTTP3Stream createStream(long streamId)
{ {
return streams.computeIfAbsent(endPoint.getStreamId(), id -> new HTTP3Stream(endPoint, listener)); HTTP3Stream stream = new HTTP3Stream(this, streamId);
if (streams.put(streamId, stream) != null)
throw new IllegalStateException("duplicate stream id " + streamId);
return stream;
} }
protected HTTP3Stream getOrCreateStream(long streamId)
{
return streams.computeIfAbsent(streamId, id -> new HTTP3Stream(this, streamId));
}
protected abstract void writeFrame(long streamId, Frame frame, Callback callback);
public Map<Long, Long> onPreface() public Map<Long, Long> onPreface()
{ {
Map<Long, Long> settings = notifyPreface(); Map<Long, Long> settings = notifyPreface();
@ -95,6 +107,60 @@ public class HTTP3Session implements Session, ParserListener
public void onHeaders(long streamId, HeadersFrame frame) public void onHeaders(long streamId, HeadersFrame frame)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this); LOG.debug("received {}#{} on {}", frame, streamId, this);
HTTP3Stream stream = getOrCreateStream(streamId);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
Stream.Listener streamListener = notifyRequest(stream, frame);
stream.setListener(streamListener);
}
else if (metaData.isResponse())
{
notifyResponse(stream, frame);
}
else
{
notifyTrailer(stream, frame);
}
}
private Stream.Listener notifyRequest(HTTP3Stream stream, HeadersFrame frame)
{
try
{
return listener.onRequest(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return null;
}
}
private void notifyResponse(HTTP3Stream stream, HeadersFrame frame)
{
try
{
stream.getListener().onResponse(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
private void notifyTrailer(HTTP3Stream stream, HeadersFrame frame)
{
try
{
stream.getListener().onTrailer(stream, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
} }
} }

View File

@ -13,13 +13,41 @@
package org.eclipse.jetty.http3.internal; package org.eclipse.jetty.http3.internal;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
public class HTTP3Stream implements Stream public class HTTP3Stream implements Stream
{ {
public HTTP3Stream(QuicStreamEndPoint endPoint, Listener listener) private final HTTP3Session session;
{ private final long streamId;
private Listener listener;
public HTTP3Stream(HTTP3Session session, long streamId)
{
this.session = session;
this.streamId = streamId;
}
public Listener getListener()
{
return listener;
}
public void setListener(Listener listener)
{
this.listener = listener;
}
@Override
public CompletableFuture<Stream> respond(HeadersFrame frame)
{
Promise.Completable<Stream> completable = new Promise.Completable<>();
session.writeFrame(streamId, frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed));
return completable;
} }
} }

View File

@ -16,7 +16,9 @@ package org.eclipse.jetty.http3.internal.generator;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.qpack.QpackEncoder; import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.http3.qpack.QpackException; import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -48,12 +50,21 @@ public class HeadersGenerator extends FrameGenerator
ByteBuffer buffer = lease.acquire(maxLength, useDirectByteBuffers); ByteBuffer buffer = lease.acquire(maxLength, useDirectByteBuffers);
encoder.encode(buffer, streamId, frame.getMetaData()); encoder.encode(buffer, streamId, frame.getMetaData());
buffer.flip(); buffer.flip();
int length = buffer.remaining();
int capacity = VarLenInt.length(FrameType.HEADERS.type()) + VarLenInt.length(length);
ByteBuffer header = ByteBuffer.allocate(capacity);
VarLenInt.generate(header, FrameType.HEADERS.type());
VarLenInt.generate(header, length);
header.flip();
lease.append(header, false);
lease.append(buffer, true);
return buffer.remaining(); return buffer.remaining();
} }
catch (QpackException e) catch (QpackException e)
{ {
// TODO
e.printStackTrace(); e.printStackTrace();
return 0;
} }
return 0;
} }
} }

View File

@ -14,7 +14,9 @@
package org.eclipse.jetty.http3.server.internal; package org.eclipse.jetty.http3.server.internal;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -26,4 +28,16 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
{ {
super(session, listener); super(session, listener);
} }
@Override
public ServerHTTP3Session getProtocolSession()
{
return (ServerHTTP3Session)super.getProtocolSession();
}
@Override
protected void writeFrame(long streamId, Frame frame, Callback callback)
{
getProtocolSession().writeFrame(streamId, frame, callback);
}
} }

View File

@ -16,8 +16,10 @@ package org.eclipse.jetty.http3.server.internal;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlFlusher; import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.HTTP3Flusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher; import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler; import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.StreamConnection; import org.eclipse.jetty.http3.internal.StreamConnection;
@ -43,6 +45,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
private final InstructionFlusher decoderFlusher; private final InstructionFlusher decoderFlusher;
private final ControlFlusher controlFlusher; private final ControlFlusher controlFlusher;
private final MessageGenerator generator; private final MessageGenerator generator;
private final HTTP3Flusher messageFlusher;
public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize) public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize)
{ {
@ -64,6 +67,9 @@ public class ServerHTTP3Session extends ServerProtocolSession
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId); QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, controlEndPoint); this.controlFlusher = new ControlFlusher(session, controlEndPoint);
// TODO: make parameters configurable.
this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true);
} }
public QpackDecoder getQpackDecoder() public QpackDecoder getQpackDecoder()
@ -132,4 +138,11 @@ public class ServerHTTP3Session extends ServerProtocolSession
endPoint.onOpen(); endPoint.onOpen();
connection.onOpen(); connection.onOpen();
} }
void writeFrame(long streamId, Frame frame, Callback callback)
{
QuicStreamEndPoint endPoint = getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
messageFlusher.offer(endPoint, frame, callback);
messageFlusher.iterate();
}
} }

View File

@ -14,11 +14,13 @@
package org.eclipse.jetty.http3.tests; package org.eclipse.jetty.http3.tests;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
@ -50,20 +52,31 @@ public class HTTP3ClientServerTest
serverThreads.setName("server"); serverThreads.setName("server");
Server server = new Server(serverThreads); Server server = new Server(serverThreads);
CountDownLatch settingsLatch = new CountDownLatch(1); CountDownLatch serverPrefaceLatch = new CountDownLatch(1);
CountDownLatch serverLatch = new CountDownLatch(1); CountDownLatch serverSettingsLatch = new CountDownLatch(1);
CountDownLatch serverRequestLatch = new CountDownLatch(1);
ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(new Session.Server.Listener() ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(new Session.Server.Listener()
{ {
@Override @Override
public void onSettings(Session session, SettingsFrame frame) public Map<Long, Long> onPreface(Session session)
{ {
settingsLatch.countDown(); serverPrefaceLatch.countDown();
return Map.of();
} }
@Override @Override
public Stream.Listener onHeaders(Stream stream, HeadersFrame frame) public void onSettings(Session session, SettingsFrame frame)
{ {
serverLatch.countDown(); serverSettingsLatch.countDown();
}
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverRequestLatch.countDown();
// Send the response.
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY)));
// Not interested in request data.
return null; return null;
} }
})); }));
@ -73,17 +86,45 @@ public class HTTP3ClientServerTest
HTTP3Client client = new HTTP3Client(); HTTP3Client client = new HTTP3Client();
client.start(); client.start();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) CountDownLatch clientPrefaceLatch = new CountDownLatch(1);
CountDownLatch clientSettingsLatch = new CountDownLatch(1);
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener()
{
@Override
public Map<Long, Long> onPreface(Session session)
{
clientPrefaceLatch.countDown();
return Map.of();
}
@Override
public void onSettings(Session session, SettingsFrame frame)
{
clientSettingsLatch.countDown();
}
})
.get(555, TimeUnit.SECONDS); .get(555, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort()); CountDownLatch clientResponseLatch = new CountDownLatch(1);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData); HeadersFrame frame = new HeadersFrame(metaData);
Stream stream = session.newStream(frame, new Stream.Listener() {}) Stream stream = session.newRequest(frame, new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
clientResponseLatch.countDown();
}
})
.get(555, TimeUnit.SECONDS); .get(555, TimeUnit.SECONDS);
assertNotNull(stream); assertNotNull(stream);
assertTrue(settingsLatch.await(555, TimeUnit.SECONDS)); assertTrue(clientPrefaceLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverLatch.await(555, TimeUnit.SECONDS)); assertTrue(serverPrefaceLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverSettingsLatch.await(555, TimeUnit.SECONDS));
assertTrue(clientSettingsLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverRequestLatch.await(555, TimeUnit.SECONDS));
assertTrue(clientResponseLatch.await(555, TimeUnit.SECONDS));
} }
} }