Issue #6728 - QUIC and HTTP/3

- Implemented handling of the control stream and SETTINGS frame/parser/generator.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-11 23:14:55 +02:00
parent 7c09800fe1
commit 29e82ace1b
29 changed files with 893 additions and 51 deletions

View File

@ -31,7 +31,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
public class HTTP3Client extends ContainerLifeCycle
{
public static final String CLIENT_CONTEXT_KEY = HTTP3Client.class.getName();
private static final String SESSION_LISTENER_CONTEXT_KEY = CLIENT_CONTEXT_KEY + ".listener";
public static final String SESSION_LISTENER_CONTEXT_KEY = CLIENT_CONTEXT_KEY + ".listener";
private static final String SESSION_PROMISE_CONTEXT_KEY = CLIENT_CONTEXT_KEY + ".promise";
private final ClientConnector connector;

View File

@ -16,6 +16,8 @@ package org.eclipse.jetty.http3.client;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -29,8 +31,9 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
public ProtocolQuicSession newProtocolQuicSession(QuicSession quicSession, Map<String, Object> context)
{
HTTP3Client http3Client = (HTTP3Client)context.get(HTTP3Client.CLIENT_CONTEXT_KEY);
// TODO: configure the QpackDecoder.maxHeaderSize from HTTP3Client
return new HTTP3ClientQuicSession((ClientQuicSession)quicSession);
Session.Listener listener = (Session.Listener)context.get(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY);
Generator generator = new Generator();
return new HTTP3ClientQuicSession((ClientQuicSession)quicSession, listener, generator);
}
@Override

View File

@ -13,27 +13,39 @@
package org.eclipse.jetty.http3.client;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlConnection;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderConnection;
import org.eclipse.jetty.http3.internal.EncoderConnection;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.client.ProtocolClientQuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
public class HTTP3ClientQuicSession extends ProtocolClientQuicSession
public class HTTP3ClientQuicSession extends ProtocolClientQuicSession implements Session
{
private QuicStreamEndPoint decoderEndPoint;
private QuicStreamEndPoint encoderEndPoint;
private QuicStreamEndPoint controlEndPoint;
private final Session.Listener listener;
private final QuicStreamEndPoint decoderEndPoint;
private final QuicStreamEndPoint encoderEndPoint;
private final QuicStreamEndPoint controlEndPoint;
private final ControlFlusher controlFlusher;
public HTTP3ClientQuicSession(ClientQuicSession session)
public HTTP3ClientQuicSession(ClientQuicSession session, Session.Listener listener, Generator generator)
{
super(session);
}
this.listener = listener;
@Override
public void onOpen()
{
long decoderStreamId = getQuicSession().newClientUnidirectionalStreamId();
decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
@ -41,17 +53,43 @@ public class HTTP3ClientQuicSession extends ProtocolClientQuicSession
encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
long controlStreamId = getQuicSession().newClientBidirectionalStreamId();
controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, generator, controlEndPoint);
}
@Override
public void onOpen()
{
// Queue a synthetic frame to send the control stream type.
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
controlFlusher.offer(new Frame.Synthetic(buffer), Callback.NOOP);
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = listener.onPreface(this);
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
process();
}
private QuicStreamEndPoint configureDecoderEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, endPoint ->
{
DecoderConnection connection = new DecoderConnection(endPoint, getQuicSession().getExecutor());
endPoint.setConnection(connection);
// This is a write-only stream, so no need to link a Connection.
endPoint.onOpen();
connection.onOpen();
int streamType = DecoderConnection.QPACK_DECODER_STREAM_TYPE;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(streamType));
VarLenInt.generate(buffer, streamType);
buffer.flip();
endPoint.write(Callback.NOOP, buffer);
});
}
@ -59,10 +97,14 @@ public class HTTP3ClientQuicSession extends ProtocolClientQuicSession
{
return getOrCreateStreamEndPoint(streamId, endPoint ->
{
EncoderConnection connection = new EncoderConnection(endPoint, getQuicSession().getExecutor());
endPoint.setConnection(connection);
// This is a write-only stream, so no need to link a Connection.
endPoint.onOpen();
connection.onOpen();
int streamType = EncoderConnection.QPACK_ENCODER_STREAM_TYPE;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(streamType));
VarLenInt.generate(buffer, streamType);
buffer.flip();
endPoint.write(Callback.NOOP, buffer);
});
}
@ -76,4 +118,10 @@ public class HTTP3ClientQuicSession extends ProtocolClientQuicSession
connection.onOpen();
});
}
@Override
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener)
{
return null;
}
}

View File

@ -18,6 +18,7 @@ module org.eclipse.jetty.http3.common
exports org.eclipse.jetty.http3.api.server;
exports org.eclipse.jetty.http3.frames;
exports org.eclipse.jetty.http3.internal.generator to org.eclipse.jetty.http3.client, org.eclipse.jetty.http3.server;
exports org.eclipse.jetty.http3.internal.parser to org.eclipse.jetty.http3.server;
exports org.eclipse.jetty.http3.internal;

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.api;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.frames.HeadersFrame;
@ -21,5 +22,11 @@ public interface Session
{
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener);
public interface Listener {}
public interface Listener
{
public default Map<Long, Long> onPreface(Session session)
{
return null;
}
}
}

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.http3.frames;
import java.nio.ByteBuffer;
public abstract class Frame
{
private final FrameType type;
@ -32,4 +34,20 @@ public abstract class Frame
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
public static class Synthetic extends Frame
{
private final ByteBuffer buffer;
public Synthetic(ByteBuffer buffer)
{
super(null);
this.buffer = buffer;
}
public ByteBuffer getByteBuffer()
{
return buffer;
}
}
}

View File

@ -0,0 +1,45 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.frames;
import java.util.Map;
public class SettingsFrame extends Frame
{
public static final long MAX_FIELD_SECTION_SIZE = 0x06;
public static boolean isReserved(long key)
{
return key >= 0 && key <= 5;
}
private final Map<Long, Long> settings;
public SettingsFrame(Map<Long, Long> settings)
{
super(FrameType.SETTINGS);
this.settings = settings;
}
public Map<Long, Long> getSettings()
{
return settings;
}
@Override
public String toString()
{
return String.format("%s,settings=%s", super.toString(), settings);
}
}

View File

@ -20,14 +20,22 @@ import org.eclipse.jetty.io.EndPoint;
public class ControlConnection extends AbstractConnection
{
public static final int STREAM_TYPE = 0x00;
public ControlConnection(EndPoint endPoint, Executor executor)
{
super(endPoint, executor);
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillable()
{
}
}

View File

@ -0,0 +1,153 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ControlFlusher extends IteratingCallback
{
private static final Logger LOG = LoggerFactory.getLogger(ControlFlusher.class);
private final AutoLock lock = new AutoLock();
private final Queue<Entry> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final Generator generator;
private final EndPoint endPoint;
private List<Entry> entries;
private InvocationType invocationType = InvocationType.NON_BLOCKING;
public ControlFlusher(QuicSession session, Generator generator, EndPoint endPoint)
{
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
this.generator = generator;
this.endPoint = endPoint;
}
public void offer(Frame frame, Callback callback)
{
try (AutoLock l = lock.lock())
{
queue.offer(new Entry(frame, callback));
}
}
@Override
protected Action process()
{
try (AutoLock l = lock.lock())
{
if (queue.isEmpty())
{
entries = List.of();
}
else
{
entries = new ArrayList<>(queue);
queue.clear();
}
}
if (LOG.isDebugEnabled())
LOG.debug("flushing {} entries on {}", entries.size(), this);
if (entries.isEmpty())
return Action.IDLE;
for (Entry entry : entries)
{
Frame frame = entry.frame;
if (frame instanceof Frame.Synthetic)
lease.append(((Frame.Synthetic)frame).getByteBuffer(), false);
else
generator.generate(lease, frame);
invocationType = Invocable.combine(invocationType, entry.callback.getInvocationType());
}
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this);
endPoint.write(this, buffers.toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
@Override
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to flush {} entries on {}", entries, this);
lease.recycle();
entries.forEach(e -> e.callback.succeeded());
entries.clear();
invocationType = InvocationType.NON_BLOCKING;
super.succeeded();
}
@Override
protected void onCompleteFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to flush {} entries on {}", entries, this, failure);
lease.recycle();
entries.forEach(e -> e.callback.failed(failure));
entries.clear();
// TODO: I guess we should fail the whole connection, as we cannot proceed without the control stream.
}
@Override
public InvocationType getInvocationType()
{
return invocationType;
}
private static class Entry
{
private final Frame frame;
private final Callback callback;
private Entry(Frame frame, Callback callback)
{
this.frame = frame;
this.callback = callback;
}
@Override
public String toString()
{
return frame.toString();
}
}
}

View File

@ -20,6 +20,8 @@ import org.eclipse.jetty.io.EndPoint;
public class DecoderConnection extends AbstractConnection
{
public static final int QPACK_DECODER_STREAM_TYPE = 0x03;
public DecoderConnection(EndPoint endPoint, Executor executor)
{
super(endPoint, executor);
@ -28,6 +30,5 @@ public class DecoderConnection extends AbstractConnection
@Override
public void onFillable()
{
}
}

View File

@ -20,6 +20,8 @@ import org.eclipse.jetty.io.EndPoint;
public class EncoderConnection extends AbstractConnection
{
public static final int QPACK_ENCODER_STREAM_TYPE = 0x02;
public EncoderConnection(EndPoint endPoint, Executor executor)
{
super(endPoint, executor);
@ -28,6 +30,5 @@ public class EncoderConnection extends AbstractConnection
@Override
public void onFillable()
{
}
}

View File

@ -0,0 +1,26 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public class CancelPushGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
return 0;
}
}

View File

@ -0,0 +1,26 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public class DataGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
return 0;
}
}

View File

@ -0,0 +1,22 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public abstract class FrameGenerator
{
public abstract int generate(ByteBufferPool.Lease lease, Frame frame);
}

View File

@ -0,0 +1,39 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.io.ByteBufferPool;
public class Generator
{
private final FrameGenerator[] generators = new FrameGenerator[FrameType.maxType() + 1];
public Generator()
{
generators[FrameType.DATA.type()] = new DataGenerator();
generators[FrameType.HEADERS.type()] = new HeadersGenerator();
generators[FrameType.CANCEL_PUSH.type()] = new CancelPushGenerator();
generators[FrameType.SETTINGS.type()] = new SettingsGenerator();
generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator();
generators[FrameType.GOAWAY.type()] = new GoAwayGenerator();
generators[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdGenerator();
}
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
return generators[frame.getFrameType().type()].generate(lease, frame);
}
}

View File

@ -0,0 +1,26 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public class GoAwayGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
return 0;
}
}

View File

@ -0,0 +1,26 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public class HeadersGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
return 0;
}
}

View File

@ -0,0 +1,26 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public class MaxPushIdGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
return 0;
}
}

View File

@ -0,0 +1,26 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool;
public class PushPromiseGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
return 0;
}
}

View File

@ -0,0 +1,56 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal.generator;
import java.nio.ByteBuffer;
import java.util.Map;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
public class SettingsGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
{
SettingsFrame settingsFrame = (SettingsFrame)frame;
return generateSettings(lease, settingsFrame);
}
private int generateSettings(ByteBufferPool.Lease lease, SettingsFrame frame)
{
Map<Long, Long> settings = frame.getSettings();
int length = 0;
for (Map.Entry<Long, Long> e : settings.entrySet())
{
length += VarLenInt.length(e.getKey()) + VarLenInt.length(e.getValue());
}
int capacity = VarLenInt.length(frame.getFrameType().type()) + VarLenInt.length(length) + length;
// TODO: configure buffer directness.
ByteBuffer buffer = lease.acquire(capacity, true);
VarLenInt.generate(buffer, frame.getFrameType().type());
VarLenInt.generate(buffer, length);
for (Map.Entry<Long, Long> e : settings.entrySet())
{
VarLenInt.generate(buffer, e.getKey());
VarLenInt.generate(buffer, e.getValue());
}
BufferUtil.flipToFlush(buffer, 0);
lease.append(buffer, true);
return capacity;
}
}

View File

@ -18,6 +18,7 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -123,4 +124,16 @@ public abstract class BodyParser
LOG.info("failure while notifying listener {}", listener, x);
}
}
protected void notifySettings(SettingsFrame frame)
{
try
{
listener.onSettings(frame);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}
}

View File

@ -19,6 +19,7 @@ import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,13 +34,13 @@ public class Parser
private static final Logger LOG = LoggerFactory.getLogger(Parser.class);
private final HeaderParser headerParser;
private final BodyParser[] bodyParsers = new BodyParser[FrameType.maxType()];
private final BodyParser[] bodyParsers = new BodyParser[FrameType.maxType() + 1];
private final BodyParser unknownBodyParser;
private final long streamId;
private final Listener listener;
private State state = State.HEADER;
public Parser(long streamId, Listener listener, QpackDecoder decoder)
public Parser(long streamId, QpackDecoder decoder, Listener listener)
{
this.streamId = streamId;
this.headerParser = new HeaderParser();
@ -146,6 +147,10 @@ public class Parser
{
}
public default void onSettings(SettingsFrame frame)
{
}
public default void onStreamFailure(long streamId, int error, String reason)
{
}

View File

@ -14,17 +14,132 @@
package org.eclipse.jetty.http3.internal.parser;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;
import org.eclipse.jetty.http3.ErrorCode;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.VarLenInt;
public class SettingsBodyParser extends BodyParser
{
private final VarLenInt varLenInt = new VarLenInt();
private State state = State.INIT;
private long length;
private long key;
private Map<Long, Long> settings;
public SettingsBodyParser(HeaderParser headerParser, Parser.Listener listener)
{
super(1, headerParser, listener);
}
private void reset()
{
varLenInt.reset();
state = State.INIT;
length = 0;
key = 0;
settings = null;
}
@Override
protected void emptyBody(ByteBuffer buffer)
{
onSettings(Map.of());
}
@Override
public boolean parse(ByteBuffer buffer)
{
throw new UnsupportedOperationException();
while (buffer.hasRemaining())
{
switch (state)
{
case INIT:
{
length = getBodyLength();
settings = new LinkedHashMap<>();
state = State.KEY;
break;
}
case KEY:
{
if (varLenInt.parseLong(buffer, v ->
{
key = v;
length -= VarLenInt.length(v);
}))
{
if (settings.containsKey(key))
{
sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_duplicate");
return true;
}
if (SettingsFrame.isReserved(key))
{
sessionFailure(buffer, ErrorCode.SETTINGS_ERROR.code(), "settings_reserved");
return true;
}
if (length > 0)
{
state = State.VALUE;
}
else
{
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
return true;
}
break;
}
return false;
}
case VALUE:
{
if (varLenInt.parseLong(buffer, v ->
{
settings.put(key, v);
length -= VarLenInt.length(v);
}))
{
if (length > 0)
{
// TODO: count keys, if too many -> error.
state = State.KEY;
}
else if (length == 0)
{
Map<Long, Long> settings = this.settings;
reset();
onSettings(settings);
return true;
}
else
{
sessionFailure(buffer, ErrorCode.FRAME_ERROR.code(), "settings_invalid_format");
return true;
}
break;
}
return false;
}
default:
{
throw new IllegalStateException();
}
}
}
return false;
}
private void onSettings(Map<Long, Long> settings)
{
SettingsFrame frame = new SettingsFrame(settings);
notifySettings(frame);
}
private enum State
{
INIT, KEY, VALUE
}
}

View File

@ -0,0 +1,72 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.generator.SettingsGenerator;
import org.eclipse.jetty.http3.internal.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class SettingsGenerateParseTest
{
@Test
public void testGenerateParseEmpty()
{
testGenerateParse(Map.of());
}
@Test
public void testGenerateParse()
{
testGenerateParse(Map.of(13L, 7L, 31L, 29L));
}
private void testGenerateParse(Map<Long, Long> settings)
{
SettingsFrame input = new SettingsFrame(settings);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new SettingsGenerator().generate(lease, input);
List<SettingsFrame> frames = new ArrayList<>();
Parser parser = new Parser(0, null, new Parser.Listener()
{
@Override
public void onSettings(SettingsFrame frame)
{
frames.add(frame);
}
});
for (ByteBuffer buffer : lease.getByteBuffers())
{
parser.parse(buffer);
assertFalse(buffer.hasRemaining());
}
assertEquals(1, frames.size());
SettingsFrame output = frames.get(0);
assertEquals(input.getSettings(), output.getSettings());
}
}

View File

@ -16,8 +16,10 @@ package org.eclipse.jetty.http3.server;
import java.util.Map;
import java.util.Objects;
import org.eclipse.jetty.http3.api.server.ServerSessionListener;
import org.eclipse.jetty.http3.internal.HTTP3Connection;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.http3.internal.parser.Parser;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
@ -28,16 +30,43 @@ import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConnectionFactory implements ProtocolQuicSession.Factory
{
private final HttpConfiguration httpConfiguration;
private final ServerSessionListener listener;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
public AbstractHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration)
public AbstractHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, ServerSessionListener listener)
{
super("h3");
this.httpConfiguration = Objects.requireNonNull(httpConfiguration);
addBean(httpConfiguration);
this.listener = listener;
}
@ManagedAttribute("Whether to use direct ByteBuffers for reading")
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct ByteBuffers for writing")
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
public HttpConfiguration getHttpConfiguration()
@ -48,19 +77,25 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
@Override
public ProtocolQuicSession newProtocolQuicSession(QuicSession quicSession, Map<String, Object> context)
{
return new HTTP3ServerQuicSession((ServerQuicSession)quicSession, getHttpConfiguration().getResponseHeaderSize());
Generator generator = new Generator();
return new HTTP3ServerQuicSession((ServerQuicSession)quicSession, listener, generator);
}
@Override
public Connection newConnection(Connector connector, EndPoint endPoint)
{
// TODO: can the downcasts be removed?
long streamId = ((QuicStreamEndPoint)endPoint).getStreamId();
HTTP3ServerQuicSession http3QuicSession = (HTTP3ServerQuicSession)((QuicStreamEndPoint)endPoint).getQuicSession().getProtocolQuicSession();
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
long streamId = streamEndPoint.getStreamId();
HTTP3ServerQuicSession http3QuicSession = (HTTP3ServerQuicSession)streamEndPoint.getQuicSession().getProtocolQuicSession();
// TODO: this is wrong, as the endpoint here is already per-stream
// Could it be that HTTP3[Client|Server]QuicSession and HTTP3Session are the same thing?
// If an app wants to send a SETTINGS frame, it calls Session.settings() and this has to go back to an object that knows the control stream,
// which is indeed HTTP3[Client|Server]QuicSession!
HTTP3Session session = new HTTP3Session();
Parser parser = new Parser(streamId, session, http3QuicSession.getQpackDecoder());
Parser parser = new Parser(streamId, http3QuicSession.getQpackDecoder(), session);
HTTP3Connection connection = new HTTP3Connection(endPoint, connector.getExecutor(), parser);
return connection;

View File

@ -24,6 +24,6 @@ public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionF
public HTTP3ServerConnectionFactory(HttpConfiguration configuration)
{
super(configuration);
super(configuration, null);
}
}

View File

@ -17,44 +17,85 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.api.server.ServerSessionListener;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlConnection;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.DecoderConnection;
import org.eclipse.jetty.http3.internal.EncoderConnection;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.internal.generator.Generator;
import org.eclipse.jetty.http3.qpack.Instruction;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.server.ProtocolServerQuicSession;
import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3ServerQuicSession extends ProtocolServerQuicSession
public class HTTP3ServerQuicSession extends ProtocolServerQuicSession implements Session
{
private final QpackDecoder decoder;
private QuicStreamEndPoint decoderEndPoint;
private QuicStreamEndPoint encoderEndPoint;
private QuicStreamEndPoint controlEndPoint;
private static final Logger LOG = LoggerFactory.getLogger(HTTP3ServerQuicSession.class);
public HTTP3ServerQuicSession(ServerQuicSession session, int maxHeaderSize)
private final ServerSessionListener listener;
private final Generator generator;
private final QpackDecoder decoder;
private final QuicStreamEndPoint decoderEndPoint;
private final QuicStreamEndPoint encoderEndPoint;
private final QuicStreamEndPoint controlEndPoint;
private final ControlFlusher controlFlusher;
public HTTP3ServerQuicSession(ServerQuicSession session, ServerSessionListener listener, Generator generator)
{
super(session);
decoder = new QpackDecoder(new QpackDecoderInstructionHandler(), maxHeaderSize);
this.listener = listener;
this.generator = generator;
long decoderStreamId = getQuicSession().newServerUnidirectionalStreamId();
decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
long encoderStreamId = getQuicSession().newServerUnidirectionalStreamId();
encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
long controlStreamId = getQuicSession().newServerBidirectionalStreamId();
this.controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, generator, controlEndPoint);
// TODO: configure the maxHeaderSize
decoder = new QpackDecoder(new QpackDecoderInstructionHandler(), 4096);
}
@Override
public void onOpen()
{
long decoderStreamId = getQuicSession().newServerUnidirectionalStreamId();
decoderEndPoint = configureDecoderEndPoint(decoderStreamId);
long encoderStreamId = getQuicSession().newServerUnidirectionalStreamId();
encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
// Queue a synthetic frame to send the control stream type.
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
controlFlusher.offer(new Frame.Synthetic(buffer), Callback.NOOP);
long controlStreamId = getQuicSession().newServerBidirectionalStreamId();
controlEndPoint = configureControlEndPoint(controlStreamId);
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = listener.onPreface(this);
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
process();
}
private QuicStreamEndPoint configureDecoderEndPoint(long streamId)
@ -95,6 +136,12 @@ public class HTTP3ServerQuicSession extends ProtocolServerQuicSession
return decoder;
}
@Override
public CompletableFuture<Stream> newStream(HeadersFrame frame, Stream.Listener listener)
{
return null;
}
private class QpackDecoderInstructionHandler extends IteratingCallback implements Instruction.Handler
{
private final AutoLock lock = new AutoLock();

View File

@ -18,8 +18,6 @@ import org.eclipse.jetty.server.HttpConfiguration;
public class RawHTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionFactory
{
private final ServerSessionListener listener;
public RawHTTP3ServerConnectionFactory(ServerSessionListener listener)
{
this(new HttpConfiguration(), listener);
@ -27,7 +25,6 @@ public class RawHTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnecti
public RawHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, ServerSessionListener listener)
{
super(httpConfiguration);
this.listener = listener;
super(httpConfiguration, listener);
}
}

View File

@ -1,4 +1,4 @@
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.http3.LEVEL=DEBUG
#org.eclipse.jetty.quic.LEVEL=DEBUG
org.eclipse.jetty.quic.quiche.LEVEL=DEBUG
org.eclipse.jetty.quic.quiche.LEVEL=INFO