Issue #6728 - QUIC and HTTP/3

- Restructured code
- Split implementation of control and message parsing/generation.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-12 23:27:58 +02:00
parent 29e82ace1b
commit 7c75e9c22f
50 changed files with 1443 additions and 569 deletions

View File

@ -14,10 +14,13 @@
module org.eclipse.jetty.http3.client
{
exports org.eclipse.jetty.http3.client;
exports org.eclipse.jetty.http3.client.internal;
requires transitive org.eclipse.jetty.http3.common;
requires transitive org.eclipse.jetty.http3.qpack;
requires transitive org.eclipse.jetty.io;
requires transitive org.eclipse.jetty.quic.common;
requires org.eclipse.jetty.quic.client;
requires transitive org.eclipse.jetty.util;
requires org.slf4j;
}

View File

@ -27,12 +27,22 @@ import org.eclipse.jetty.quic.client.QuicClientConnectorConfigurator;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <pre>
* / dgramEP1 - ClientQuiConnection -* ClientQuicSession - ClientProtocolSession
* HTTP3Client / ControlStream
* \ dgramEP3 - ClientQuiConnection -* ClientQuicSession - ClientHTTP3Session -* HTTP3Streams
* </pre>
*/
public class HTTP3Client extends ContainerLifeCycle
{
public static final String CLIENT_CONTEXT_KEY = HTTP3Client.class.getName();
public static final String SESSION_LISTENER_CONTEXT_KEY = CLIENT_CONTEXT_KEY + ".listener";
private static final String SESSION_PROMISE_CONTEXT_KEY = CLIENT_CONTEXT_KEY + ".promise";
public static final String SESSION_PROMISE_CONTEXT_KEY = CLIENT_CONTEXT_KEY + ".promise";
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Client.class);
private final ClientConnector connector;
private List<String> protocols = List.of("h3");
@ -54,17 +64,21 @@ public class HTTP3Client extends ContainerLifeCycle
this.protocols = protocols;
}
public CompletableFuture<Session> connect(SocketAddress address, Session.Listener listener)
public CompletableFuture<Session.Client> connect(SocketAddress address, Session.Client.Listener listener)
{
Promise.Completable<Session> completable = new Promise.Completable<>();
ClientConnectionFactory factory = new HTTP3ClientConnectionFactory();
Map<String, Object> context = new ConcurrentHashMap<>();
Promise.Completable<Session.Client> completable = new Promise.Completable<>();
ClientConnectionFactory factory = new HTTP3ClientConnectionFactory();
context.put(CLIENT_CONTEXT_KEY, this);
context.put(SESSION_LISTENER_CONTEXT_KEY, listener);
context.put(SESSION_PROMISE_CONTEXT_KEY, completable);
context.put(ClientQuicConnection.APPLICATION_PROTOCOLS, getProtocols());
context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, factory);
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, Promise.from(ioConnection -> {}, completable::failed));
if (LOG.isDebugEnabled())
LOG.debug("connecting to {}", address);
connector.connect(address, context);
return completable;
}

View File

@ -13,32 +13,71 @@
package org.eclipse.jetty.http3.client;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.http3.client.internal.ClientHTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Connection;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.common.ProtocolQuicSession;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, ProtocolQuicSession.Factory
public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, ProtocolSession.Factory
{
@Override
public ProtocolQuicSession newProtocolQuicSession(QuicSession quicSession, Map<String, Object> context)
private static final Logger LOG = LoggerFactory.getLogger(HTTP3ClientConnectionFactory.class);
private int maxBlockedStreams;
private int maxResponseHeadersSize = 8192;
public int getMaxBlockedStreams()
{
HTTP3Client http3Client = (HTTP3Client)context.get(HTTP3Client.CLIENT_CONTEXT_KEY);
Session.Listener listener = (Session.Listener)context.get(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY);
Generator generator = new Generator();
return new HTTP3ClientQuicSession((ClientQuicSession)quicSession, listener, generator);
return maxBlockedStreams;
}
public void setMaxBlockedStreams(int maxBlockedStreams)
{
this.maxBlockedStreams = maxBlockedStreams;
}
public int getMaxResponseHeadersSize()
{
return maxResponseHeadersSize;
}
public void setMaxResponseHeadersSize(int maxResponseHeadersSize)
{
this.maxResponseHeadersSize = maxResponseHeadersSize;
}
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
{
return null;
Session.Client.Listener listener = (Session.Client.Listener)context.get(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked")
Promise<Session.Client> promise = (Promise<Session.Client>)context.get(HTTP3Client.SESSION_PROMISE_CONTEXT_KEY);
ClientHTTP3Session protocolSession = new ClientHTTP3Session((ClientQuicSession)quicSession, listener, promise, getMaxBlockedStreams(), getMaxResponseHeadersSize());
if (LOG.isDebugEnabled())
LOG.debug("created protocol-specific {}", protocolSession);
return protocolSession;
}
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
// TODO: can the downcasts be removed?
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
long streamId = streamEndPoint.getStreamId();
ClientHTTP3Session http3Session = (ClientHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
// TODO: Parser may be created internally, if I pass the QuicStreamEndPoint and ClientHTTP3Session.
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder(), http3Session);
return new HTTP3Connection(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
}
}

View File

@ -1,127 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlConnection;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderConnection;
import org.eclipse.jetty.http3.internal.EncoderConnection;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.client.ProtocolClientQuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
public class HTTP3ClientQuicSession extends ProtocolClientQuicSession implements Session
{
private final Session.Listener listener;
private final QuicStreamEndPoint decoderEndPoint;
private final QuicStreamEndPoint encoderEndPoint;
private final QuicStreamEndPoint controlEndPoint;
private final ControlFlusher controlFlusher;
public HTTP3ClientQuicSession(ClientQuicSession session, Session.Listener listener, Generator generator)
{
super(session);
this.listener = listener;
long decoderStreamId = getQuicSession().newClientUnidirectionalStreamId();
decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
long encoderStreamId = getQuicSession().newClientUnidirectionalStreamId();
encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
long controlStreamId = getQuicSession().newClientBidirectionalStreamId();
this.controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, generator, controlEndPoint);
}
@Override
public void onOpen()
{
// Queue a synthetic frame to send the control stream type.
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
controlFlusher.offer(new Frame.Synthetic(buffer), Callback.NOOP);
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = listener.onPreface(this);
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
process();
}
private QuicStreamEndPoint configureDecoderEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, endPoint ->
{
// This is a write-only stream, so no need to link a Connection.
endPoint.onOpen();
int streamType = DecoderConnection.QPACK_DECODER_STREAM_TYPE;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(streamType));
VarLenInt.generate(buffer, streamType);
buffer.flip();
endPoint.write(Callback.NOOP, buffer);
});
}
private QuicStreamEndPoint configureEncoderEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, endPoint ->
{
// This is a write-only stream, so no need to link a Connection.
endPoint.onOpen();
int streamType = EncoderConnection.QPACK_ENCODER_STREAM_TYPE;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(streamType));
VarLenInt.generate(buffer, streamType);
buffer.flip();
endPoint.write(Callback.NOOP, buffer);
});
}
private QuicStreamEndPoint configureControlEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, endPoint ->
{
ControlConnection connection = new ControlConnection(endPoint, getQuicSession().getExecutor());
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
});
}
@Override
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener)
{
return null;
}
}

View File

@ -0,0 +1,238 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlConnection;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.StreamConnection;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.quic.client.ClientProtocolSession;
import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientHTTP3Session extends ClientProtocolSession implements ParserListener
{
private static final Logger LOG = LoggerFactory.getLogger(ClientHTTP3Session.class);
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3SessionClient apiSession;
private final InstructionFlusher encoderInstructionFlusher;
private final InstructionFlusher decoderInstructionFlusher;
private final ControlFlusher controlFlusher;
private final MessageFlusher messageFlusher;
public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise<Session.Client> promise, int maxBlockedStreams, int maxResponseHeadersSize)
{
super(session);
this.apiSession = new HTTP3SessionClient(this, listener, promise);
long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
this.encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams);
long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
this.decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxResponseHeadersSize);
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
this.messageFlusher = new MessageFlusher(session.getByteBufferPool(), encoder);
}
public QpackDecoder getQpackDecoder()
{
return decoder;
}
@Override
public void onOpen()
{
initializeEncoderStream();
initializeDecoderStream();
initializeControlStream();
apiSession.onOpen();
}
private void initializeEncoderStream()
{
encoderInstructionFlusher.iterate();
}
private void initializeDecoderStream()
{
decoderInstructionFlusher.iterate();
}
private void initializeControlStream()
{
// Queue a synthetic frame to send the control stream type.
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
controlFlusher.offer(new Frame.Synthetic(buffer), Callback.NOOP);
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = apiSession.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
}
private QuicStreamEndPoint configureInstructionEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
}
private QuicStreamEndPoint configureControlEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, this::configureStreamEndPoint);
}
@Override
protected void onReadable(long readableStreamId)
{
StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
super.onReadable(readableStreamId);
}
else
{
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream {} selected endpoint for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
}
}
private void configureStreamEndPoint(QuicStreamEndPoint endPoint)
{
StreamConnection connection = new StreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), this);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
}
void writeMessageFrame(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
messageFlusher.offer(endPoint, frame, callback);
messageFlusher.iterate();
}
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
private static class MessageFlusher extends IteratingCallback
{
private final AutoLock lock = new AutoLock();
private final Queue<Entry> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final MessageGenerator generator;
private Entry entry;
public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder)
{
this.lease = new ByteBufferPool.Lease(byteBufferPool);
this.generator = new MessageGenerator(encoder);
}
public void offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
try (AutoLock l = lock.lock())
{
queue.offer(new Entry(endPoint, frame, callback));
}
}
@Override
protected Action process()
{
try (AutoLock l = lock.lock())
{
entry = queue.poll();
if (entry == null)
return Action.IDLE;
}
generator.generate(lease, entry.frame);
QuicStreamEndPoint endPoint = entry.endPoint;
endPoint.write(this, lease.getByteBuffers().toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
lease.recycle();
entry.callback.succeeded();
entry = null;
super.succeeded();
}
@Override
public InvocationType getInvocationType()
{
return entry.callback.getInvocationType();
}
private static class Entry
{
private final QuicStreamEndPoint endPoint;
private final Frame frame;
private final Callback callback;
private Entry(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
this.endPoint = endPoint;
this.frame = frame;
this.callback = callback;
}
}
}
}

View File

@ -0,0 +1,64 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.client.internal;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
public class HTTP3SessionClient extends HTTP3Session implements Session.Client
{
private final Promise<Client> promise;
public HTTP3SessionClient(ClientHTTP3Session session, Client.Listener listener, Promise<Client> promise)
{
super(session, listener);
this.promise = promise;
}
@Override
public ClientHTTP3Session getProtocolSession()
{
return (ClientHTTP3Session)super.getProtocolSession();
}
public void onOpen()
{
promise.succeeded(this);
}
@Override
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener)
{
ClientHTTP3Session session = getProtocolSession();
long streamId = session.getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>();
HTTP3Stream stream = new HTTP3Stream(endPoint, listener);
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), promise::failed);
session.writeMessageFrame(endPoint, frame, callback);
return promise;
}
}

View File

@ -15,11 +15,10 @@ module org.eclipse.jetty.http3.common
{
exports org.eclipse.jetty.http3;
exports org.eclipse.jetty.http3.api;
exports org.eclipse.jetty.http3.api.server;
exports org.eclipse.jetty.http3.frames;
exports org.eclipse.jetty.http3.internal.generator to org.eclipse.jetty.http3.client, org.eclipse.jetty.http3.server;
exports org.eclipse.jetty.http3.internal.parser to org.eclipse.jetty.http3.server;
exports org.eclipse.jetty.http3.internal.parser to org.eclipse.jetty.http3.client, org.eclipse.jetty.http3.server;
exports org.eclipse.jetty.http3.internal;
requires transitive org.eclipse.jetty.http;

View File

@ -20,7 +20,22 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
public interface Session
{
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener);
public interface Client
{
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener);
public interface Listener extends Session.Listener
{
}
}
public interface Server
{
public interface Listener extends Session.Listener
{
// TODO: accept event.
}
}
public interface Listener
{
@ -28,5 +43,10 @@ public interface Session
{
return null;
}
public default Stream.Listener onHeaders(Stream stream, HeadersFrame frame)
{
return null;
}
}
}

View File

@ -13,18 +13,44 @@
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ControlConnection extends AbstractConnection
public class ControlConnection extends AbstractConnection implements Connection.UpgradeTo
{
// SPEC: Control Stream Type.
public static final int STREAM_TYPE = 0x00;
private static final Logger LOG = LoggerFactory.getLogger(ControlConnection.class);
public ControlConnection(EndPoint endPoint, Executor executor)
private final ByteBufferPool byteBufferPool;
private final ControlParser parser;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public ControlConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ControlParser parser)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.parser = parser;
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@Override
@ -34,8 +60,52 @@ public class ControlConnection extends AbstractConnection
fillInterested();
}
@Override
public void onUpgradeTo(ByteBuffer upgrade)
{
if (BufferUtil.isEmpty(upgrade))
return;
int capacity = Math.max(upgrade.remaining(), getInputBufferSize());
buffer = byteBufferPool.acquire(capacity, isUseInputDirectByteBuffers());
buffer.put(upgrade);
}
@Override
public void onFillable()
{
try
{
if (buffer == null)
buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
int filled = getEndPoint().fill(buffer);
if (filled > 0)
{
parser.parse(buffer);
}
else if (filled == 0)
{
byteBufferPool.release(buffer);
fillInterested();
break;
}
else
{
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close();
break;
}
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process control stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close(x);
}
}
}

View File

@ -20,7 +20,7 @@ import java.util.List;
import java.util.Queue;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.http3.internal.generator.ControlGenerator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.QuicSession;
@ -38,16 +38,16 @@ public class ControlFlusher extends IteratingCallback
private final AutoLock lock = new AutoLock();
private final Queue<Entry> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final Generator generator;
private final ControlGenerator generator;
private final EndPoint endPoint;
private List<Entry> entries;
private InvocationType invocationType = InvocationType.NON_BLOCKING;
public ControlFlusher(QuicSession session, Generator generator, EndPoint endPoint)
public ControlFlusher(QuicSession session, EndPoint endPoint)
{
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
this.generator = generator;
this.endPoint = endPoint;
this.generator = new ControlGenerator();
}
public void offer(Frame frame, Callback callback)

View File

@ -20,13 +20,33 @@ import org.eclipse.jetty.io.EndPoint;
public class DecoderConnection extends AbstractConnection
{
public static final int QPACK_DECODER_STREAM_TYPE = 0x03;
// SPEC: QPACK Encoder Stream Type.
public static final int STREAM_TYPE = 0x03;
private boolean useInputDirectByteBuffers = true;
public DecoderConnection(EndPoint endPoint, Executor executor)
{
super(endPoint, executor);
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
{

View File

@ -20,13 +20,33 @@ import org.eclipse.jetty.io.EndPoint;
public class EncoderConnection extends AbstractConnection
{
public static final int QPACK_ENCODER_STREAM_TYPE = 0x02;
// SPEC: QPACK Encoder Stream Type.
public static final int STREAM_TYPE = 0x02;
private boolean useInputDirectByteBuffers = true;
public EncoderConnection(EndPoint endPoint, Executor executor)
{
super(endPoint, executor);
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
{

View File

@ -13,25 +13,42 @@
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.internal.parser.Parser;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.ProtocolQuicSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3Connection extends AbstractConnection
{
private final ProtocolQuicSession protocolSession;
private final Parser parser;
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Connection.class);
public HTTP3Connection(EndPoint endPoint, Executor executor, Parser parser)
private final ByteBufferPool byteBufferPool;
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public HTTP3Connection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.protocolSession = null; // TODO
this.byteBufferPool = byteBufferPool;
this.parser = parser;
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@Override
public void onOpen()
{
@ -42,13 +59,47 @@ public class HTTP3Connection extends AbstractConnection
@Override
public void onFillable()
{
try
{
if (buffer == null)
buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
int filled = getEndPoint().fill(buffer);
if (filled > 0)
{
parser.parse(buffer);
}
else if (filled == 0)
{
byteBufferPool.release(buffer);
fillInterested();
break;
}
else
{
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close();
break;
}
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process control stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close(x);
}
}
// TODO
// Output side.
// When responses want to send a HEADERS frame,
// they cannot generate the bytes and write them to the EP because otherwise they will be accessing the QpackEncoder concurrently.
// Therefore we need to have a reference from here back to ProtocolQuicSession and do
// protocolQuicSession.append(frames);
// Then ProtocolQuicSession will have a Flusher that will generate the bytes in a single threaded way.
// Therefore we need to have a reference from here back to ProtocolSession and do
// protocolSession.append(frames);
// Then ProtocolSession will have a Flusher that will generate the bytes in a single threaded way.
}

View File

@ -13,18 +13,30 @@
package org.eclipse.jetty.http3.internal;
import java.util.concurrent.CompletableFuture;
import java.util.Map;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.parser.Parser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.quic.common.ProtocolSession;
public class HTTP3Session implements Session, Parser.Listener
public class HTTP3Session implements Session, ParserListener
{
@Override
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener)
private final ProtocolSession session;
private final Listener listener;
public HTTP3Session(ProtocolSession session, Listener listener)
{
return null;
this.session = session;
this.listener = listener;
}
public ProtocolSession getProtocolSession()
{
return session;
}
public Map<Long, Long> onPreface()
{
return listener.onPreface(this);
}
}

View File

@ -11,10 +11,15 @@
// ========================================================================
//
package org.eclipse.jetty.http3.api.server;
package org.eclipse.jetty.http3.internal;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
public interface ServerSessionListener extends Session.Listener
public class HTTP3Stream implements Stream
{
public HTTP3Stream(QuicStreamEndPoint endPoint, Listener listener)
{
}
}

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import org.eclipse.jetty.http3.qpack.Instruction;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
public class InstructionFlusher extends IteratingCallback
{
private final AutoLock lock = new AutoLock();
private final Queue<Instruction> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final EndPoint endPoint;
private boolean initialized;
public InstructionFlusher(QuicSession session, EndPoint endPoint)
{
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
this.endPoint = endPoint;
}
public void offer(List<Instruction> instructions)
{
try (AutoLock l = lock.lock())
{
queue.addAll(instructions);
}
}
@Override
protected Action process()
{
if (initialized)
{
List<Instruction> instructions;
try (AutoLock l = lock.lock())
{
if (queue.isEmpty())
return Action.IDLE;
instructions = new ArrayList<>(queue);
}
instructions.forEach(i -> i.encode(lease));
endPoint.write(this, lease.getByteBuffers().toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
else
{
initialized = true;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(EncoderConnection.STREAM_TYPE));
VarLenInt.generate(buffer, EncoderConnection.STREAM_TYPE);
buffer.flip();
endPoint.write(NOOP, buffer);
return Action.SCHEDULED;
}
}
@Override
public void succeeded()
{
lease.recycle();
super.succeeded();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}

View File

@ -0,0 +1,35 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.util.List;
import org.eclipse.jetty.http3.qpack.Instruction;
public class InstructionHandler implements Instruction.Handler
{
private final InstructionFlusher encoderFlusher;
public InstructionHandler(InstructionFlusher encoderFlusher)
{
this.encoderFlusher = encoderFlusher;
}
@Override
public void onInstructions(List<Instruction> instructions)
{
encoderFlusher.offer(instructions);
encoderFlusher.iterate();
}
}

View File

@ -0,0 +1,149 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.internal.parser.ControlParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamConnection extends AbstractConnection implements Connection.UpgradeFrom
{
private static final Logger LOG = LoggerFactory.getLogger(StreamConnection.class);
private final ByteBufferPool byteBufferPool;
private final ParserListener listener;
private final VarLenInt parser = new VarLenInt();
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public StreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ParserListener listener)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.listener = listener;
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public ByteBuffer onUpgradeFrom()
{
int remaining = buffer.remaining();
ByteBuffer copy = buffer.isDirect() ? ByteBuffer.allocateDirect(remaining) : ByteBuffer.allocate(remaining);
copy.put(buffer);
byteBufferPool.release(buffer);
buffer = null;
return copy;
}
@Override
public void onFillable()
{
try
{
if (buffer == null)
buffer = byteBufferPool.acquire(2048, isUseInputDirectByteBuffers());
while (true)
{
int filled = getEndPoint().fill(buffer);
if (filled > 0)
{
if (parser.parseInt(buffer, this::detectAndUpgrade))
break;
}
else if (filled == 0)
{
byteBufferPool.release(buffer);
fillInterested();
break;
}
else
{
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close();
break;
}
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process special stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close(x);
}
}
private void detectAndUpgrade(int streamType)
{
switch (streamType)
{
case ControlConnection.STREAM_TYPE:
{
ControlParser parser = new ControlParser(listener);
ControlConnection newConnection = new ControlConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
getEndPoint().upgrade(newConnection);
break;
}
case EncoderConnection.STREAM_TYPE:
{
EncoderConnection newConnection = new EncoderConnection(getEndPoint(), getExecutor());
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
getEndPoint().upgrade(newConnection);
break;
}
case DecoderConnection.STREAM_TYPE:
{
DecoderConnection newConnection = new DecoderConnection(getEndPoint(), getExecutor());
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
getEndPoint().upgrade(newConnection);
break;
}
default:
{
throw new IllegalStateException("unexpected stream type " + Integer.toHexString(streamType));
}
}
}
}

View File

@ -17,17 +17,14 @@ import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.io.ByteBufferPool;
public class Generator
public class ControlGenerator
{
private final FrameGenerator[] generators = new FrameGenerator[FrameType.maxType() + 1];
public Generator()
public ControlGenerator()
{
generators[FrameType.DATA.type()] = new DataGenerator();
generators[FrameType.HEADERS.type()] = new HeadersGenerator();
generators[FrameType.CANCEL_PUSH.type()] = new CancelPushGenerator();
generators[FrameType.SETTINGS.type()] = new SettingsGenerator();
generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator();
generators[FrameType.GOAWAY.type()] = new GoAwayGenerator();
generators[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdGenerator();
}

View File

@ -0,0 +1,36 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
public class MessageGenerator
{
private final FrameGenerator[] generators = new FrameGenerator[FrameType.maxType() + 1];
public MessageGenerator(QpackEncoder encoder)
{
generators[FrameType.DATA.type()] = new DataGenerator();
generators[FrameType.HEADERS.type()] = new HeadersGenerator();
generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator();
}
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
return generators[frame.getFrameType().type()].generate(lease, frame);
}
}

View File

@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
* <p>Subclasses implement {@link #parse(ByteBuffer)} to parse
* the frame specific body.</p>
*
* @see Parser
* @see MessageParser
*/
public abstract class BodyParser
{
@ -36,9 +36,9 @@ public abstract class BodyParser
private final long streamId;
private final HeaderParser headerParser;
private final Parser.Listener listener;
private final ParserListener listener;
protected BodyParser(long streamId, HeaderParser headerParser, Parser.Listener listener)
protected BodyParser(long streamId, HeaderParser headerParser, ParserListener listener)
{
this.streamId = streamId;
this.headerParser = headerParser;
@ -105,7 +105,7 @@ public abstract class BodyParser
{
try
{
listener.onData(frame);
listener.onData(getStreamId(), frame);
}
catch (Throwable x)
{
@ -117,7 +117,7 @@ public abstract class BodyParser
{
try
{
listener.onHeaders(frame);
listener.onHeaders(getStreamId(), frame);
}
catch (Throwable x)
{

View File

@ -17,7 +17,7 @@ import java.nio.ByteBuffer;
public class CancelPushBodyParser extends BodyParser
{
public CancelPushBodyParser(HeaderParser headerParser, Parser.Listener listener)
public CancelPushBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser, listener);
}

View File

@ -0,0 +1,135 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.FrameType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The HTTP/3 protocol parser.</p>
* <p>This parser makes use of the {@link HeaderParser} and of
* {@link BodyParser}s to parse HTTP/3 frames.</p>
*/
public class ControlParser
{
private static final Logger LOG = LoggerFactory.getLogger(ControlParser.class);
private final HeaderParser headerParser;
private final BodyParser[] bodyParsers = new BodyParser[FrameType.maxType() + 1];
private final BodyParser unknownBodyParser;
private final ParserListener listener;
private State state = State.HEADER;
public ControlParser(ParserListener listener)
{
this.headerParser = new HeaderParser();
this.bodyParsers[FrameType.CANCEL_PUSH.type()] = new CancelPushBodyParser(headerParser, listener);
this.bodyParsers[FrameType.SETTINGS.type()] = new SettingsBodyParser(headerParser, listener);
this.bodyParsers[FrameType.GOAWAY.type()] = new GoAwayBodyParser(headerParser, listener);
this.bodyParsers[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdBodyParser(headerParser, listener);
this.unknownBodyParser = new UnknownBodyParser(headerParser, listener);
this.listener = listener;
}
private void reset()
{
headerParser.reset();
state = State.HEADER;
}
/**
* <p>Parses the given {@code buffer} bytes and emit events to a {@link ParserListener}.</p>
*
* @param buffer the buffer to parse
*/
public void parse(ByteBuffer buffer)
{
try
{
while (true)
{
switch (state)
{
case HEADER:
{
if (headerParser.parse(buffer))
{
state = State.BODY;
break;
}
return;
}
case BODY:
{
BodyParser bodyParser = null;
int frameType = headerParser.getFrameType();
if (frameType >= 0 && frameType < bodyParsers.length)
bodyParser = bodyParsers[frameType];
if (bodyParser == null)
{
// Unknown frame types must be ignored.
if (LOG.isDebugEnabled())
LOG.debug("Ignoring unknown frame type {}", Integer.toHexString(frameType));
if (!unknownBodyParser.parse(buffer))
return;
reset();
}
else
{
if (headerParser.getFrameLength() == 0)
{
bodyParser.emptyBody(buffer);
}
else
{
if (!bodyParser.parse(buffer))
return;
}
if (LOG.isDebugEnabled())
LOG.debug("Parsed {} frame body from {}", FrameType.from(frameType), buffer);
reset();
}
break;
}
default:
{
throw new IllegalStateException();
}
}
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Parse failed", x);
buffer.clear();
connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code(), "parser_error");
}
}
private void connectionFailure(ByteBuffer buffer, int error, String reason)
{
unknownBodyParser.sessionFailure(buffer, error, reason);
}
private enum State
{
HEADER, BODY
}
}

View File

@ -27,7 +27,7 @@ public class DataBodyParser extends BodyParser
private State state = State.INIT;
private long length;
public DataBodyParser(long streamId, HeaderParser headerParser, Parser.Listener listener)
public DataBodyParser(long streamId, HeaderParser headerParser, ParserListener listener)
{
super(streamId, headerParser, listener);
}

View File

@ -17,7 +17,7 @@ import java.nio.ByteBuffer;
public class GoAwayBodyParser extends BodyParser
{
public GoAwayBodyParser(HeaderParser headerParser, Parser.Listener listener)
public GoAwayBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser, listener);
}

View File

@ -20,7 +20,7 @@ import org.eclipse.jetty.http3.internal.VarLenInt;
/**
* <p>The parser for the frame header of HTTP/3 frames.</p>
*
* @see Parser
* @see MessageParser
*/
public class HeaderParser
{

View File

@ -35,7 +35,7 @@ public class HeadersBodyParser extends BodyParser
private State state = State.INIT;
private long length;
public HeadersBodyParser(long streamId, HeaderParser headerParser, Parser.Listener listener, QpackDecoder decoder)
public HeadersBodyParser(long streamId, HeaderParser headerParser, ParserListener listener, QpackDecoder decoder)
{
super(streamId, headerParser, listener);
this.decoder = decoder;

View File

@ -17,7 +17,7 @@ import java.nio.ByteBuffer;
public class MaxPushIdBodyParser extends BodyParser
{
public MaxPushIdBodyParser(HeaderParser headerParser, Parser.Listener listener)
public MaxPushIdBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser, listener);
}

View File

@ -16,10 +16,7 @@ package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,28 +26,24 @@ import org.slf4j.LoggerFactory;
* <p>This parser makes use of the {@link HeaderParser} and of
* {@link BodyParser}s to parse HTTP/3 frames.</p>
*/
public class Parser
public class MessageParser
{
private static final Logger LOG = LoggerFactory.getLogger(Parser.class);
private static final Logger LOG = LoggerFactory.getLogger(MessageParser.class);
private final HeaderParser headerParser;
private final BodyParser[] bodyParsers = new BodyParser[FrameType.maxType() + 1];
private final BodyParser unknownBodyParser;
private final long streamId;
private final Listener listener;
private final ParserListener listener;
private State state = State.HEADER;
public Parser(long streamId, QpackDecoder decoder, Listener listener)
public MessageParser(long streamId, QpackDecoder decoder, ParserListener listener)
{
this.streamId = streamId;
this.headerParser = new HeaderParser();
this.bodyParsers[FrameType.DATA.type()] = new DataBodyParser(streamId, headerParser, listener);
this.bodyParsers[FrameType.HEADERS.type()] = new HeadersBodyParser(streamId, headerParser, listener, decoder);
this.bodyParsers[FrameType.CANCEL_PUSH.type()] = new CancelPushBodyParser(headerParser, listener);
this.bodyParsers[FrameType.SETTINGS.type()] = new SettingsBodyParser(headerParser, listener);
this.bodyParsers[FrameType.PUSH_PROMISE.type()] = new PushPromiseBodyParser(headerParser, listener);
this.bodyParsers[FrameType.GOAWAY.type()] = new GoAwayBodyParser(headerParser, listener);
this.bodyParsers[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdBodyParser(headerParser, listener);
this.unknownBodyParser = new UnknownBodyParser(headerParser, listener);
this.listener = listener;
}
@ -62,7 +55,7 @@ public class Parser
}
/**
* <p>Parses the given {@code buffer} bytes and emit events to a {@link Listener}.</p>
* <p>Parses the given {@code buffer} bytes and emit events to a {@link ParserListener}.</p>
*
* @param buffer the buffer to parse
*/
@ -137,29 +130,6 @@ public class Parser
unknownBodyParser.sessionFailure(buffer, error, reason);
}
public interface Listener
{
public default void onHeaders(HeadersFrame frame)
{
}
public default void onData(DataFrame frame)
{
}
public default void onSettings(SettingsFrame frame)
{
}
public default void onStreamFailure(long streamId, int error, String reason)
{
}
public default void onSessionFailure(int error, String reason)
{
}
}
private enum State
{
HEADER, BODY

View File

@ -0,0 +1,41 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.parser;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
public interface ParserListener
{
public default void onHeaders(long streamId, HeadersFrame frame)
{
}
public default void onData(long streamId, DataFrame frame)
{
}
public default void onSettings(SettingsFrame frame)
{
}
public default void onStreamFailure(long streamId, int error, String reason)
{
}
public default void onSessionFailure(int error, String reason)
{
}
}

View File

@ -17,7 +17,7 @@ import java.nio.ByteBuffer;
public class PushPromiseBodyParser extends BodyParser
{
public PushPromiseBodyParser(HeaderParser headerParser, Parser.Listener listener)
public PushPromiseBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser, listener);
}

View File

@ -29,7 +29,7 @@ public class SettingsBodyParser extends BodyParser
private long key;
private Map<Long, Long> settings;
public SettingsBodyParser(HeaderParser headerParser, Parser.Listener listener)
public SettingsBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser, listener);
}

View File

@ -17,7 +17,7 @@ import java.nio.ByteBuffer;
public class UnknownBodyParser extends BodyParser
{
public UnknownBodyParser(HeaderParser headerParser, Parser.Listener listener)
public UnknownBodyParser(HeaderParser headerParser, ParserListener listener)
{
super(1, headerParser, listener);
}

View File

@ -20,7 +20,8 @@ import java.util.Map;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.generator.SettingsGenerator;
import org.eclipse.jetty.http3.internal.parser.Parser;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
@ -50,7 +51,7 @@ public class SettingsGenerateParseTest
new SettingsGenerator().generate(lease, input);
List<SettingsFrame> frames = new ArrayList<>();
Parser parser = new Parser(0, null, new Parser.Listener()
MessageParser parser = new MessageParser(0, null, new ParserListener()
{
@Override
public void onSettings(SettingsFrame frame)

View File

@ -14,6 +14,7 @@
module org.eclipse.jetty.http3.server
{
exports org.eclipse.jetty.http3.server;
exports org.eclipse.jetty.http3.server.internal;
requires transitive org.eclipse.jetty.http3.common;
requires transitive org.eclipse.jetty.http3.qpack;

View File

@ -16,14 +16,13 @@ package org.eclipse.jetty.http3.server;
import java.util.Map;
import java.util.Objects;
import org.eclipse.jetty.http3.api.server.ServerSessionListener;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.internal.HTTP3Connection;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.http3.internal.parser.Parser;
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.ProtocolQuicSession;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.server.ServerQuicSession;
@ -32,14 +31,15 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConnectionFactory implements ProtocolQuicSession.Factory
public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConnectionFactory implements ProtocolSession.Factory
{
private final HttpConfiguration httpConfiguration;
private final ServerSessionListener listener;
private final Session.Server.Listener listener;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
private int maxBlockedStreams;
public AbstractHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, ServerSessionListener listener)
public AbstractHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, Session.Server.Listener listener)
{
super("h3");
this.httpConfiguration = Objects.requireNonNull(httpConfiguration);
@ -74,11 +74,20 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
return httpConfiguration;
}
@Override
public ProtocolQuicSession newProtocolQuicSession(QuicSession quicSession, Map<String, Object> context)
public int getMaxBlockedStreams()
{
Generator generator = new Generator();
return new HTTP3ServerQuicSession((ServerQuicSession)quicSession, listener, generator);
return maxBlockedStreams;
}
public void setMaxBlockedStreams(int maxBlockedStreams)
{
this.maxBlockedStreams = maxBlockedStreams;
}
@Override
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
{
return new ServerHTTP3Session((ServerQuicSession)quicSession, listener, getMaxBlockedStreams(), getHttpConfiguration().getRequestHeaderSize());
}
@Override
@ -87,17 +96,9 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
// TODO: can the downcasts be removed?
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
long streamId = streamEndPoint.getStreamId();
HTTP3ServerQuicSession http3QuicSession = (HTTP3ServerQuicSession)streamEndPoint.getQuicSession().getProtocolQuicSession();
// TODO: this is wrong, as the endpoint here is already per-stream
// Could it be that HTTP3[Client|Server]QuicSession and HTTP3Session are the same thing?
// If an app wants to send a SETTINGS frame, it calls Session.settings() and this has to go back to an object that knows the control stream,
// which is indeed HTTP3[Client|Server]QuicSession!
HTTP3Session session = new HTTP3Session();
Parser parser = new Parser(streamId, http3QuicSession.getQpackDecoder(), session);
HTTP3Connection connection = new HTTP3Connection(endPoint, connector.getExecutor(), parser);
ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder(), http3Session);
HTTP3Connection connection = new HTTP3Connection(endPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
return connection;
}
}

View File

@ -1,189 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.server;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.api.server.ServerSessionListener;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlConnection;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderConnection;
import org.eclipse.jetty.http3.internal.EncoderConnection;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.http3.qpack.Instruction;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.server.ProtocolServerQuicSession;
import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3ServerQuicSession extends ProtocolServerQuicSession implements Session
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3ServerQuicSession.class);
private final ServerSessionListener listener;
private final Generator generator;
private final QpackDecoder decoder;
private final QuicStreamEndPoint decoderEndPoint;
private final QuicStreamEndPoint encoderEndPoint;
private final QuicStreamEndPoint controlEndPoint;
private final ControlFlusher controlFlusher;
public HTTP3ServerQuicSession(ServerQuicSession session, ServerSessionListener listener, Generator generator)
{
super(session);
this.listener = listener;
this.generator = generator;
long decoderStreamId = getQuicSession().newServerUnidirectionalStreamId();
decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
long encoderStreamId = getQuicSession().newServerUnidirectionalStreamId();
encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
long controlStreamId = getQuicSession().newServerBidirectionalStreamId();
this.controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, generator, controlEndPoint);
// TODO: configure the maxHeaderSize
decoder = new QpackDecoder(new QpackDecoderInstructionHandler(), 4096);
}
@Override
public void onOpen()
{
// Queue a synthetic frame to send the control stream type.
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
controlFlusher.offer(new Frame.Synthetic(buffer), Callback.NOOP);
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = listener.onPreface(this);
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
process();
}
private QuicStreamEndPoint configureDecoderEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, endPoint ->
{
DecoderConnection connection = new DecoderConnection(endPoint, getQuicSession().getExecutor());
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
});
}
private QuicStreamEndPoint configureEncoderEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, endPoint ->
{
EncoderConnection connection = new EncoderConnection(endPoint, getQuicSession().getExecutor());
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
});
}
private QuicStreamEndPoint configureControlEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, endPoint ->
{
ControlConnection connection = new ControlConnection(endPoint, getQuicSession().getExecutor());
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
});
}
public QpackDecoder getQpackDecoder()
{
return decoder;
}
@Override
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener)
{
return null;
}
private class QpackDecoderInstructionHandler extends IteratingCallback implements Instruction.Handler
{
private final AutoLock lock = new AutoLock();
private final ByteBufferPool.Lease lease = new ByteBufferPool.Lease(getQuicSession().getByteBufferPool());
private final Queue<Instruction> queue = new ArrayDeque<>();
@Override
public void onInstructions(List<Instruction> instructions)
{
try (AutoLock l = lock.lock())
{
queue.addAll(instructions);
}
iterate();
}
@Override
protected Action process()
{
List<Instruction> instructions;
try (AutoLock l = lock.lock())
{
if (queue.isEmpty())
return Action.IDLE;
instructions = new ArrayList<>(queue);
}
instructions.forEach(i -> i.encode(lease));
decoderEndPoint.write(this, getQuicSession().getRemoteAddress(), lease.getByteBuffers().toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
lease.recycle();
super.succeeded();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
}

View File

@ -13,17 +13,17 @@
package org.eclipse.jetty.http3.server;
import org.eclipse.jetty.http3.api.server.ServerSessionListener;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.server.HttpConfiguration;
public class RawHTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionFactory
{
public RawHTTP3ServerConnectionFactory(ServerSessionListener listener)
public RawHTTP3ServerConnectionFactory(Session.Server.Listener listener)
{
this(new HttpConfiguration(), listener);
}
public RawHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, ServerSessionListener listener)
public RawHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, Session.Server.Listener listener)
{
super(httpConfiguration, listener);
}

View File

@ -0,0 +1,157 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.server.internal;
import java.nio.ByteBuffer;
import java.util.Map;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlConnection;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.StreamConnection;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.quic.server.ServerProtocolSession;
import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerHTTP3Session extends ServerProtocolSession implements ParserListener
{
private static final Logger LOG = LoggerFactory.getLogger(ServerHTTP3Session.class);
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3Session apiSession;
private final InstructionFlusher encoderFlusher;
private final InstructionFlusher decoderFlusher;
private final ControlFlusher controlFlusher;
private final MessageGenerator generator;
public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize)
{
super(session);
this.apiSession = new HTTP3Session(this, listener);
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
this.encoderFlusher = new InstructionFlusher(session, encoderEndPoint);
this.encoder = new QpackEncoder(new InstructionHandler(encoderFlusher), maxBlockedStreams);
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint);
this.decoder = new QpackDecoder(new InstructionHandler(decoderFlusher), maxRequestHeadersSize);
this.generator = new MessageGenerator(encoder);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_BIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
}
public QpackDecoder getQpackDecoder()
{
return decoder;
}
@Override
public void onOpen()
{
initializeEncoderStream();
initializeDecoderStream();
initializeControlStream();
}
private void initializeEncoderStream()
{
encoderFlusher.iterate();
}
private void initializeDecoderStream()
{
decoderFlusher.iterate();
}
private void initializeControlStream()
{
// Queue a synthetic frame to send the control stream type.
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
controlFlusher.offer(new Frame.Synthetic(buffer), Callback.NOOP);
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = apiSession.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
}
private QuicStreamEndPoint configureEncoderEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
}
private QuicStreamEndPoint configureDecoderEndPoint(long streamId)
{
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
}
private QuicStreamEndPoint configureControlEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, this::configureStreamEndPoint);
}
@Override
protected void onReadable(long readableStreamId)
{
StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{
super.onReadable(readableStreamId);
}
else
{
// On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureStreamEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream {} selected endpoint for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
}
}
private void configureStreamEndPoint(QuicStreamEndPoint endPoint)
{
StreamConnection connection = new StreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), this);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
}
}

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http3.tests;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
@ -23,7 +24,6 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.api.server.ServerSessionListener;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
@ -33,6 +33,9 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HTTP3ClientServerTest
{
@Test
@ -45,24 +48,33 @@ public class HTTP3ClientServerTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
Server server = new Server(serverThreads);
ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(new ServerSessionListener() {}));
CountDownLatch serverLatch = new CountDownLatch(1);
ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(new Session.Server.Listener()
{
@Override
public Stream.Listener onHeaders(Stream stream, HeadersFrame frame)
{
serverLatch.countDown();
return null;
}
}));
server.addConnector(connector);
server.start();
HTTP3Client client = new HTTP3Client();
client.start();
Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(555, TimeUnit.SECONDS);
System.err.println("session = " + session);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort());
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(metaData);
Stream stream = session.newStream(frame, new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);
.get(555, TimeUnit.SECONDS);
assertNotNull(stream);
System.err.println("stream = " + stream);
assertTrue(serverLatch.await(555, TimeUnit.SECONDS));
}
}

View File

@ -13,17 +13,17 @@
package org.eclipse.jetty.quic.client;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.quic.common.ProtocolQuicSession;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProtocolClientQuicSession extends ProtocolQuicSession
public class ClientProtocolSession extends ProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ProtocolClientQuicSession.class);
private static final Logger LOG = LoggerFactory.getLogger(ClientProtocolSession.class);
public ProtocolClientQuicSession(ClientQuicSession session)
public ClientProtocolSession(ClientQuicSession session)
{
super(session);
}
@ -39,34 +39,8 @@ public class ProtocolClientQuicSession extends ProtocolQuicSession
{
// Create a single bidirectional, client-initiated,
// QUIC stream that plays the role of the TCP stream.
configureEndPoint(getQuicSession().newClientBidirectionalStreamId());
process();
}
private void configureEndPoint(long streamId)
{
getOrCreateStreamEndPoint(streamId, endPoint ->
{
try
{
Connection connection = getQuicSession().newConnection(endPoint);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
}
catch (RuntimeException | Error x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not open protocol QUIC session", x);
throw x;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not open protocol QUIC session", x);
throw new RuntimeException(x);
}
});
long streamId = getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
getOrCreateStreamEndPoint(streamId, this::configureProtocolEndPoint);
}
@Override

View File

@ -85,17 +85,23 @@ public class ClientQuicConnection extends QuicConnection
quicheConfig.setMaxIdleTimeout(getEndPoint().getIdleTimeout());
quicheConfig.setInitialMaxData(10_000_000L);
quicheConfig.setInitialMaxStreamDataBidiLocal(10_000_000L);
quicheConfig.setInitialMaxStreamDataBidiRemote(10000000L);
quicheConfig.setInitialMaxStreamDataUni(10_000_000L);
quicheConfig.setInitialMaxStreamsBidi(100L);
quicheConfig.setInitialMaxStreamsUni(100L);
quicheConfig.setInitialMaxStreamsBidi(100L);
quicheConfig.setCongestionControl(QuicheConfig.CongestionControl.RENO);
InetSocketAddress remoteAddress = (InetSocketAddress)context.get(ClientConnector.REMOTE_SOCKET_ADDRESS_CONTEXT_KEY);
if (LOG.isDebugEnabled())
LOG.debug("connecting to {} with protocols {}", remoteAddress, protocols);
QuicheConnection quicheConnection = QuicheConnection.connect(quicheConfig, remoteAddress);
QuicSession session = new ClientQuicSession(getExecutor(), getScheduler(), getByteBufferPool(), quicheConnection, this, remoteAddress, context);
pendingSessions.put(remoteAddress, session);
session.flush(); // send the response packet(s) that connect generated.
if (LOG.isDebugEnabled())
LOG.debug("created connecting QUIC session {}", session);
LOG.debug("created QUIC session {}", session);
fillInterested();
}

View File

@ -22,7 +22,8 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.quic.common.ProtocolQuicSession;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
@ -46,18 +47,25 @@ public class ClientQuicSession extends QuicSession
}
@Override
protected ProtocolQuicSession createProtocolQuicSession()
protected ProtocolSession createProtocolSession()
{
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
if (connectionFactory instanceof ProtocolQuicSession.Factory)
return ((ProtocolQuicSession.Factory)connectionFactory).newProtocolQuicSession(this, context);
return new ProtocolClientQuicSession(this);
if (connectionFactory instanceof ProtocolSession.Factory)
return ((ProtocolSession.Factory)connectionFactory).newProtocolSession(this, context);
return new ClientProtocolSession(this);
}
@Override
public Connection newConnection(QuicStreamEndPoint endPoint) throws IOException
public Connection newConnection(QuicStreamEndPoint endPoint)
{
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
return connectionFactory.newConnection(endPoint, context);
try
{
ClientConnectionFactory connectionFactory = (ClientConnectionFactory)context.get(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
return connectionFactory.newConnection(endPoint, context);
}
catch (IOException x)
{
throw new RuntimeIOException(x);
}
}
}

View File

@ -18,17 +18,18 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.eclipse.jetty.io.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ProtocolQuicSession
public abstract class ProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ProtocolQuicSession.class);
private static final Logger LOG = LoggerFactory.getLogger(ProtocolSession.class);
private final AtomicLong active = new AtomicLong();
private final QuicSession session;
public ProtocolQuicSession(QuicSession session)
public ProtocolSession(QuicSession session)
{
this.session = session;
}
@ -64,7 +65,7 @@ public abstract class ProtocolQuicSession
return session.getStreamEndPoint(streamId);
}
protected QuicStreamEndPoint getOrCreateStreamEndPoint(long streamId, Consumer<QuicStreamEndPoint> consumer)
public QuicStreamEndPoint getOrCreateStreamEndPoint(long streamId, Consumer<QuicStreamEndPoint> consumer)
{
return session.getOrCreateStreamEndPoint(streamId, consumer);
}
@ -98,8 +99,16 @@ public abstract class ProtocolQuicSession
protected abstract void onReadable(long readableStreamId);
public void configureProtocolEndPoint(QuicStreamEndPoint endPoint)
{
Connection connection = getQuicSession().newConnection(endPoint);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
}
public interface Factory
{
public ProtocolQuicSession newProtocolQuicSession(QuicSession quicSession, Map<String, Object> context);
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context);
}
}

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@ -57,7 +58,7 @@ public abstract class QuicSession
{
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
private final AtomicLong ids = new AtomicLong();
private final AtomicLong[] ids = new AtomicLong[StreamType.values().length];
private final AutoLock strategyQueueLock = new AutoLock();
private final Queue<Runnable> strategyQueue = new ArrayDeque<>();
private final ConcurrentMap<Long, QuicStreamEndPoint> endpoints = new ConcurrentHashMap<>();
@ -69,7 +70,7 @@ public abstract class QuicSession
private final Flusher flusher;
private final ExecutionStrategy strategy;
private SocketAddress remoteAddress;
private ProtocolQuicSession protocolSession;
private ProtocolSession protocolSession;
private QuicheConnectionId quicheConnectionId;
protected QuicSession(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnection quicheConnection, QuicConnection connection, SocketAddress remoteAddress)
@ -83,6 +84,7 @@ public abstract class QuicSession
this.strategy = new AdaptiveExecutionStrategy(new Producer(), executor);
this.remoteAddress = remoteAddress;
LifeCycle.start(strategy);
Arrays.setAll(ids, i -> new AtomicLong());
}
public Executor getExecutor()
@ -100,7 +102,7 @@ public abstract class QuicSession
return byteBufferPool;
}
public ProtocolQuicSession getProtocolQuicSession()
public ProtocolSession getProtocolSession()
{
return protocolSession;
}
@ -111,40 +113,14 @@ public abstract class QuicSession
}
/**
* @return a new unidirectional, client-initiated, stream ID
* @param streamType the stream type
* @return a new stream ID for the given type
*/
public long newClientUnidirectionalStreamId()
public long newStreamId(StreamType streamType)
{
return newStreamId() + 0x02;
}
/**
* @return a new bidirectional, client-initiated, stream ID
*/
public long newClientBidirectionalStreamId()
{
return newStreamId();
}
/**
* @return a new unidirectional, server-initiated, stream ID
*/
public long newServerUnidirectionalStreamId()
{
return newStreamId() + 0x03;
}
/**
* @return a new bidirectional, server-initiated, stream ID
*/
public long newServerBidirectionalStreamId()
{
return newStreamId() + 0x01;
}
private long newStreamId()
{
return ids.getAndIncrement() << 2;
int type = streamType.type();
long id = ids[type].getAndIncrement();
return (id << 2) + type;
}
public void onOpen()
@ -230,7 +206,7 @@ public abstract class QuicSession
// HTTP/1 on QUIC
// client1
// \
// dataEP - QuicConnection -* QuicSession -# ProtocolQuicSession -* RequestStreamN - HttpConnection - HTTP Handler
// dataEP - QuicConnection -* QuicSession -# ProtocolSession -* RequestStreamN - HttpConnection - HTTP Handler
// /
// client2
@ -246,13 +222,10 @@ public abstract class QuicSession
if (protocolSession == null)
{
protocolSession = createProtocolQuicSession();
protocolSession = createProtocolSession();
onOpen();
}
else
{
protocolSession.process();
}
protocolSession.process();
}
else
{
@ -260,7 +233,7 @@ public abstract class QuicSession
}
}
protected abstract ProtocolQuicSession createProtocolQuicSession();
protected abstract ProtocolSession createProtocolSession();
List<Long> getWritableStreamIds()
{
@ -277,7 +250,7 @@ public abstract class QuicSession
return endpoints.get(streamId);
}
public abstract Connection newConnection(QuicStreamEndPoint endPoint) throws IOException;
public abstract Connection newConnection(QuicStreamEndPoint endPoint);
private void dispatch(Runnable runnable)
{

View File

@ -0,0 +1,46 @@
package org.eclipse.jetty.quic.common;
import java.util.HashMap;
import java.util.Map;
public enum StreamType
{
CLIENT_BIDIRECTIONAL(0x00),
SERVER_BIDIRECTIONAL(0x01),
CLIENT_UNIDIRECTIONAL(0x02),
SERVER_UNIDIRECTIONAL(0x03);
public static StreamType from(long streamId)
{
int type = ((int)(streamId)) & 0b11;
return Types.types.get(type);
}
public static boolean isUnidirectional(long streamId)
{
return (streamId & 0b01) == 0b01;
}
public static boolean isBidirectional(long streamId)
{
return (streamId & 0b01) == 0b00;
}
private final int type;
private StreamType(int type)
{
this.type = type;
Types.types.put(type, this);
}
public int type()
{
return type;
}
private static class Types
{
private static final Map<Integer, StreamType> types = new HashMap<>();
}
}

View File

@ -13,17 +13,16 @@
package org.eclipse.jetty.quic.server;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.quic.common.ProtocolQuicSession;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProtocolServerQuicSession extends ProtocolQuicSession
public class ServerProtocolSession extends ProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ProtocolServerQuicSession.class);
private static final Logger LOG = LoggerFactory.getLogger(ServerProtocolSession.class);
public ProtocolServerQuicSession(ServerQuicSession session)
public ServerProtocolSession(ServerQuicSession session)
{
super(session);
}
@ -37,24 +36,15 @@ public class ProtocolServerQuicSession extends ProtocolQuicSession
@Override
public void onOpen()
{
process();
}
@Override
protected void onReadable(long readableStreamId)
{
// On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureEndPoint);
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint);
if (LOG.isDebugEnabled())
LOG.debug("stream {} selected endpoint for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable();
}
private void configureEndPoint(QuicStreamEndPoint endPoint)
{
Connection connection = getQuicSession().newConnection(endPoint);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
}
}

View File

@ -17,7 +17,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
@ -31,7 +30,6 @@ import org.eclipse.jetty.quic.server.internal.SimpleTokenValidator;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,9 +43,9 @@ public class ServerQuicConnection extends QuicConnection
private final QuicheConfig quicheConfig;
private final Connector connector;
protected ServerQuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endp, QuicheConfig quicheConfig, Connector connector)
protected ServerQuicConnection(Connector connector, EndPoint endPoint, QuicheConfig quicheConfig)
{
super(executor, scheduler, byteBufferPool, endp);
super(connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), endPoint);
this.quicheConfig = quicheConfig;
this.connector = connector;
}

View File

@ -102,6 +102,7 @@ public class ServerQuicConnector extends AbstractNetworkConnector
_quicheConfig.setInitialMaxStreamDataBidiLocal(10000000L);
_quicheConfig.setInitialMaxStreamDataBidiRemote(10000000L);
_quicheConfig.setInitialMaxStreamDataUni(10000000L);
_quicheConfig.setInitialMaxStreamsUni(100L);
_quicheConfig.setInitialMaxStreamsBidi(100L);
_quicheConfig.setCongestionControl(QuicheConfig.CongestionControl.RENO);
List<String> protocols = getProtocols();
@ -192,7 +193,7 @@ public class ServerQuicConnector extends AbstractNetworkConnector
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{
return new ServerQuicConnection(getExecutor(), getScheduler(), getByteBufferPool(), endpoint, _quicheConfig, ServerQuicConnector.this);
return new ServerQuicConnection(ServerQuicConnector.this, endpoint, _quicheConfig);
}
}
}

View File

@ -20,7 +20,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.quic.common.ProtocolQuicSession;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
@ -47,19 +47,24 @@ public class ServerQuicSession extends QuicSession
}
@Override
protected ProtocolQuicSession createProtocolQuicSession()
protected ProtocolSession createProtocolSession()
{
ConnectionFactory connectionFactory = findConnectionFactory(getNegotiatedProtocol());
if (connectionFactory instanceof ProtocolQuicSession.Factory)
return ((ProtocolQuicSession.Factory)connectionFactory).newProtocolQuicSession(this, Map.of());
return new ProtocolServerQuicSession(this);
if (connectionFactory instanceof ProtocolSession.Factory)
return ((ProtocolSession.Factory)connectionFactory).newProtocolSession(this, Map.of());
return new ServerProtocolSession(this);
}
@Override
public Connection newConnection(QuicStreamEndPoint endPoint)
{
ConnectionFactory connectionFactory = findConnectionFactory(getNegotiatedProtocol());
return connectionFactory.newConnection(connector, endPoint);
return newConnection(connectionFactory, endPoint);
}
private Connection newConnection(ConnectionFactory factory, QuicStreamEndPoint endPoint)
{
return factory.newConnection(connector, endPoint);
}
private ConnectionFactory findConnectionFactory(String negotiatedProtocol)