Issue #6728 - QUIC and HTTP/3

- Fixed control streams, must be unidirectional.
- Initial implementation of events to HTTP/3 APIs.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-13 16:27:09 +02:00
parent 7c75e9c22f
commit f6958de4b2
26 changed files with 295 additions and 168 deletions

View File

@ -77,7 +77,7 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
long streamId = streamEndPoint.getStreamId();
ClientHTTP3Session http3Session = (ClientHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
// TODO: Parser may be created internally, if I pass the QuicStreamEndPoint and ClientHTTP3Session.
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder(), http3Session);
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder(), http3Session.getSessionClient());
return new HTTP3Connection(endPoint, http3Session.getQuicSession().getExecutor(), http3Session.getQuicSession().getByteBufferPool(), parser);
}
}

View File

@ -15,20 +15,18 @@ package org.eclipse.jetty.http3.client.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlConnection;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.StreamConnection;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.io.ByteBufferPool;
@ -43,7 +41,7 @@ import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientHTTP3Session extends ClientProtocolSession implements ParserListener
public class ClientHTTP3Session extends ClientProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ClientHTTP3Session.class);
@ -70,11 +68,12 @@ public class ClientHTTP3Session extends ClientProtocolSession implements ParserL
this.decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxResponseHeadersSize);
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
this.messageFlusher = new MessageFlusher(session.getByteBufferPool(), encoder);
// TODO: make parameters configurable.
this.messageFlusher = new MessageFlusher(session.getByteBufferPool(), encoder, 4096, true);
}
public QpackDecoder getQpackDecoder()
@ -82,33 +81,14 @@ public class ClientHTTP3Session extends ClientProtocolSession implements ParserL
return decoder;
}
public HTTP3SessionClient getSessionClient()
{
return apiSession;
}
@Override
public void onOpen()
{
initializeEncoderStream();
initializeDecoderStream();
initializeControlStream();
apiSession.onOpen();
}
private void initializeEncoderStream()
{
encoderInstructionFlusher.iterate();
}
private void initializeDecoderStream()
{
decoderInstructionFlusher.iterate();
}
private void initializeControlStream()
{
// Queue a synthetic frame to send the control stream type.
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
controlFlusher.offer(new Frame.Synthetic(buffer), Callback.NOOP);
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = apiSession.onPreface();
if (settings == null)
@ -117,6 +97,8 @@ public class ClientHTTP3Session extends ClientProtocolSession implements ParserL
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.NOOP);
controlFlusher.iterate();
apiSession.onOpen();
}
private QuicStreamEndPoint configureInstructionEndPoint(long streamId)
@ -127,7 +109,8 @@ public class ClientHTTP3Session extends ClientProtocolSession implements ParserL
private QuicStreamEndPoint configureControlEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, this::configureStreamEndPoint);
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
}
@Override
@ -149,7 +132,7 @@ public class ClientHTTP3Session extends ClientProtocolSession implements ParserL
private void configureStreamEndPoint(QuicStreamEndPoint endPoint)
{
StreamConnection connection = new StreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), this);
StreamConnection connection = new StreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), apiSession);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
@ -175,10 +158,10 @@ public class ClientHTTP3Session extends ClientProtocolSession implements ParserL
private final MessageGenerator generator;
private Entry entry;
public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder)
public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
{
this.lease = new ByteBufferPool.Lease(byteBufferPool);
this.generator = new MessageGenerator(encoder);
this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers);
}
public void offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
@ -199,10 +182,16 @@ public class ClientHTTP3Session extends ClientProtocolSession implements ParserL
return Action.IDLE;
}
generator.generate(lease, entry.frame);
if (LOG.isDebugEnabled())
LOG.debug("flushing {} on {}", entry, this);
generator.generate(lease, entry.endPoint.getStreamId(), entry.frame);
QuicStreamEndPoint endPoint = entry.endPoint;
endPoint.write(this, lease.getByteBuffers().toArray(ByteBuffer[]::new));
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this);
endPoint.write(this, buffers.toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
@ -233,6 +222,12 @@ public class ClientHTTP3Session extends ClientProtocolSession implements ParserL
this.frame = frame;
this.callback = callback;
}
@Override
public String toString()
{
return String.format("%s#%d", frame, endPoint.getStreamId());
}
}
}
}

View File

@ -25,9 +25,13 @@ import org.eclipse.jetty.quic.common.StreamType;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3SessionClient extends HTTP3Session implements Session.Client
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3SessionClient.class);
private final Promise<Client> promise;
public HTTP3SessionClient(ClientHTTP3Session session, Client.Listener listener, Promise<Client> promise)
@ -55,7 +59,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
QuicStreamEndPoint endPoint = session.getOrCreateStreamEndPoint(streamId, session::configureProtocolEndPoint);
Promise.Completable<Stream> promise = new Promise.Completable<>();
HTTP3Stream stream = new HTTP3Stream(endPoint, listener);
HTTP3Stream stream = newStream(endPoint, listener);
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> promise.succeeded(stream), promise::failed);
session.writeMessageFrame(endPoint, frame, callback);

View File

@ -17,6 +17,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
public interface Session
{
@ -44,6 +45,10 @@ public interface Session
return null;
}
public default void onSettings(Session session, SettingsFrame frame)
{
}
public default Stream.Listener onHeaders(Stream stream, HeadersFrame frame)
{
return null;

View File

@ -13,8 +13,6 @@
package org.eclipse.jetty.http3.frames;
import java.nio.ByteBuffer;
public abstract class Frame
{
private final FrameType type;
@ -34,20 +32,4 @@ public abstract class Frame
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}
public static class Synthetic extends Frame
{
private final ByteBuffer buffer;
public Synthetic(ByteBuffer buffer)
{
super(null);
this.buffer = buffer;
}
public ByteBuffer getByteBuffer()
{
return buffer;
}
}
}

View File

@ -54,20 +54,23 @@ public class ControlConnection extends AbstractConnection implements Connection.
}
@Override
public void onOpen()
public void onUpgradeTo(ByteBuffer upgrade)
{
super.onOpen();
fillInterested();
int capacity = Math.max(upgrade.remaining(), getInputBufferSize());
buffer = byteBufferPool.acquire(capacity, isUseInputDirectByteBuffers());
int position = BufferUtil.flipToFill(buffer);
buffer.put(upgrade);
BufferUtil.flipToFlush(buffer, position);
}
@Override
public void onUpgradeTo(ByteBuffer upgrade)
public void onOpen()
{
if (BufferUtil.isEmpty(upgrade))
return;
int capacity = Math.max(upgrade.remaining(), getInputBufferSize());
buffer = byteBufferPool.acquire(capacity, isUseInputDirectByteBuffers());
buffer.put(upgrade);
super.onOpen();
if (BufferUtil.hasContent(buffer))
onFillable();
else
fillInterested();
}
@Override
@ -79,18 +82,21 @@ public class ControlConnection extends AbstractConnection implements Connection.
buffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
// Parse first in case of bytes from the upgrade.
parser.parse(buffer);
// Then read from the EndPoint.
int filled = getEndPoint().fill(buffer);
if (filled > 0)
{
parser.parse(buffer);
}
else if (filled == 0)
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {}", filled, this);
if (filled == 0)
{
byteBufferPool.release(buffer);
fillInterested();
break;
}
else
else if (filled < 0)
{
byteBufferPool.release(buffer);
buffer = null;

View File

@ -22,8 +22,8 @@ import java.util.Queue;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.internal.generator.ControlGenerator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
@ -39,11 +39,12 @@ public class ControlFlusher extends IteratingCallback
private final Queue<Entry> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final ControlGenerator generator;
private final EndPoint endPoint;
private final QuicStreamEndPoint endPoint;
private boolean initialized;
private List<Entry> entries;
private InvocationType invocationType = InvocationType.NON_BLOCKING;
public ControlFlusher(QuicSession session, EndPoint endPoint)
public ControlFlusher(QuicSession session, QuicStreamEndPoint endPoint)
{
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
this.endPoint = endPoint;
@ -64,32 +65,29 @@ public class ControlFlusher extends IteratingCallback
try (AutoLock l = lock.lock())
{
if (queue.isEmpty())
{
entries = List.of();
}
else
{
entries = new ArrayList<>(queue);
queue.clear();
}
return Action.IDLE;
entries = new ArrayList<>(queue);
queue.clear();
}
if (LOG.isDebugEnabled())
LOG.debug("flushing {} entries on {}", entries.size(), this);
if (entries.isEmpty())
return Action.IDLE;
LOG.debug("flushing {} on {}", entries, this);
for (Entry entry : entries)
{
Frame frame = entry.frame;
if (frame instanceof Frame.Synthetic)
lease.append(((Frame.Synthetic)frame).getByteBuffer(), false);
else
generator.generate(lease, frame);
generator.generate(lease, endPoint.getStreamId(), entry.frame);
invocationType = Invocable.combine(invocationType, entry.callback.getInvocationType());
}
if (!initialized)
{
initialized = true;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
lease.insert(0, buffer, false);
}
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this);
@ -101,7 +99,7 @@ public class ControlFlusher extends IteratingCallback
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to flush {} entries on {}", entries, this);
LOG.debug("succeeded to flush {} on {}", entries, this);
lease.recycle();
@ -117,7 +115,7 @@ public class ControlFlusher extends IteratingCallback
protected void onCompleteFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to flush {} entries on {}", entries, this, failure);
LOG.debug("failed to flush {} on {}", entries, this, failure);
lease.recycle();
@ -133,6 +131,12 @@ public class ControlFlusher extends IteratingCallback
return invocationType;
}
@Override
public String toString()
{
return String.format("%s#%s", super.toString(), endPoint.getStreamId());
}
private static class Entry
{
private final Frame frame;

View File

@ -14,13 +14,23 @@
package org.eclipse.jetty.http3.internal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3Session implements Session, ParserListener
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Session.class);
private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>();
private final ProtocolSession session;
private final Listener listener;
@ -35,8 +45,56 @@ public class HTTP3Session implements Session, ParserListener
return session;
}
protected HTTP3Stream newStream(QuicStreamEndPoint endPoint, Stream.Listener listener)
{
return streams.computeIfAbsent(endPoint.getStreamId(), id -> new HTTP3Stream(endPoint, listener));
}
public Map<Long, Long> onPreface()
{
return listener.onPreface(this);
Map<Long, Long> settings = notifyPreface();
if (LOG.isDebugEnabled())
LOG.debug("produced settings {} on {}", settings, this);
return settings;
}
private Map<Long, Long> notifyPreface()
{
try
{
return listener.onPreface(this);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
return null;
}
}
@Override
public void onSettings(SettingsFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this);
notifySettings(frame);
}
private void notifySettings(SettingsFrame frame)
{
try
{
listener.onSettings(this, frame);
}
catch (Throwable x)
{
LOG.info("failure notifying listener {}", listener, x);
}
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this);
}
}

View File

@ -21,20 +21,24 @@ import java.util.Queue;
import org.eclipse.jetty.http3.qpack.Instruction;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.common.QuicSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InstructionFlusher extends IteratingCallback
{
private static final Logger LOG = LoggerFactory.getLogger(InstructionFlusher.class);
private final AutoLock lock = new AutoLock();
private final Queue<Instruction> queue = new ArrayDeque<>();
private final ByteBufferPool.Lease lease;
private final EndPoint endPoint;
private final QuicStreamEndPoint endPoint;
private boolean initialized;
public InstructionFlusher(QuicSession session, EndPoint endPoint)
public InstructionFlusher(QuicSession session, QuicStreamEndPoint endPoint)
{
this.lease = new ByteBufferPool.Lease(session.getByteBufferPool());
this.endPoint = endPoint;
@ -51,28 +55,34 @@ public class InstructionFlusher extends IteratingCallback
@Override
protected Action process()
{
if (initialized)
List<Instruction> instructions;
try (AutoLock l = lock.lock())
{
List<Instruction> instructions;
try (AutoLock l = lock.lock())
{
if (queue.isEmpty())
return Action.IDLE;
instructions = new ArrayList<>(queue);
}
instructions.forEach(i -> i.encode(lease));
endPoint.write(this, lease.getByteBuffers().toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
if (queue.isEmpty())
return Action.IDLE;
instructions = new ArrayList<>(queue);
queue.clear();
}
else
if (LOG.isDebugEnabled())
LOG.debug("flushing {} on {}", instructions, this);
instructions.forEach(i -> i.encode(lease));
if (!initialized)
{
initialized = true;
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(EncoderConnection.STREAM_TYPE));
VarLenInt.generate(buffer, EncoderConnection.STREAM_TYPE);
buffer.flip();
endPoint.write(NOOP, buffer);
return Action.SCHEDULED;
lease.insert(0, buffer, false);
}
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) on {}", buffers.size(), lease.getTotalLength(), this);
endPoint.write(this, buffers.toArray(ByteBuffer[]::new));
return Action.SCHEDULED;
}
@Override
@ -87,4 +97,10 @@ public class InstructionFlusher extends IteratingCallback
{
return InvocationType.NON_BLOCKING;
}
@Override
public String toString()
{
return String.format("%s#%s", super.toString(), endPoint.getStreamId());
}
}

View File

@ -67,6 +67,7 @@ public class StreamConnection extends AbstractConnection implements Connection.U
copy.put(buffer);
byteBufferPool.release(buffer);
buffer = null;
copy.flip();
return copy;
}
@ -81,6 +82,9 @@ public class StreamConnection extends AbstractConnection implements Connection.U
while (true)
{
int filled = getEndPoint().fill(buffer);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {}", filled, this);
if (filled > 0)
{
if (parser.parseInt(buffer, this::detectAndUpgrade))
@ -104,7 +108,7 @@ public class StreamConnection extends AbstractConnection implements Connection.U
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process special stream {}", getEndPoint(), x);
LOG.debug("could not process stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close(x);
@ -121,6 +125,8 @@ public class StreamConnection extends AbstractConnection implements Connection.U
ControlConnection newConnection = new ControlConnection(getEndPoint(), getExecutor(), byteBufferPool, parser);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("upgrading to {}", newConnection);
getEndPoint().upgrade(newConnection);
break;
}
@ -129,6 +135,8 @@ public class StreamConnection extends AbstractConnection implements Connection.U
EncoderConnection newConnection = new EncoderConnection(getEndPoint(), getExecutor());
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("upgrading to {}", newConnection);
getEndPoint().upgrade(newConnection);
break;
}
@ -137,6 +145,8 @@ public class StreamConnection extends AbstractConnection implements Connection.U
DecoderConnection newConnection = new DecoderConnection(getEndPoint(), getExecutor());
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("upgrading to {}", newConnection);
getEndPoint().upgrade(newConnection);
break;
}

View File

@ -19,7 +19,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
public class CancelPushGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
return 0;
}

View File

@ -29,8 +29,8 @@ public class ControlGenerator
generators[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdGenerator();
}
public int generate(ByteBufferPool.Lease lease, Frame frame)
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
return generators[frame.getFrameType().type()].generate(lease, frame);
return generators[frame.getFrameType().type()].generate(lease, streamId, frame);
}
}

View File

@ -19,7 +19,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
public class DataGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
return 0;
}

View File

@ -18,5 +18,5 @@ import org.eclipse.jetty.io.ByteBufferPool;
public abstract class FrameGenerator
{
public abstract int generate(ByteBufferPool.Lease lease, Frame frame);
public abstract int generate(ByteBufferPool.Lease lease, long streamId, Frame frame);
}

View File

@ -19,7 +19,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
public class GoAwayGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
return 0;
}

View File

@ -13,14 +13,47 @@
package org.eclipse.jetty.http3.internal.generator;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
public class HeadersGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
private final QpackEncoder encoder;
private final int maxLength;
private final boolean useDirectByteBuffers;
public HeadersGenerator(QpackEncoder encoder, int maxLength, boolean useDirectByteBuffers)
{
this.encoder = encoder;
this.maxLength = maxLength;
this.useDirectByteBuffers = useDirectByteBuffers;
}
@Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
HeadersFrame headersFrame = (HeadersFrame)frame;
return generateHeadersFrame(lease, streamId, headersFrame);
}
private int generateHeadersFrame(ByteBufferPool.Lease lease, long streamId, HeadersFrame frame)
{
try
{
ByteBuffer buffer = lease.acquire(maxLength, useDirectByteBuffers);
encoder.encode(buffer, streamId, frame.getMetaData());
buffer.flip();
return buffer.remaining();
}
catch (QpackException e)
{
e.printStackTrace();
}
return 0;
}
}

View File

@ -19,7 +19,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
public class MaxPushIdGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
return 0;
}

View File

@ -22,15 +22,15 @@ public class MessageGenerator
{
private final FrameGenerator[] generators = new FrameGenerator[FrameType.maxType() + 1];
public MessageGenerator(QpackEncoder encoder)
public MessageGenerator(QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
{
generators[FrameType.DATA.type()] = new DataGenerator();
generators[FrameType.HEADERS.type()] = new HeadersGenerator();
generators[FrameType.HEADERS.type()] = new HeadersGenerator(encoder, maxHeadersLength, useDirectByteBuffers);
generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator();
}
public int generate(ByteBufferPool.Lease lease, Frame frame)
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
return generators[frame.getFrameType().type()].generate(lease, frame);
return generators[frame.getFrameType().type()].generate(lease, streamId, frame);
}
}

View File

@ -19,7 +19,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
public class PushPromiseGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
return 0;
}

View File

@ -25,7 +25,7 @@ import org.eclipse.jetty.util.BufferUtil;
public class SettingsGenerator extends FrameGenerator
{
@Override
public int generate(ByteBufferPool.Lease lease, Frame frame)
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame)
{
SettingsFrame settingsFrame = (SettingsFrame)frame;
return generateSettings(lease, settingsFrame);
@ -33,8 +33,8 @@ public class SettingsGenerator extends FrameGenerator
private int generateSettings(ByteBufferPool.Lease lease, SettingsFrame frame)
{
Map<Long, Long> settings = frame.getSettings();
int length = 0;
Map<Long, Long> settings = frame.getSettings();
for (Map.Entry<Long, Long> e : settings.entrySet())
{
length += VarLenInt.length(e.getKey()) + VarLenInt.length(e.getValue());

View File

@ -48,7 +48,7 @@ public class SettingsGenerateParseTest
SettingsFrame input = new SettingsFrame(settings);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new SettingsGenerator().generate(lease, input);
new SettingsGenerator().generate(lease, 0, input);
List<SettingsFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(0, null, new ParserListener()

View File

@ -221,7 +221,6 @@ public class QpackEncoder implements Dumpable
int deltaBase = signBit ? requiredInsertCount - base - 1 : base - requiredInsertCount;
// Encode all the entries into the buffer.
int pos = BufferUtil.flipToFill(buffer);
// Encode the Field Section Prefix into the ByteBuffer.
NBitIntegerEncoder.encode(buffer, 8, encodedInsertCount);
@ -234,7 +233,6 @@ public class QpackEncoder implements Dumpable
entry.encode(buffer, base);
}
BufferUtil.flipToFlush(buffer, pos);
notifyInstructionHandler();
}

View File

@ -97,7 +97,7 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
long streamId = streamEndPoint.getStreamId();
ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder(), http3Session);
MessageParser parser = new MessageParser(streamId, http3Session.getQpackDecoder(), http3Session.getSessionServer());
HTTP3Connection connection = new HTTP3Connection(endPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
return connection;
}

View File

@ -0,0 +1,29 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.server.internal;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HTTP3SessionServer extends HTTP3Session implements Session.Server
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3SessionServer.class);
public HTTP3SessionServer(ServerHTTP3Session session, Session.Server.Listener listener)
{
super(session, listener);
}
}

View File

@ -13,21 +13,15 @@
package org.eclipse.jetty.http3.server.internal;
import java.nio.ByteBuffer;
import java.util.Map;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.ControlConnection;
import org.eclipse.jetty.http3.internal.ControlFlusher;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.InstructionFlusher;
import org.eclipse.jetty.http3.internal.InstructionHandler;
import org.eclipse.jetty.http3.internal.StreamConnection;
import org.eclipse.jetty.http3.internal.VarLenInt;
import org.eclipse.jetty.http3.internal.generator.MessageGenerator;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
@ -38,13 +32,13 @@ import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerHTTP3Session extends ServerProtocolSession implements ParserListener
public class ServerHTTP3Session extends ServerProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ServerHTTP3Session.class);
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3Session apiSession;
private final HTTP3SessionServer apiSession;
private final InstructionFlusher encoderFlusher;
private final InstructionFlusher decoderFlusher;
private final ControlFlusher controlFlusher;
@ -53,7 +47,7 @@ public class ServerHTTP3Session extends ServerProtocolSession implements ParserL
public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize)
{
super(session);
this.apiSession = new HTTP3Session(this, listener);
this.apiSession = new HTTP3SessionServer(this, listener);
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureEncoderEndPoint(encoderStreamId);
@ -65,8 +59,9 @@ public class ServerHTTP3Session extends ServerProtocolSession implements ParserL
this.decoderFlusher = new InstructionFlusher(session, decoderEndPoint);
this.decoder = new QpackDecoder(new InstructionHandler(decoderFlusher), maxRequestHeadersSize);
this.generator = new MessageGenerator(encoder);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_BIDIRECTIONAL);
// TODO: make parameters configurable.
this.generator = new MessageGenerator(encoder, 4096, true);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(session, controlEndPoint);
}
@ -76,32 +71,14 @@ public class ServerHTTP3Session extends ServerProtocolSession implements ParserL
return decoder;
}
public HTTP3SessionServer getSessionServer()
{
return apiSession;
}
@Override
public void onOpen()
{
initializeEncoderStream();
initializeDecoderStream();
initializeControlStream();
}
private void initializeEncoderStream()
{
encoderFlusher.iterate();
}
private void initializeDecoderStream()
{
decoderFlusher.iterate();
}
private void initializeControlStream()
{
// Queue a synthetic frame to send the control stream type.
ByteBuffer buffer = ByteBuffer.allocate(VarLenInt.length(ControlConnection.STREAM_TYPE));
VarLenInt.generate(buffer, ControlConnection.STREAM_TYPE);
buffer.flip();
controlFlusher.offer(new Frame.Synthetic(buffer), Callback.NOOP);
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = apiSession.onPreface();
if (settings == null)
@ -126,7 +103,8 @@ public class ServerHTTP3Session extends ServerProtocolSession implements ParserL
private QuicStreamEndPoint configureControlEndPoint(long streamId)
{
return getOrCreateStreamEndPoint(streamId, this::configureStreamEndPoint);
// This is a write-only stream, so no need to link a Connection.
return getOrCreateStreamEndPoint(streamId, QuicStreamEndPoint::onOpen);
}
@Override
@ -149,7 +127,7 @@ public class ServerHTTP3Session extends ServerProtocolSession implements ParserL
private void configureStreamEndPoint(QuicStreamEndPoint endPoint)
{
StreamConnection connection = new StreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), this);
StreamConnection connection = new StreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), apiSession);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.HTTP3Client;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.server.RawHTTP3ServerConnectionFactory;
import org.eclipse.jetty.quic.server.ServerQuicConnector;
import org.eclipse.jetty.server.Server;
@ -49,9 +50,16 @@ public class HTTP3ClientServerTest
serverThreads.setName("server");
Server server = new Server(serverThreads);
CountDownLatch settingsLatch = new CountDownLatch(1);
CountDownLatch serverLatch = new CountDownLatch(1);
ServerQuicConnector connector = new ServerQuicConnector(server, sslContextFactory, new RawHTTP3ServerConnectionFactory(new Session.Server.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override
public Stream.Listener onHeaders(Stream stream, HeadersFrame frame)
{
@ -75,6 +83,7 @@ public class HTTP3ClientServerTest
.get(555, TimeUnit.SECONDS);
assertNotNull(stream);
assertTrue(settingsLatch.await(555, TimeUnit.SECONDS));
assertTrue(serverLatch.await(555, TimeUnit.SECONDS));
}
}