Issue - QUIC and HTTP/3

- Improved configuration of client and server.
- Implemented handling of QPACK exceptions.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-13 22:43:57 +02:00
parent 29385fa095
commit a0399a2e30
26 changed files with 331 additions and 173 deletions

View File

@ -19,6 +19,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.io.ClientConnector;
@ -48,18 +49,15 @@ public class HTTP3Client extends ContainerLifeCycle
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Client.class); private static final Logger LOG = LoggerFactory.getLogger(HTTP3Client.class);
private final QuicSessionContainer container = new QuicSessionContainer(); private final QuicSessionContainer container = new QuicSessionContainer();
private final HTTP3Configuration configuration = new HTTP3Configuration();
private final ClientConnector connector; private final ClientConnector connector;
private List<String> protocols = List.of("h3"); private List<String> protocols = List.of("h3");
private long streamIdleTimeout = 30000;
private int inputBufferSize = 2048;
private int outputBufferSize = 2048;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
public HTTP3Client() public HTTP3Client()
{ {
this.connector = new ClientConnector(new QuicClientConnectorConfigurator(this::configureConnection)); this.connector = new ClientConnector(new QuicClientConnectorConfigurator(this::configureConnection));
addBean(connector); addBean(connector);
addBean(configuration);
addBean(container); addBean(container);
} }
@ -68,44 +66,9 @@ public class HTTP3Client extends ContainerLifeCycle
return connector; return connector;
} }
public int getInputBufferSize() public HTTP3Configuration getConfiguration()
{ {
return inputBufferSize; return configuration;
}
public void setInputBufferSize(int inputBufferSize)
{
this.inputBufferSize = inputBufferSize;
}
public int getOutputBufferSize()
{
return outputBufferSize;
}
public void setOutputBufferSize(int outputBufferSize)
{
this.outputBufferSize = outputBufferSize;
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
} }
@ManagedAttribute("The ALPN protocol list") @ManagedAttribute("The ALPN protocol list")
@ -119,17 +82,6 @@ public class HTTP3Client extends ContainerLifeCycle
this.protocols = protocols; this.protocols = protocols;
} }
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
}
public CompletableFuture<Session.Client> connect(SocketAddress address, Session.Client.Listener listener) public CompletableFuture<Session.Client> connect(SocketAddress address, Session.Client.Listener listener)
{ {
Map<String, Object> context = new ConcurrentHashMap<>(); Map<String, Object> context = new ConcurrentHashMap<>();
@ -155,10 +107,10 @@ public class HTTP3Client extends ContainerLifeCycle
{ {
QuicConnection quicConnection = (QuicConnection)connection; QuicConnection quicConnection = (QuicConnection)connection;
quicConnection.addEventListener(container); quicConnection.addEventListener(container);
quicConnection.setInputBufferSize(getInputBufferSize()); quicConnection.setInputBufferSize(getConfiguration().getInputBufferSize());
quicConnection.setOutputBufferSize(getOutputBufferSize()); quicConnection.setOutputBufferSize(getConfiguration().getOutputBufferSize());
quicConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers()); quicConnection.setUseInputDirectByteBuffers(getConfiguration().isUseInputDirectByteBuffers());
quicConnection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers()); quicConnection.setUseOutputDirectByteBuffers(getConfiguration().isUseOutputDirectByteBuffers());
} }
return connection; return connection;
} }

View File

@ -34,29 +34,6 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
{ {
private static final Logger LOG = LoggerFactory.getLogger(HTTP3ClientConnectionFactory.class); private static final Logger LOG = LoggerFactory.getLogger(HTTP3ClientConnectionFactory.class);
private int maxBlockedStreams;
private int maxResponseHeadersSize = 8192;
public int getMaxBlockedStreams()
{
return maxBlockedStreams;
}
public void setMaxBlockedStreams(int maxBlockedStreams)
{
this.maxBlockedStreams = maxBlockedStreams;
}
public int getMaxResponseHeadersSize()
{
return maxResponseHeadersSize;
}
public void setMaxResponseHeadersSize(int maxResponseHeadersSize)
{
this.maxResponseHeadersSize = maxResponseHeadersSize;
}
@Override @Override
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context) public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
{ {
@ -64,8 +41,8 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
Session.Client.Listener listener = (Session.Client.Listener)context.get(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY); Session.Client.Listener listener = (Session.Client.Listener)context.get(HTTP3Client.SESSION_LISTENER_CONTEXT_KEY);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Promise<Session.Client> promise = (Promise<Session.Client>)context.get(HTTP3Client.SESSION_PROMISE_CONTEXT_KEY); Promise<Session.Client> promise = (Promise<Session.Client>)context.get(HTTP3Client.SESSION_PROMISE_CONTEXT_KEY);
ClientHTTP3Session session = new ClientHTTP3Session((ClientQuicSession)quicSession, listener, promise, getMaxBlockedStreams(), getMaxResponseHeadersSize()); ClientHTTP3Session session = new ClientHTTP3Session(client.getConfiguration(), (ClientQuicSession)quicSession, listener, promise);
session.setStreamIdleTimeout(client.getStreamIdleTimeout()); session.setStreamIdleTimeout(client.getConfiguration().getStreamIdleTimeout());
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("created protocol-specific {}", session); LOG.debug("created protocol-specific {}", session);
return session; return session;

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.client.internal;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
@ -48,7 +49,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
private final ControlFlusher controlFlusher; private final ControlFlusher controlFlusher;
private final HTTP3Flusher messageFlusher; private final HTTP3Flusher messageFlusher;
public ClientHTTP3Session(ClientQuicSession quicSession, Session.Client.Listener listener, Promise<Session.Client> promise, int maxBlockedStreams, int maxResponseHeadersSize) public ClientHTTP3Session(HTTP3Configuration configuration, ClientQuicSession quicSession, Session.Client.Listener listener, Promise<Session.Client> promise)
{ {
super(quicSession); super(quicSession);
this.session = new HTTP3SessionClient(this, listener, promise); this.session = new HTTP3SessionClient(this, listener, promise);
@ -60,7 +61,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL); long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId); QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE); InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams); this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams());
addBean(encoder); addBean(encoder);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint); LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
@ -68,7 +69,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL); long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId); QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE); InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxResponseHeadersSize); this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxResponseHeadersSize());
addBean(decoder); addBean(decoder);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint); LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
@ -80,8 +81,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint); LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
// TODO: make parameters configurable. this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxRequestHeadersSize(), configuration.isUseOutputDirectByteBuffers());
this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, 4096, true);
addBean(messageFlusher); addBean(messageFlusher);
} }

View File

@ -21,6 +21,7 @@ module org.eclipse.jetty.http3.common
requires transitive org.eclipse.jetty.http3.qpack; requires transitive org.eclipse.jetty.http3.qpack;
requires transitive org.eclipse.jetty.quic.common; requires transitive org.eclipse.jetty.quic.common;
exports org.eclipse.jetty.http3;
exports org.eclipse.jetty.http3.api; exports org.eclipse.jetty.http3.api;
exports org.eclipse.jetty.http3.frames; exports org.eclipse.jetty.http3.frames;

View File

@ -0,0 +1,118 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject
public class HTTP3Configuration
{
private long streamIdleTimeout = 30000;
private int inputBufferSize = 2048;
private int outputBufferSize = 2048;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
private int maxBlockedStreams = 0;
private int maxRequestHeadersSize = 8192;
private int maxResponseHeadersSize = 8192;
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
}
@ManagedAttribute("The size of the network input buffer")
public int getInputBufferSize()
{
return inputBufferSize;
}
public void setInputBufferSize(int inputBufferSize)
{
this.inputBufferSize = inputBufferSize;
}
@ManagedAttribute("The size of the network output buffer")
public int getOutputBufferSize()
{
return outputBufferSize;
}
public void setOutputBufferSize(int outputBufferSize)
{
this.outputBufferSize = outputBufferSize;
}
@ManagedAttribute("Whether to use direct buffers for input")
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct buffers for output")
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
@ManagedAttribute("The max number of QPACK blocked streams")
public int getMaxBlockedStreams()
{
return maxBlockedStreams;
}
public void setMaxBlockedStreams(int maxBlockedStreams)
{
this.maxBlockedStreams = maxBlockedStreams;
}
@ManagedAttribute("The max size of the request headers")
public int getMaxRequestHeadersSize()
{
return maxRequestHeadersSize;
}
public void setMaxRequestHeadersSize(int maxRequestHeadersSize)
{
this.maxRequestHeadersSize = maxRequestHeadersSize;
}
@ManagedAttribute("The max size of the response headers")
public int getMaxResponseHeadersSize()
{
return maxResponseHeadersSize;
}
public void setMaxResponseHeadersSize(int maxResponseHeadersSize)
{
this.maxResponseHeadersSize = maxResponseHeadersSize;
}
}

View File

@ -75,7 +75,7 @@ public class ControlFlusher extends IteratingCallback
for (Entry entry : entries) for (Entry entry : entries)
{ {
generator.generate(lease, endPoint.getStreamId(), entry.frame); generator.generate(lease, endPoint.getStreamId(), entry.frame, null);
invocationType = Invocable.combine(invocationType, entry.callback.getInvocationType()); invocationType = Invocable.combine(invocationType, entry.callback.getInvocationType());
} }

View File

@ -74,7 +74,9 @@ public class HTTP3Flusher extends IteratingCallback
return Action.SCHEDULED; return Action.SCHEDULED;
} }
generator.generate(lease, entry.endPoint.getStreamId(), frame); int generated = generator.generate(lease, entry.endPoint.getStreamId(), frame, this::fail);
if (generated < 0)
return Action.SCHEDULED;
QuicStreamEndPoint endPoint = entry.endPoint; QuicStreamEndPoint endPoint = entry.endPoint;
List<ByteBuffer> buffers = lease.getByteBuffers(); List<ByteBuffer> buffers = lease.getByteBuffers();
@ -96,6 +98,17 @@ public class HTTP3Flusher extends IteratingCallback
super.succeeded(); super.succeeded();
} }
private void fail(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failed to flush {} on {}", entry, this);
lease.recycle();
entry.callback.failed(x);
entry = null;
// Continue the iteration.
super.succeeded();
}
@Override @Override
protected void onCompleteFailure(Throwable failure) protected void onCompleteFailure(Throwable failure)
{ {

View File

@ -287,6 +287,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
} }
else else
{ {
removeStream(stream);
promise.failed(x); promise.failed(x);
} }
}); });

View File

@ -13,13 +13,15 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
public class CancelPushGenerator extends FrameGenerator public class CancelPushGenerator extends FrameGenerator
{ {
@Override @Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
return 0; return 0;
} }

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType; import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -29,8 +31,8 @@ public class ControlGenerator
generators[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdGenerator(); generators[FrameType.MAX_PUSH_ID.type()] = new MaxPushIdGenerator();
} }
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
return generators[frame.getFrameType().type()].generate(lease, streamId, frame); return generators[frame.getFrameType().type()].generate(lease, streamId, frame, fail);
} }
} }

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
@ -31,7 +32,7 @@ public class DataGenerator extends FrameGenerator
} }
@Override @Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
DataFrame dataFrame = (DataFrame)frame; DataFrame dataFrame = (DataFrame)frame;
return generateDataFrame(lease, dataFrame); return generateDataFrame(lease, dataFrame);

View File

@ -13,10 +13,12 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
public abstract class FrameGenerator public abstract class FrameGenerator
{ {
public abstract int generate(ByteBufferPool.Lease lease, long streamId, Frame frame); public abstract int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail);
} }

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType; import org.eclipse.jetty.http3.frames.FrameType;
@ -31,7 +32,7 @@ public class GoAwayGenerator extends FrameGenerator
} }
@Override @Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
GoAwayFrame goAwayFrame = (GoAwayFrame)frame; GoAwayFrame goAwayFrame = (GoAwayFrame)frame;
return generateGoAwayFrame(lease, goAwayFrame); return generateGoAwayFrame(lease, goAwayFrame);

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType; import org.eclipse.jetty.http3.frames.FrameType;
@ -37,21 +38,23 @@ public class HeadersGenerator extends FrameGenerator
} }
@Override @Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
HeadersFrame headersFrame = (HeadersFrame)frame; HeadersFrame headersFrame = (HeadersFrame)frame;
return generateHeadersFrame(lease, streamId, headersFrame); return generateHeadersFrame(lease, streamId, headersFrame, fail);
} }
private int generateHeadersFrame(ByteBufferPool.Lease lease, long streamId, HeadersFrame frame) private int generateHeadersFrame(ByteBufferPool.Lease lease, long streamId, HeadersFrame frame, Consumer<Throwable> fail)
{ {
try try
{ {
// Reserve initial bytes for the frame header bytes. // Reserve initial bytes for the frame header bytes.
int frameTypeLength = VarLenInt.length(FrameType.HEADERS.type()); int frameTypeLength = VarLenInt.length(FrameType.HEADERS.type());
int maxHeaderLength = frameTypeLength + VarLenInt.MAX_LENGTH; int maxHeaderLength = frameTypeLength + VarLenInt.MAX_LENGTH;
// The capacity of the buffer is larger than maxLength, but we need to enforce at most maxLength.
ByteBuffer buffer = lease.acquire(maxHeaderLength + maxLength, useDirectByteBuffers); ByteBuffer buffer = lease.acquire(maxHeaderLength + maxLength, useDirectByteBuffers);
buffer.position(maxHeaderLength); buffer.position(maxHeaderLength);
buffer.limit(buffer.position() + maxLength);
// Encode after the maxHeaderLength. // Encode after the maxHeaderLength.
encoder.encode(buffer, streamId, frame.getMetaData()); encoder.encode(buffer, streamId, frame.getMetaData());
buffer.flip(); buffer.flip();
@ -66,11 +69,11 @@ public class HeadersGenerator extends FrameGenerator
lease.append(buffer, true); lease.append(buffer, true);
return headerLength + dataLength; return headerLength + dataLength;
} }
catch (QpackException e) catch (QpackException x)
{ {
// TODO if (fail != null)
e.printStackTrace(); fail.accept(x);
return 0; return -1;
} }
} }
} }

View File

@ -13,13 +13,15 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
public class MaxPushIdGenerator extends FrameGenerator public class MaxPushIdGenerator extends FrameGenerator
{ {
@Override @Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
return 0; return 0;
} }

View File

@ -13,6 +13,8 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.FrameType; import org.eclipse.jetty.http3.frames.FrameType;
import org.eclipse.jetty.http3.qpack.QpackEncoder; import org.eclipse.jetty.http3.qpack.QpackEncoder;
@ -29,8 +31,8 @@ public class MessageGenerator
generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator(); generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator();
} }
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
return generators[frame.getFrameType().type()].generate(lease, streamId, frame); return generators[frame.getFrameType().type()].generate(lease, streamId, frame, fail);
} }
} }

View File

@ -13,13 +13,15 @@
package org.eclipse.jetty.http3.internal.generator; package org.eclipse.jetty.http3.internal.generator;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
public class PushPromiseGenerator extends FrameGenerator public class PushPromiseGenerator extends FrameGenerator
{ {
@Override @Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
return 0; return 0;
} }

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.http3.internal.generator;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import java.util.function.Consumer;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
@ -32,7 +33,7 @@ public class SettingsGenerator extends FrameGenerator
} }
@Override @Override
public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame) public int generate(ByteBufferPool.Lease lease, long streamId, Frame frame, Consumer<Throwable> fail)
{ {
SettingsFrame settingsFrame = (SettingsFrame)frame; SettingsFrame settingsFrame = (SettingsFrame)frame;
return generateSettings(lease, settingsFrame); return generateSettings(lease, settingsFrame);

View File

@ -55,7 +55,7 @@ public class DataGenerateParseTest
DataFrame input = new DataFrame(ByteBuffer.wrap(inputBytes), true); DataFrame input = new DataFrame(ByteBuffer.wrap(inputBytes), true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool()); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(null, 8192, true).generate(lease, 0, input); new MessageGenerator(null, 8192, true).generate(lease, 0, input, null);
List<DataFrame> frames = new ArrayList<>(); List<DataFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(new ParserListener() MessageParser parser = new MessageParser(new ParserListener()

View File

@ -36,7 +36,7 @@ public class GoAwayGenerateParseTest
GoAwayFrame input = GoAwayFrame.CLIENT_GRACEFUL; GoAwayFrame input = GoAwayFrame.CLIENT_GRACEFUL;
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool()); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new ControlGenerator(true).generate(lease, 0, input); new ControlGenerator(true).generate(lease, 0, input, null);
List<GoAwayFrame> frames = new ArrayList<>(); List<GoAwayFrame> frames = new ArrayList<>();
ControlParser parser = new ControlParser(new ParserListener() ControlParser parser = new ControlParser(new ParserListener()

View File

@ -49,7 +49,7 @@ public class HeadersGenerateParseTest
QpackEncoder encoder = new QpackEncoder(instructions -> {}, 100); QpackEncoder encoder = new QpackEncoder(instructions -> {}, 100);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool()); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(encoder, 8192, true).generate(lease, 0, input); new MessageGenerator(encoder, 8192, true).generate(lease, 0, input, null);
QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192); QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192);
List<HeadersFrame> frames = new ArrayList<>(); List<HeadersFrame> frames = new ArrayList<>();

View File

@ -48,7 +48,7 @@ public class SettingsGenerateParseTest
SettingsFrame input = new SettingsFrame(settings); SettingsFrame input = new SettingsFrame(settings);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool()); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new ControlGenerator(true).generate(lease, 0, input); new ControlGenerator(true).generate(lease, 0, input, null);
List<SettingsFrame> frames = new ArrayList<>(); List<SettingsFrame> frames = new ArrayList<>();
ControlParser parser = new ControlParser(new ParserListener() ControlParser parser = new ControlParser(new ParserListener()

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.server;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.internal.parser.MessageParser; import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session; import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session;
@ -29,50 +30,24 @@ import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.server.AbstractConnectionFactory; import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConnectionFactory implements ProtocolSession.Factory public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConnectionFactory implements ProtocolSession.Factory
{ {
private final HTTP3Configuration configuration = new HTTP3Configuration();
private final HttpConfiguration httpConfiguration; private final HttpConfiguration httpConfiguration;
private final Session.Server.Listener listener; private final Session.Server.Listener listener;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
private int maxBlockedStreams = 0;
private long streamIdleTimeout = 30000;
public AbstractHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, Session.Server.Listener listener) public AbstractHTTP3ServerConnectionFactory(HttpConfiguration httpConfiguration, Session.Server.Listener listener)
{ {
super("h3"); super("h3");
addBean(configuration);
this.httpConfiguration = Objects.requireNonNull(httpConfiguration); this.httpConfiguration = Objects.requireNonNull(httpConfiguration);
addBean(httpConfiguration); addBean(httpConfiguration);
this.listener = listener; this.listener = listener;
} configuration.setUseInputDirectByteBuffers(httpConfiguration.isUseInputDirectByteBuffers());
configuration.setUseOutputDirectByteBuffers(httpConfiguration.isUseOutputDirectByteBuffers());
protected Session.Server.Listener getListener() configuration.setMaxRequestHeadersSize(httpConfiguration.getRequestHeaderSize());
{ configuration.setMaxResponseHeadersSize(httpConfiguration.getResponseHeaderSize());
return listener;
}
@ManagedAttribute("Whether to use direct ByteBuffers for reading")
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct ByteBuffers for writing")
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
} }
public HttpConfiguration getHttpConfiguration() public HttpConfiguration getHttpConfiguration()
@ -80,33 +55,16 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
return httpConfiguration; return httpConfiguration;
} }
@ManagedAttribute("The max number of streams blocked in QPACK encoding") public HTTP3Configuration getConfiguration()
public int getMaxBlockedStreams()
{ {
return maxBlockedStreams; return configuration;
}
public void setMaxBlockedStreams(int maxBlockedStreams)
{
this.maxBlockedStreams = maxBlockedStreams;
}
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
{
return streamIdleTimeout;
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
} }
@Override @Override
public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context) public ProtocolSession newProtocolSession(QuicSession quicSession, Map<String, Object> context)
{ {
ServerHTTP3Session session = new ServerHTTP3Session((ServerQuicSession)quicSession, listener, getMaxBlockedStreams(), getHttpConfiguration().getRequestHeaderSize()); ServerHTTP3Session session = new ServerHTTP3Session(getConfiguration(), (ServerQuicSession)quicSession, listener);
session.setStreamIdleTimeout(getStreamIdleTimeout()); session.setStreamIdleTimeout(getConfiguration().getStreamIdleTimeout());
return session; return session;
} }

View File

@ -18,6 +18,7 @@ import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
@ -53,7 +54,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
private final AdaptiveExecutionStrategy strategy; private final AdaptiveExecutionStrategy strategy;
private final HTTP3Producer producer = new HTTP3Producer(); private final HTTP3Producer producer = new HTTP3Producer();
public ServerHTTP3Session(ServerQuicSession quicSession, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize) public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession quicSession, Session.Server.Listener listener)
{ {
super(quicSession); super(quicSession);
this.session = new HTTP3SessionServer(this, listener); this.session = new HTTP3SessionServer(this, listener);
@ -65,7 +66,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId); QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE); InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams); this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams());
addBean(encoder); addBean(encoder);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint); LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
@ -73,20 +74,19 @@ public class ServerHTTP3Session extends ServerProtocolSession
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId); QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE); InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxRequestHeadersSize); this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxRequestHeadersSize());
addBean(decoder); addBean(decoder);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint); LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId); QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true); this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, configuration.isUseOutputDirectByteBuffers());
addBean(controlFlusher); addBean(controlFlusher);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint); LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
// TODO: make parameters configurable. this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers());
this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, 4096, true);
addBean(messageFlusher); addBean(messageFlusher);
this.strategy = new AdaptiveExecutionStrategy(producer, getQuicSession().getExecutor()); this.strategy = new AdaptiveExecutionStrategy(producer, getQuicSession().getExecutor());

View File

@ -30,6 +30,8 @@ import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
@ -265,4 +267,122 @@ public class ClientServerTest extends AbstractClientServerTest
assertTrue(clientDataLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientDataLatch.await(5, TimeUnit.SECONDS));
assertArrayEquals(bytesSent, bytesReceived); assertArrayEquals(bytesSent, bytesReceived);
} }
@Test
public void testRequestHeadersTooLarge() throws Exception
{
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true));
return null;
}
});
int maxRequestHeadersSize = 128;
client.getConfiguration().setMaxRequestHeadersSize(maxRequestHeadersSize);
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch requestFailureLatch = new CountDownLatch(1);
HttpFields largeHeaders = HttpFields.build().put("too-large", "x".repeat(2 * maxRequestHeadersSize));
clientSession.newRequest(new HeadersFrame(newRequest(HttpMethod.GET, "/", largeHeaders), true), new Stream.Listener() {})
.whenComplete((s, x) ->
{
// The HTTP3Stream was created, but the application cannot access
// it, so the implementation must remove it from the HTTP3Session.
// See below the difference with the server.
if (x != null)
requestFailureLatch.countDown();
});
assertTrue(requestFailureLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientSession.getStreams().isEmpty());
// Verify that the connection is still good.
CountDownLatch responseLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
responseLatch.countDown();
}
});
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testResponseHeadersTooLarge() throws Exception
{
int maxResponseHeadersSize = 128;
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch responseFailureLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverSessionRef.set(stream.getSession());
MetaData.Request request = (MetaData.Request)frame.getMetaData();
if ("/large".equals(request.getURI().getPath()))
{
HttpFields largeHeaders = HttpFields.build().put("too-large", "x".repeat(2 * maxResponseHeadersSize));
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, largeHeaders), true))
.whenComplete((s, x) ->
{
// The response could not be generated, but the stream is still valid.
// Applications may try to send a smaller response here,
// so the implementation must not remove the stream.
if (x != null)
{
// In this test, we give up if there is an error.
stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x);
responseFailureLatch.countDown();
}
});
}
else
{
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true));
}
return null;
}
});
AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
assertNotNull(h3);
h3.getConfiguration().setMaxResponseHeadersSize(maxResponseHeadersSize);
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch streamFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/large"), true), new Stream.Listener()
{
@Override
public void onFailure(Stream stream, Throwable failure)
{
streamFailureLatch.countDown();
}
});
assertTrue(responseFailureLatch.await(5, TimeUnit.SECONDS));
assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSessionRef.get().getStreams().isEmpty());
assertTrue(clientSession.getStreams().isEmpty());
// Verify that the connection is still good.
CountDownLatch responseLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
responseLatch.countDown();
}
});
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
} }

View File

@ -97,7 +97,7 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
}); });
long streamIdleTimeout = 1000; long streamIdleTimeout = 1000;
client.setStreamIdleTimeout(streamIdleTimeout); client.getConfiguration().setStreamIdleTimeout(streamIdleTimeout);
Session.Client clientSession = newSession(new Session.Client.Listener() {}); Session.Client clientSession = newSession(new Session.Client.Listener() {});
@ -175,9 +175,9 @@ public class StreamIdleTimeoutTest extends AbstractClientServerTest
} }
} }
}); });
AbstractHTTP3ServerConnectionFactory h3 = server.getConnectors()[0].getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class); AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
assertNotNull(h3); assertNotNull(h3);
h3.setStreamIdleTimeout(idleTimeout); h3.getConfiguration().setStreamIdleTimeout(idleTimeout);
Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS); .get(5, TimeUnit.SECONDS);