Fixes to QPACK configuration from SETTINGS frames. (#9728)

* Improved configuration upon sending/receiving SETTINGS frames.
* Improved handling of blocked frames in HTTP3StreamConnection.
* Fixed locking and reentrance in QPACK encoder/decoder.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Lachlan 2023-05-15 23:07:33 +10:00 committed by GitHub
parent 7ac49cd43c
commit 6e8457cfde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 886 additions and 381 deletions

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.client.internal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -44,6 +45,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ClientHTTP3Session.class);
private final HTTP3Configuration configuration;
private final HTTP3SessionClient session;
private final QpackEncoder encoder;
private final QpackDecoder decoder;
@ -53,7 +55,8 @@ public class ClientHTTP3Session extends ClientProtocolSession
public ClientHTTP3Session(HTTP3Configuration configuration, ClientQuicSession quicSession, Session.Client.Listener listener, Promise<Session.Client> promise)
{
super(quicSession);
this.session = new HTTP3SessionClient(this, listener, promise);
this.configuration = configuration;
session = new HTTP3SessionClient(this, listener, promise);
addBean(session);
session.setStreamIdleTimeout(configuration.getStreamIdleTimeout());
@ -63,7 +66,8 @@ public class ClientHTTP3Session extends ClientProtocolSession
long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = openInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams());
encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher));
encoder.setMaxHeadersSize(configuration.getMaxRequestHeadersSize());
addBean(encoder);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
@ -71,19 +75,19 @@ public class ClientHTTP3Session extends ClientProtocolSession
long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = openInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxResponseHeadersSize());
decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher));
addBean(decoder);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = openControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true);
controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true);
addBean(controlFlusher);
if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxRequestHeadersSize(), configuration.isUseOutputDirectByteBuffers());
messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.isUseOutputDirectByteBuffers());
addBean(messageFlusher);
}
@ -105,16 +109,86 @@ public class ClientHTTP3Session extends ClientProtocolSession
@Override
protected void onStart()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = session.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
settings = settings != null ? new HashMap<>(settings) : new HashMap<>();
settings.compute(SettingsFrame.MAX_TABLE_CAPACITY, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxDecoderTableCapacity();
if (v == 0)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_FIELD_SECTION_SIZE, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxResponseHeadersSize();
if (v <= 0)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_BLOCKED_STREAMS, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxBlockedStreams();
if (v == 0)
v = null;
}
return v;
});
if (LOG.isDebugEnabled())
LOG.debug("configuring local {} on {}", settings, this);
settings.forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
decoder.setMaxTableCapacity(value.intValue());
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
decoder.setMaxHeadersSize(value.intValue());
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
decoder.setMaxBlockedStreams(value.intValue());
});
// Queue the mandatory SETTINGS frame.
SettingsFrame frame = new SettingsFrame(settings);
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::failControlStream)))
controlFlusher.iterate();
}
public void onSettings(SettingsFrame frame)
{
Map<Long, Long> settings = frame.getSettings();
if (LOG.isDebugEnabled())
LOG.debug("configuring encoder {} on {}", settings, this);
settings.forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
{
int maxTableCapacity = value.intValue();
encoder.setMaxTableCapacity(maxTableCapacity);
encoder.setTableCapacity(Math.min(maxTableCapacity, configuration.getInitialEncoderTableCapacity()));
}
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
{
// Must cap the maxHeaderSize to avoid large allocations.
int maxHeadersSize = Math.min(value.intValue(), configuration.getMaxRequestHeadersSize());
encoder.setMaxHeadersSize(maxHeadersSize);
}
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
{
int maxBlockedStreams = value.intValue();
encoder.setMaxBlockedStreams(maxBlockedStreams);
}
});
}
private void failControlStream(Throwable failure)
{
long error = HTTP3ErrorCode.CLOSED_CRITICAL_STREAM_ERROR.code();

View File

@ -20,6 +20,7 @@ import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.quic.common.ProtocolSession;
@ -63,7 +64,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
if (frame.getMetaData().isResponse())
{
@ -76,10 +77,19 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
}
else
{
super.onHeaders(streamId, frame);
super.onHeaders(streamId, frame, wasBlocked);
}
}
@Override
public void onSettings(SettingsFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this);
getProtocolSession().onSettings(frame);
super.onSettings(frame);
}
@Override
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Client.Listener listener)
{
@ -147,24 +157,4 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
return GoAwayFrame.CLIENT_GRACEFUL;
return super.newGoAwayFrame(graceful);
}
@Override
protected void onSettingMaxTableCapacity(long value)
{
getProtocolSession().getQpackEncoder().setCapacity((int)value);
}
@Override
protected void onSettingMaxFieldSectionSize(long value)
{
getProtocolSession().getQpackDecoder().setMaxHeaderSize((int)value);
}
@Override
protected void onSettingMaxBlockedStreams(long value)
{
ClientHTTP3Session session = getProtocolSession();
session.getQpackDecoder().setMaxBlockedStreams((int)value);
session.getQpackEncoder().setMaxBlockedStreams((int)value);
}
}

View File

@ -17,7 +17,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
/**
* <p>A record that captures HTTP/3 configuration parameters.</p>
* <p>The HTTP/3 configuration parameters.</p>
*/
@ManagedObject
public class HTTP3Configuration
@ -27,9 +27,11 @@ public class HTTP3Configuration
private int outputBufferSize = 2048;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
private int maxBlockedStreams = 0;
private int maxRequestHeadersSize = 8192;
private int maxResponseHeadersSize = 8192;
private int maxBlockedStreams = 64;
private int maxTableCapacity = 64 * 1024;
private int initialTableCapacity = 64 * 1024;
private int maxRequestHeadersSize = 8 * 1024;
private int maxResponseHeadersSize = 8 * 1024;
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
@ -37,6 +39,13 @@ public class HTTP3Configuration
return streamIdleTimeout;
}
/**
* <p>Sets the stream idle timeout in milliseconds.</p>
* <p>Negative values and zero mean that the stream never times out.</p>
* <p>Default value is {@code 30} seconds.</p>
*
* @param streamIdleTimeout the stream idle timeout in milliseconds
*/
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
@ -48,6 +57,12 @@ public class HTTP3Configuration
return inputBufferSize;
}
/**
* <p>Sets the size of the buffer used for QUIC network reads.</p>
* <p>Default value is {@code 2048} bytes.</p>
*
* @param inputBufferSize the buffer size in bytes
*/
public void setInputBufferSize(int inputBufferSize)
{
this.inputBufferSize = inputBufferSize;
@ -59,39 +74,105 @@ public class HTTP3Configuration
return outputBufferSize;
}
/**
* <p>Sets the size of the buffer used for QUIC network writes.</p>
* <p>Default value is {@code 2048} bytes.</p>
*
* @param outputBufferSize the buffer size in bytes
*/
public void setOutputBufferSize(int outputBufferSize)
{
this.outputBufferSize = outputBufferSize;
}
@ManagedAttribute("Whether to use direct buffers for input")
@ManagedAttribute("Whether to use direct buffers for network reads")
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
/**
* <p>Sets whether to use direct buffers for QUIC network reads.</p>
* <p>Default value is {@code true}.</p>
*
* @param useInputDirectByteBuffers whether to use direct buffers for network reads
*/
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct buffers for output")
@ManagedAttribute("Whether to use direct buffers for network writes")
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
/**
* <p>Sets whether to use direct buffers for QUIC network writes.</p>
* <p>Default value is {@code true}.</p>
*
* @param useOutputDirectByteBuffers whether to use direct buffers for network writes
*/
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
@ManagedAttribute("The local QPACK max decoder dynamic table capacity")
public int getMaxDecoderTableCapacity()
{
return maxTableCapacity;
}
/**
* <p>Sets the local QPACK decoder max dynamic table capacity.</p>
* <p>The default value is {@code 65536} bytes.</p>
* <p>This value is configured on the local QPACK decoder, and then
* communicated to the remote QPACK encoder via the SETTINGS frame.</p>
*
* @param maxTableCapacity the QPACK decoder dynamic table max capacity
* @see #setInitialEncoderTableCapacity(int)
*/
public void setMaxDecoderTableCapacity(int maxTableCapacity)
{
this.maxTableCapacity = maxTableCapacity;
}
@ManagedAttribute("The local QPACK initial encoder dynamic table capacity")
public int getInitialEncoderTableCapacity()
{
return initialTableCapacity;
}
/**
* <p>Sets the local QPACK encoder initial dynamic table capacity.</p>
* <p>The default value is {@code 65536} bytes.</p>
* <p>This value is configured in the local QPACK encoder, and may be
* overwritten by a smaller value received via the SETTINGS frame.</p>
*
* @param initialTableCapacity the QPACK encoder dynamic table initial capacity
* @see #setMaxDecoderTableCapacity(int)
*/
public void setInitialEncoderTableCapacity(int initialTableCapacity)
{
this.initialTableCapacity = initialTableCapacity;
}
@ManagedAttribute("The max number of QPACK blocked streams")
public int getMaxBlockedStreams()
{
return maxBlockedStreams;
}
/**
* <p>Sets the local QPACK decoder max number of blocked streams.</p>
* <p>The default value is {@code 64}.</p>
* <p>This value is configured in the local QPACK decoder, and then
* communicated to the remote QPACK encoder via the SETTINGS frame.</p>
*
* @param maxBlockedStreams the QPACK decoder max blocked streams
*/
public void setMaxBlockedStreams(int maxBlockedStreams)
{
this.maxBlockedStreams = maxBlockedStreams;
@ -103,6 +184,17 @@ public class HTTP3Configuration
return maxRequestHeadersSize;
}
/**
* <p>Sets max request headers size.</p>
* <p>The default value is {@code 8192} bytes.</p>
* <p>This value is configured in the server-side QPACK decoder, and
* then communicated to the client-side QPACK encoder via the SETTINGS
* frame.</p>
* <p>The client-side QPACK encoder uses this value to cap, if necessary,
* the value sent by the server-side QPACK decoder.</p>
*
* @param maxRequestHeadersSize the max request headers size in bytes
*/
public void setMaxRequestHeadersSize(int maxRequestHeadersSize)
{
this.maxRequestHeadersSize = maxRequestHeadersSize;
@ -114,6 +206,17 @@ public class HTTP3Configuration
return maxResponseHeadersSize;
}
/**
* <p>Sets max response headers size.</p>
* <p>The default value is {@code 8192} bytes.</p>
* <p>This value is configured in the client-side QPACK decoder, and
* then communicated to the server-side QPACK encoder via the SETTINGS
* frame.</p>
* <p>The server-side QPACK encoder uses this value to cap, if necessary,
* the value sent by the client-side QPACK decoder.</p>
*
* @param maxResponseHeadersSize the max response headers size
*/
public void setMaxResponseHeadersSize(int maxResponseHeadersSize)
{
this.maxResponseHeadersSize = maxResponseHeadersSize;

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
@ -28,9 +29,9 @@ public class DecoderStreamConnection extends InstructionStreamConnection
private final QpackEncoder encoder;
public DecoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder)
public DecoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder, ParserListener listener)
{
super(endPoint, executor, byteBufferPool);
super(endPoint, executor, byteBufferPool, listener);
this.encoder = encoder;
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
@ -28,9 +29,9 @@ public class EncoderStreamConnection extends InstructionStreamConnection
private final QpackDecoder decoder;
public EncoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder)
public EncoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder, ParserListener listener)
{
super(endPoint, executor, byteBufferPool);
super(endPoint, executor, byteBufferPool, listener);
this.decoder = decoder;
}

View File

@ -125,7 +125,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
boolean failStreams = false;
boolean sendGoAway = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
@ -231,7 +231,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public CompletableFuture<Void> shutdown()
{
CompletableFuture<Void> result;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (shutdown != null)
return shutdown;
@ -287,7 +287,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
private HTTP3Stream newHTTP3Stream(QuicStreamEndPoint endPoint, Consumer<Throwable> fail, boolean local)
{
Throwable failure = null;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (closeState == CloseState.NOT_CLOSED)
streamCount.incrementAndGet();
@ -349,7 +349,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
{
Map<Long, Long> settings = notifyPreface();
if (LOG.isDebugEnabled())
LOG.debug("produced settings {} on {}", settings, this);
LOG.debug("application produced settings {} on {}", settings, this);
return settings;
}
@ -369,34 +369,9 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
@Override
public void onSettings(SettingsFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this);
frame.getSettings().forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
onSettingMaxTableCapacity(value);
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
onSettingMaxFieldSectionSize(value);
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
onSettingMaxBlockedStreams(value);
});
notifySettings(frame);
}
protected void onSettingMaxTableCapacity(long value)
{
}
protected void onSettingMaxFieldSectionSize(long value)
{
}
protected void onSettingMaxBlockedStreams(long value)
{
}
private void notifySettings(SettingsFrame frame)
{
try
@ -435,7 +410,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
@ -480,7 +455,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
LOG.debug("received {} on {}", frame, this);
boolean failStreams = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
@ -591,7 +566,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public boolean onIdleTimeout()
{
boolean notify = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
@ -647,7 +622,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public void inwardClose(long error, String reason)
{
GoAwayFrame goAwayFrame = null;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
@ -751,7 +726,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
streamTimeouts.destroy();
// Notify the shutdown completable.
CompletableFuture<Void> shutdown;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
shutdown = this.shutdown;
}
@ -762,7 +737,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
private void tryRunZeroStreamsAction()
{
Runnable action = null;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
long count = streamCount.get();
if (count > 0)
@ -835,7 +810,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
// A close at the QUIC level does not allow any
// data to be sent, update the state and notify.
boolean notifyFailure;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
notifyFailure = closeState == CloseState.NOT_CLOSED;
closeState = CloseState.CLOSED;

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
@ -41,17 +42,16 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private static final ByteBuffer EMPTY_DATA_FRAME = ByteBuffer.allocate(2);
private final AutoLock lock = new AutoLock();
private final AtomicReference<Runnable> event = new AtomicReference<>();
private final RetainableByteBufferPool buffers;
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
private RetainableByteBuffer buffer;
private boolean applicationMode;
private boolean parserDataMode;
private boolean dataDemand;
private boolean dataStalled;
private DataFrame dataFrame;
private boolean dataLast;
private boolean noData;
private boolean hasNetworkData;
private boolean remotelyClosed;
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
@ -78,11 +78,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
public void setApplicationMode(boolean mode)
{
this.applicationMode = mode;
}
@Override
public void onOpen()
{
@ -108,8 +103,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("processing parserDataMode={} on {}", parserDataMode, this);
if (parserDataMode)
LOG.debug("processing dataMode={} on {}", parser.isDataMode(), this);
if (parser.isDataMode())
processDataFrames();
else
processNonDataFrames();
@ -118,7 +113,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private void processDataFrames()
{
processDataDemand();
if (!parserDataMode)
if (!parser.isDataMode())
{
if (hasBuffer() && buffer.hasRemaining())
processNonDataFrames();
@ -133,50 +128,51 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{
tryAcquireBuffer();
while (true)
MessageParser.Result result = parseAndFill(true);
if (result == MessageParser.Result.NO_FRAME)
{
if (parseAndFill(true) == MessageParser.Result.NO_FRAME)
{
tryReleaseBuffer(false);
return;
}
tryReleaseBuffer(false);
return;
}
// TODO: we should also exit if the connection was closed due to errors.
// There is not yet a isClosed() primitive though.
if (remotelyClosed)
{
// We have detected the end of the stream,
// do not loop around to fill & parse again.
// However, the last frame may have
// caused a write that we need to flush.
getEndPoint().getQuicSession().flush();
tryReleaseBuffer(false);
return;
}
if (result == MessageParser.Result.BLOCKED_FRAME)
{
// Return immediately because another thread may
// resume the processing as the stream is unblocked.
tryReleaseBuffer(false);
return;
}
if (parserDataMode)
{
if (buffer.hasRemaining())
{
processDataFrames();
}
else
{
if (applicationMode)
{
if (LOG.isDebugEnabled())
LOG.debug("skipping fill interest on {}", this);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("setting fill interest on {}", this);
fillInterested();
}
tryReleaseBuffer(false);
}
return;
}
Runnable action = event.getAndSet(null);
if (action == null)
throw new IllegalStateException();
action.run();
// TODO: we should also exit if the connection was closed due to errors.
// This can be done by overriding relevant methods in MessageListener.
if (remotelyClosed)
{
// We have detected the end of the stream, do not try to fill & parse again.
// However, the last frame may have caused a write that needs to be flushed.
getEndPoint().getQuicSession().flush();
tryReleaseBuffer(false);
return;
}
if (!parser.isDataMode())
throw new IllegalStateException();
if (hasBuffer() && buffer.hasRemaining())
{
processDataFrames();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("setting fill interest on {}", this);
tryReleaseBuffer(false);
fillInterested();
}
}
catch (Throwable x)
@ -204,7 +200,12 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{
case FRAME:
{
if (parserDataMode)
Runnable action = event.getAndSet(null);
if (action == null)
throw new IllegalStateException();
action.run();
if (parser.isDataMode())
{
DataFrame frame = dataFrame;
dataFrame = null;
@ -225,12 +226,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
return null;
}
}
case MODE_SWITCH:
case SWITCH_MODE:
{
if (LOG.isDebugEnabled())
LOG.debug("switching to parserDataMode=false on {}", this);
LOG.debug("switching to dataMode=false on {}", this);
dataLast = true;
parserDataMode = false;
parser.setDataMode(false);
tryReleaseBuffer(false);
return null;
@ -269,9 +269,9 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{
boolean hasData;
boolean process = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
hasData = !noData;
hasData = hasNetworkData;
dataDemand = true;
if (dataStalled && hasData)
{
@ -289,7 +289,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public boolean hasDemand()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return dataDemand;
}
@ -297,7 +297,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private void cancelDemand()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
dataDemand = false;
}
@ -305,17 +305,17 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private boolean isStalled()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return dataStalled;
}
}
private void setNoData(boolean noData)
private void setHasNetworkData(boolean noData)
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
this.noData = noData;
this.hasNetworkData = noData;
}
}
@ -324,7 +324,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
while (true)
{
boolean process = true;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("processing demand={}, last={} fillInterested={} on {}", dataDemand, dataLast, isFillInterested(), this);
@ -389,7 +389,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (LOG.isDebugEnabled())
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, buffer);
setNoData(false);
setHasNetworkData(true);
while (true)
{
@ -397,7 +397,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
MessageParser.Result result = parser.parse(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} on {} with buffer {}", result, this, buffer);
if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH)
if (result != MessageParser.Result.NO_FRAME)
return result;
if (buffer.isRetained())
@ -430,7 +430,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
return MessageParser.Result.FRAME;
}
setNoData(true);
setHasNetworkData(false);
if (setFillInterest)
fillInterested();
}
@ -458,10 +458,74 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
private void processHeaders(HeadersFrame frame, boolean wasBlocked, Runnable delegate)
{
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
// Expect DATA frames now.
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=true for request {} on {}", metaData, this);
}
else if (metaData.isResponse())
{
MetaData.Response response = (MetaData.Response)metaData;
if (HttpStatus.isInformational(response.getStatus()))
{
if (LOG.isDebugEnabled())
LOG.debug("staying in dataMode=false for response {} on {}", metaData, this);
}
else
{
// Expect DATA frames now.
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=true for response {} on {}", metaData, this);
}
}
else
{
// Trailer.
if (!frame.isLast())
frame = new HeadersFrame(metaData, true);
}
if (frame.isLast())
shutdownInput();
delegate.run();
if (wasBlocked)
onFillable();
}
private void processData(DataFrame frame, Runnable delegate)
{
if (dataFrame != null)
throw new IllegalStateException();
dataFrame = frame;
if (frame.isLast())
{
dataLast = true;
shutdownInput();
}
delegate.run();
}
private void shutdownInput()
{
remotelyClosed = true;
// We want to shutdown the input to avoid "spurious" wakeups where
// zero bytes could be spuriously read from the EndPoint after the
// stream is remotely closed by receiving a frame with last=true.
getEndPoint().shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
}
@Override
public String toConnectionString()
{
return String.format("%s[demand=%b,stalled=%b,parserDataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), parserDataMode);
return String.format("%s[demand=%b,stalled=%b,dataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), parser.isDataMode());
}
private class MessageListener extends ParserListener.Wrapper
@ -472,66 +536,26 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
// Expect DATA frames now.
parserDataMode = true;
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to parserDataMode=true for request {} on {}", metaData, this);
}
else if (metaData.isResponse())
{
MetaData.Response response = (MetaData.Response)metaData;
if (HttpStatus.isInformational(response.getStatus()))
{
if (LOG.isDebugEnabled())
LOG.debug("staying in parserDataMode=false for response {} on {}", metaData, this);
}
else
{
// Expect DATA frames now.
parserDataMode = true;
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to parserDataMode=true for response {} on {}", metaData, this);
}
}
else
{
// Trailer.
if (!frame.isLast())
frame = new HeadersFrame(metaData, true);
}
if (frame.isLast())
shutdownInput();
super.onHeaders(streamId, frame);
if (LOG.isDebugEnabled())
LOG.debug("received {}#{} wasBlocked={}", frame, streamId, wasBlocked);
Runnable delegate = () -> super.onHeaders(streamId, frame, wasBlocked);
Runnable action = () -> processHeaders(frame, wasBlocked, delegate);
if (wasBlocked)
action.run();
else if (!event.compareAndSet(null, action))
throw new IllegalStateException();
}
@Override
public void onData(long streamId, DataFrame frame)
{
if (dataFrame != null)
if (LOG.isDebugEnabled())
LOG.debug("received {}#{}", frame, streamId);
Runnable delegate = () -> super.onData(streamId, frame);
if (!event.compareAndSet(null, () -> processData(frame, delegate)))
throw new IllegalStateException();
dataFrame = frame;
if (frame.isLast())
{
dataLast = true;
shutdownInput();
}
super.onData(streamId, frame);
}
private void shutdownInput()
{
remotelyClosed = true;
// We want to shutdown the input to avoid "spurious" wakeups where
// zero bytes could be spuriously read from the EndPoint after the
// stream is remotely closed by receiving a frame with last=true.
getEndPoint().shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
}
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.internal;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@ -29,13 +30,15 @@ public abstract class InstructionStreamConnection extends AbstractConnection imp
{
private static final Logger LOG = LoggerFactory.getLogger(InstructionStreamConnection.class);
private final ByteBufferPool byteBufferPool;
private final ParserListener listener;
private boolean useInputDirectByteBuffers = true;
private ByteBuffer buffer;
public InstructionStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool)
public InstructionStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ParserListener listener)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.listener = listener;
}
public boolean isUseInputDirectByteBuffers()
@ -102,13 +105,34 @@ public abstract class InstructionStreamConnection extends AbstractConnection imp
}
}
}
catch (QpackException.SessionException x)
{
fail(x.getErrorCode(), x.getMessage(), x);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process decoder stream {}", getEndPoint(), x);
byteBufferPool.release(buffer);
buffer = null;
getEndPoint().close(x);
fail(HTTP3ErrorCode.INTERNAL_ERROR.code(), "internal_error", x);
}
}
private void fail(long errorCode, String message, Throwable failure)
{
byteBufferPool.release(buffer);
buffer = null;
if (LOG.isDebugEnabled())
LOG.debug("could not process instruction stream {}", getEndPoint(), failure);
notifySessionFailure(errorCode, message, failure);
}
protected void notifySessionFailure(long error, String reason, Throwable failure)
{
try
{
listener.onSessionFailure(error, reason, failure);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}

View File

@ -39,10 +39,10 @@ public class MessageFlusher extends IteratingCallback
private final MessageGenerator generator;
private Entry entry;
public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, boolean useDirectByteBuffers)
{
this.lease = new ByteBufferPool.Lease(byteBufferPool);
this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers);
this.generator = new MessageGenerator(encoder, useDirectByteBuffers);
}
public boolean offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback)

View File

@ -143,7 +143,7 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
}
else if (streamType == EncoderStreamConnection.STREAM_TYPE)
{
EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder);
EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder, listener);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
@ -152,7 +152,7 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
}
else if (streamType == DecoderStreamConnection.STREAM_TYPE)
{
DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder);
DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder, listener);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())

View File

@ -27,13 +27,11 @@ import org.eclipse.jetty.io.ByteBufferPool;
public class HeadersGenerator extends FrameGenerator
{
private final QpackEncoder encoder;
private final int maxLength;
private final boolean useDirectByteBuffers;
public HeadersGenerator(QpackEncoder encoder, int maxLength, boolean useDirectByteBuffers)
public HeadersGenerator(QpackEncoder encoder, boolean useDirectByteBuffers)
{
this.encoder = encoder;
this.maxLength = maxLength;
this.useDirectByteBuffers = useDirectByteBuffers;
}
@ -52,6 +50,7 @@ public class HeadersGenerator extends FrameGenerator
int frameTypeLength = VarLenInt.length(FrameType.HEADERS.type());
int maxHeaderLength = frameTypeLength + VarLenInt.MAX_LENGTH;
// The capacity of the buffer is larger than maxLength, but we need to enforce at most maxLength.
int maxLength = encoder.getMaxHeadersSize();
ByteBuffer buffer = lease.acquire(maxHeaderLength + maxLength, useDirectByteBuffers);
buffer.position(maxHeaderLength);
buffer.limit(buffer.position() + maxLength);

View File

@ -24,10 +24,10 @@ public class MessageGenerator
{
private final FrameGenerator[] generators = new FrameGenerator[FrameType.maxType() + 1];
public MessageGenerator(QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
public MessageGenerator(QpackEncoder encoder, boolean useDirectByteBuffers)
{
generators[FrameType.DATA.type()] = new DataGenerator(useDirectByteBuffers);
generators[FrameType.HEADERS.type()] = new HeadersGenerator(encoder, maxHeadersLength, useDirectByteBuffers);
generators[FrameType.HEADERS.type()] = new HeadersGenerator(encoder, useDirectByteBuffers);
generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator();
}

View File

@ -125,6 +125,6 @@ public abstract class BodyParser
public enum Result
{
NO_FRAME, FRAGMENT_FRAME, WHOLE_FRAME
NO_FRAME, BLOCKED_FRAME, FRAGMENT_FRAME, WHOLE_FRAME
}
}

View File

@ -105,7 +105,7 @@ public class HeadersBodyParser extends BodyParser
// needs to be parsed, then it's not the last frame.
boolean last = isLast.getAsBoolean() && !buffer.hasRemaining();
return decode(encoded, last) ? Result.WHOLE_FRAME : Result.NO_FRAME;
return decode(encoded, last) ? Result.WHOLE_FRAME : Result.BLOCKED_FRAME;
}
}
default:
@ -121,7 +121,7 @@ public class HeadersBodyParser extends BodyParser
{
try
{
return decoder.decode(streamId, encoded, (streamId, metaData) -> onHeaders(metaData, last));
return decoder.decode(streamId, encoded, (streamId, metaData, wasBlocked) -> onHeaders(metaData, last, wasBlocked));
}
catch (QpackException.StreamException x)
{
@ -144,19 +144,19 @@ public class HeadersBodyParser extends BodyParser
return false;
}
private void onHeaders(MetaData metaData, boolean last)
private void onHeaders(MetaData metaData, boolean last, boolean wasBlocked)
{
HeadersFrame frame = new HeadersFrame(metaData, last);
reset();
notifyHeaders(frame);
notifyHeaders(frame, wasBlocked);
}
protected void notifyHeaders(HeadersFrame frame)
protected void notifyHeaders(HeadersFrame frame, boolean wasBlocked)
{
ParserListener listener = getParserListener();
try
{
listener.onHeaders(streamId, frame);
listener.onHeaders(streamId, frame, wasBlocked);
}
catch (Throwable x)
{

View File

@ -72,6 +72,11 @@ public class MessageParser
return listener;
}
public boolean isDataMode()
{
return dataMode;
}
public void setDataMode(boolean enable)
{
this.dataMode = enable;
@ -100,7 +105,7 @@ public class MessageParser
state = State.BODY;
// If we are in data mode, but we did not parse a DATA frame, bail out.
if (dataMode && headerParser.getFrameType() != FrameType.DATA.type())
return Result.MODE_SWITCH;
return Result.SWITCH_MODE;
break;
}
return Result.NO_FRAME;
@ -143,18 +148,32 @@ public class MessageParser
if (LOG.isDebugEnabled())
LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
reset();
return Result.FRAME;
}
else
{
BodyParser.Result result = bodyParser.parse(buffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} {} body from {}", result, FrameType.from(frameType), BufferUtil.toDetailString(buffer));
// Not enough bytes, there is no frame.
if (result == BodyParser.Result.NO_FRAME)
return Result.NO_FRAME;
if (LOG.isDebugEnabled())
LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
// Do not reset() if it is a fragment frame.
if (result == BodyParser.Result.FRAGMENT_FRAME)
return Result.FRAME;
reset();
if (result == BodyParser.Result.BLOCKED_FRAME)
return Result.BLOCKED_FRAME;
if (result == BodyParser.Result.WHOLE_FRAME)
reset();
return Result.FRAME;
throw new IllegalStateException();
}
return Result.FRAME;
}
}
default:
@ -180,7 +199,23 @@ public class MessageParser
public enum Result
{
NO_FRAME, FRAME, MODE_SWITCH
/**
* Indicates that no frame was parsed, either for lack of bytes, or because or errors.
*/
NO_FRAME,
/**
* Indicates that a frame was parsed.
*/
FRAME,
/**
* Indicates that a frame was parsed but its notification was deferred.
* This is the case of HEADERS frames that are waiting to be unblocked.
*/
BLOCKED_FRAME,
/**
* Indicates that a DATA frame was expected, but a HEADERS was found instead.
*/
SWITCH_MODE
}
private enum State

View File

@ -20,7 +20,7 @@ import org.eclipse.jetty.http3.frames.SettingsFrame;
public interface ParserListener
{
public default void onHeaders(long streamId, HeadersFrame frame)
public default void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
}
@ -54,9 +54,9 @@ public interface ParserListener
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
listener.onHeaders(streamId, frame);
listener.onHeaders(streamId, frame, wasBlocked);
}
@Override

View File

@ -55,7 +55,7 @@ public class DataGenerateParseTest
DataFrame input = new DataFrame(ByteBuffer.wrap(inputBytes), true);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(null, 8192, true).generate(lease, 0, input, null);
new MessageGenerator(null, true).generate(lease, 0, input, null);
List<DataFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(new ParserListener()

View File

@ -47,16 +47,18 @@ public class HeadersGenerateParseTest
.put("Cookie", "c=d");
HeadersFrame input = new HeadersFrame(new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, fields), true);
QpackEncoder encoder = new QpackEncoder(instructions -> {}, 100);
QpackEncoder encoder = new QpackEncoder(instructions -> {});
encoder.setMaxHeadersSize(4 * 1024);
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool());
new MessageGenerator(encoder, 8192, true).generate(lease, 0, input, null);
new MessageGenerator(encoder, true).generate(lease, 0, input, null);
QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192);
QpackDecoder decoder = new QpackDecoder(instructions -> {});
decoder.setMaxHeadersSize(4 * 1024);
List<HeadersFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(new ParserListener()
{
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
frames.add(frame);
}

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.http3.qpack.QpackException.H3_GENERAL_PROTOCOL_ERROR;
import static org.eclipse.jetty.http3.qpack.QpackException.QPACK_DECOMPRESSION_FAILED;
import static org.eclipse.jetty.http3.qpack.QpackException.QPACK_ENCODER_STREAM_ERROR;
@ -55,8 +56,9 @@ public class QpackDecoder implements Dumpable
private final NBitIntegerDecoder _integerDecoder = new NBitIntegerDecoder();
private final InstructionHandler _instructionHandler = new InstructionHandler();
private final Map<Long, AtomicInteger> _blockedStreams = new HashMap<>();
private int _maxHeaderSize;
private int _maxHeadersSize;
private int _maxBlockedStreams;
private int _maxTableCapacity;
private static class MetaDataNotification
{
@ -71,21 +73,24 @@ public class QpackDecoder implements Dumpable
_handler = handler;
}
public void notifyHandler()
public void notifyHandler(boolean wasBlocked)
{
_handler.onMetaData(_streamId, _metaData);
_handler.onMetaData(_streamId, _metaData, wasBlocked);
}
}
/**
* @param maxHeaderSize The maximum allowed size of a headers block, expressed as total of all name and value characters, plus 32 per field
*/
public QpackDecoder(Instruction.Handler handler, int maxHeaderSize)
public QpackDecoder(Instruction.Handler handler)
{
_context = new QpackContext();
_handler = handler;
_parser = new DecoderInstructionParser(_instructionHandler);
_maxHeaderSize = maxHeaderSize;
}
@Deprecated
public QpackDecoder(Instruction.Handler handler, int maxHeaderSize)
{
this(handler);
setMaxHeadersSize(maxHeaderSize);
}
QpackContext getQpackContext()
@ -93,14 +98,17 @@ public class QpackDecoder implements Dumpable
return _context;
}
public int getMaxHeaderSize()
public int getMaxHeadersSize()
{
return _maxHeaderSize;
return _maxHeadersSize;
}
public void setMaxHeaderSize(int maxHeaderSize)
/**
* @param maxHeadersSize The maximum allowed size of a headers block, expressed as total of all name and value characters, plus 32 per field
*/
public void setMaxHeadersSize(int maxHeadersSize)
{
_maxHeaderSize = maxHeaderSize;
_maxHeadersSize = maxHeadersSize;
}
public int getMaxBlockedStreams()
@ -113,9 +121,19 @@ public class QpackDecoder implements Dumpable
_maxBlockedStreams = maxBlockedStreams;
}
public int getMaxTableCapacity()
{
return _maxTableCapacity;
}
public void setMaxTableCapacity(int maxTableCapacity)
{
_maxTableCapacity = maxTableCapacity;
}
public interface Handler
{
void onMetaData(long streamId, MetaData metadata);
void onMetaData(long streamId, MetaData metadata, boolean wasBlocked);
}
/**
@ -137,8 +155,8 @@ public class QpackDecoder implements Dumpable
// If the buffer is big, don't even think about decoding it
// Huffman may double the size, but it will only be a temporary allocation until detected in MetaDataBuilder.emit().
int maxHeaderSize = getMaxHeaderSize();
if (buffer.remaining() > maxHeaderSize)
int maxHeaderSize = getMaxHeadersSize();
if (maxHeaderSize > 0 && buffer.remaining() > maxHeaderSize)
throw new QpackException.SessionException(QPACK_DECOMPRESSION_FAILED, "header_too_large");
_integerDecoder.setPrefix(8);
@ -155,7 +173,7 @@ public class QpackDecoder implements Dumpable
// Decode the Required Insert Count using the DynamicTable state.
DynamicTable dynamicTable = _context.getDynamicTable();
int insertCount = dynamicTable.getInsertCount();
int maxDynamicTableSize = dynamicTable.getCapacity();
int maxDynamicTableSize = getMaxTableCapacity();
int requiredInsertCount = decodeInsertCount(encodedInsertCount, insertCount, maxDynamicTableSize);
try
@ -177,7 +195,7 @@ public class QpackDecoder implements Dumpable
else
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred Decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection);
LOG.debug("Deferred decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection);
AtomicInteger blockedFields = _blockedStreams.computeIfAbsent(streamId, id -> new AtomicInteger(0));
blockedFields.incrementAndGet();
if (_blockedStreams.size() > _maxBlockedStreams)
@ -187,7 +205,7 @@ public class QpackDecoder implements Dumpable
boolean hadMetaData = !_metaDataNotifications.isEmpty();
notifyInstructionHandler();
notifyMetaDataHandler();
notifyMetaDataHandler(false);
return hadMetaData;
}
catch (QpackException.SessionException e)
@ -205,10 +223,13 @@ public class QpackDecoder implements Dumpable
* the Encoder to the Decoder. This method will fully consume the supplied {@link ByteBuffer} and produce instructions
* to update the state of the Decoder and its Dynamic Table.
* @param buffer a buffer containing bytes from the Encoder stream.
* @throws QpackException if there was an error parsing or handling the instructions.
* @throws QpackException.SessionException if there was an error parsing or handling the instructions.
*/
public void parseInstructions(ByteBuffer buffer) throws QpackException
public void parseInstructions(ByteBuffer buffer) throws QpackException.SessionException
{
if (LOG.isDebugEnabled())
LOG.debug("Parsing Instructions {}", BufferUtil.toDetailString(buffer));
try
{
while (BufferUtil.hasContent(buffer))
@ -216,7 +237,7 @@ public class QpackDecoder implements Dumpable
_parser.parse(buffer);
}
notifyInstructionHandler();
notifyMetaDataHandler();
notifyMetaDataHandler(true);
}
catch (QpackException.SessionException e)
{
@ -254,7 +275,7 @@ public class QpackDecoder implements Dumpable
{
iterator.remove();
long streamId = encodedFieldSection.getStreamId();
MetaData metaData = encodedFieldSection.decode(_context, _maxHeaderSize);
MetaData metaData = encodedFieldSection.decode(_context, getMaxHeadersSize());
if (_blockedStreams.get(streamId).decrementAndGet() <= 0)
_blockedStreams.remove(streamId);
if (LOG.isDebugEnabled())
@ -316,13 +337,16 @@ public class QpackDecoder implements Dumpable
_instructions.clear();
}
private void notifyMetaDataHandler()
private void notifyMetaDataHandler(boolean wasBlocked)
{
for (MetaDataNotification notification : _metaDataNotifications)
{
notification.notifyHandler();
}
// Copy the list to avoid re-entrance, where the call to
// notifyHandler() may end up calling again this method.
List<MetaDataNotification> notifications = new ArrayList<>(_metaDataNotifications);
_metaDataNotifications.clear();
for (MetaDataNotification notification : notifications)
{
notification.notifyHandler(wasBlocked);
}
}
InstructionHandler getInstructionHandler()
@ -336,8 +360,10 @@ public class QpackDecoder implements Dumpable
class InstructionHandler implements DecoderInstructionParser.Handler
{
@Override
public void onSetDynamicTableCapacity(int capacity)
public void onSetDynamicTableCapacity(int capacity) throws QpackException
{
if (capacity > getMaxTableCapacity())
throw new QpackException.StreamException(H3_GENERAL_PROTOCOL_ERROR, "DynamicTable capacity exceeds max capacity");
_context.getDynamicTable().setCapacity(capacity);
}

View File

@ -95,17 +95,25 @@ public class QpackEncoder implements Dumpable
private final Map<Long, StreamInfo> _streamInfoMap = new HashMap<>();
private final EncoderInstructionParser _parser;
private final InstructionHandler _instructionHandler = new InstructionHandler();
private int _knownInsertCount = 0;
private int _blockedStreams = 0;
private int _knownInsertCount;
private int _blockedStreams;
private int _maxHeadersSize;
private int _maxTableCapacity;
public QpackEncoder(Instruction.Handler handler, int maxBlockedStreams)
public QpackEncoder(Instruction.Handler handler)
{
_handler = handler;
_context = new QpackContext();
_maxBlockedStreams = maxBlockedStreams;
_parser = new EncoderInstructionParser(_instructionHandler);
}
@Deprecated
public QpackEncoder(Instruction.Handler handler, int maxBlockedStreams)
{
this(handler);
setMaxBlockedStreams(maxBlockedStreams);
}
Map<Long, StreamInfo> getStreamInfoMap()
{
return _streamInfoMap;
@ -121,7 +129,30 @@ public class QpackEncoder implements Dumpable
_maxBlockedStreams = maxBlockedStreams;
}
public int getCapacity()
public int getMaxHeadersSize()
{
return _maxHeadersSize;
}
public void setMaxHeadersSize(int maxHeadersSize)
{
_maxHeadersSize = maxHeadersSize;
}
public int getMaxTableCapacity()
{
return _maxTableCapacity;
}
public void setMaxTableCapacity(int maxTableCapacity)
{
_maxTableCapacity = maxTableCapacity;
int capacity = getTableCapacity();
if (capacity > maxTableCapacity)
setTableCapacity(maxTableCapacity);
}
public int getTableCapacity()
{
return _context.getDynamicTable().getCapacity();
}
@ -131,11 +162,16 @@ public class QpackEncoder implements Dumpable
*
* @param capacity the new capacity.
*/
public void setCapacity(int capacity)
public void setTableCapacity(int capacity)
{
_context.getDynamicTable().setCapacity(capacity);
_handler.onInstructions(List.of(new SetCapacityInstruction(capacity)));
notifyInstructionHandler();
try (AutoLock ignored = lock.lock())
{
if (capacity > getMaxTableCapacity())
throw new IllegalArgumentException("DynamicTable capacity exceeds max capacity");
_context.getDynamicTable().setCapacity(capacity);
_handler.onInstructions(List.of(new SetCapacityInstruction(capacity)));
notifyInstructionHandler();
}
}
/**
@ -151,7 +187,7 @@ public class QpackEncoder implements Dumpable
*/
public void encode(ByteBuffer buffer, long streamId, MetaData metadata) throws QpackException
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("Encoding: streamId={}, metadata={}", streamId, metadata);
@ -159,6 +195,9 @@ public class QpackEncoder implements Dumpable
// Verify that we can encode without errors.
if (metadata.getFields() != null)
{
// TODO: enforce that the length of the header is less than maxHeadersSize.
// See RFC 9114, section 4.2.2.
for (HttpField field : metadata.getFields())
{
String name = field.getName();
@ -252,7 +291,7 @@ public class QpackEncoder implements Dumpable
*/
public void parseInstructions(ByteBuffer buffer) throws QpackException
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
while (BufferUtil.hasContent(buffer))
{
@ -279,43 +318,46 @@ public class QpackEncoder implements Dumpable
*/
public boolean insert(HttpField field)
{
DynamicTable dynamicTable = _context.getDynamicTable();
if (field.getValue() == null)
field = new HttpField(field.getHeader(), field.getName(), "");
// If we should not index this entry or there is no room to insert it, then just return false.
boolean canCreateEntry = shouldIndex(field) && dynamicTable.canInsert(field);
if (!canCreateEntry)
return false;
// Can we insert by duplicating an existing entry?
Entry entry = _context.get(field);
if (entry != null)
try (AutoLock ignored = lock.lock())
{
int index = _context.indexOf(entry);
DynamicTable dynamicTable = _context.getDynamicTable();
if (field.getValue() == null)
field = new HttpField(field.getHeader(), field.getName(), "");
// If we should not index this entry or there is no room to insert it, then just return false.
boolean canCreateEntry = shouldIndex(field) && dynamicTable.canInsert(field);
if (!canCreateEntry)
return false;
// Can we insert by duplicating an existing entry?
Entry entry = _context.get(field);
if (entry != null)
{
int index = _context.indexOf(entry);
dynamicTable.add(new Entry(field));
_instructions.add(new DuplicateInstruction(index));
notifyInstructionHandler();
return true;
}
// Can we insert by referencing a name?
boolean huffman = shouldHuffmanEncode(field);
Entry nameEntry = _context.get(field.getName());
if (nameEntry != null)
{
int index = _context.indexOf(nameEntry);
dynamicTable.add(new Entry(field));
_instructions.add(new IndexedNameEntryInstruction(!nameEntry.isStatic(), index, huffman, field.getValue()));
notifyInstructionHandler();
return true;
}
// Add the entry without referencing an existing entry.
dynamicTable.add(new Entry(field));
_instructions.add(new DuplicateInstruction(index));
_instructions.add(new LiteralNameEntryInstruction(field, huffman));
notifyInstructionHandler();
return true;
}
// Can we insert by referencing a name?
boolean huffman = shouldHuffmanEncode(field);
Entry nameEntry = _context.get(field.getName());
if (nameEntry != null)
{
int index = _context.indexOf(nameEntry);
dynamicTable.add(new Entry(field));
_instructions.add(new IndexedNameEntryInstruction(!nameEntry.isStatic(), index, huffman, field.getValue()));
notifyInstructionHandler();
return true;
}
// Add the entry without referencing an existing entry.
dynamicTable.add(new Entry(field));
_instructions.add(new LiteralNameEntryInstruction(field, huffman));
notifyInstructionHandler();
return true;
}
/**
@ -327,8 +369,11 @@ public class QpackEncoder implements Dumpable
*/
public void streamCancellation(long streamId)
{
_instructionHandler.onStreamCancellation(streamId);
notifyInstructionHandler();
try (AutoLock ignored = lock.lock())
{
_instructionHandler.onStreamCancellation(streamId);
notifyInstructionHandler();
}
}
protected boolean shouldIndex(HttpField httpField)

View File

@ -48,6 +48,6 @@ public class DuplicateInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[index=%d]", getClass().getSimpleName(), hashCode(), getIndex());
}
}

View File

@ -83,6 +83,6 @@ public class IndexedNameEntryInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[index=%d,name=%s]", getClass().getSimpleName(), hashCode(), getIndex(), getValue());
}
}

View File

@ -48,6 +48,6 @@ public class InsertCountIncrementInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[increment=%d]", getClass().getSimpleName(), hashCode(), getIncrement());
}
}

View File

@ -93,6 +93,6 @@ public class LiteralNameEntryInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[name=%s,value=%s]", getClass().getSimpleName(), hashCode(), getName(), getValue());
}
}

View File

@ -48,6 +48,6 @@ public class SectionAcknowledgmentInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[stream=%d]", getClass().getSimpleName(), hashCode(), _streamId);
}
}

View File

@ -48,6 +48,6 @@ public class SetCapacityInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[capacity=%d]", getClass().getSimpleName(), hashCode(), getCapacity());
}
}

View File

@ -43,6 +43,6 @@ public class StreamCancellationInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[stream=%d]", getClass().getSimpleName(), hashCode(), _streamId);
}
}

View File

@ -78,7 +78,7 @@ public class MetaDataBuilder
String value = field.getValue();
int fieldSize = name.length() + (value == null ? 0 : value.length());
_size += fieldSize + 32;
if (_size > _maxSize)
if (_maxSize > 0 && _size > _maxSize)
throw new QpackException.SessionException(QpackException.QPACK_DECOMPRESSION_FAILED, String.format("Header size %d > %d", _size, _maxSize));
if (field instanceof StaticTableHttpField)

View File

@ -67,7 +67,10 @@ public class BlockedStreamsTest
// Set capacity of the encoder & decoder to allow entries to be added to the table.
int capacity = 1024;
_encoder.setCapacity(capacity);
_encoder.setMaxTableCapacity(capacity);
_encoder.setTableCapacity(capacity);
_decoder.setMaxTableCapacity(capacity);
Instruction instruction = _encoderHandler.getInstruction();
assertThat(instruction, instanceOf(SetCapacityInstruction.class));
_decoder.parseInstructions(QpackTestUtil.toBuffer(instruction));
@ -178,7 +181,10 @@ public class BlockedStreamsTest
// Set capacity of the encoder & decoder to allow entries to be added to the table.
int capacity = 1024;
_encoder.setCapacity(capacity);
_encoder.setMaxTableCapacity(capacity);
_encoder.setTableCapacity(capacity);
_decoder.setMaxTableCapacity(capacity);
Instruction instruction = _encoderHandler.getInstruction();
assertThat(instruction, instanceOf(SetCapacityInstruction.class));
_decoder.parseInstructions(QpackTestUtil.toBuffer(instruction));

View File

@ -96,7 +96,7 @@ public class EncodeDecodeTest
// B.2. Dynamic Table.
// Set capacity to 220.
_encoder.setCapacity(220);
_encoder.setTableCapacity(220);
Instruction instruction = _encoderHandler.getInstruction();
assertThat(instruction, instanceOf(SetCapacityInstruction.class));
assertThat(((SetCapacityInstruction)instruction).getCapacity(), is(220));

View File

@ -34,14 +34,14 @@ public class EvictionTest
private final TestEncoderHandler _encoderHandler = new TestEncoderHandler();
private final Random random = new Random();
private static final int MAX_BLOCKED_STREAMS = 5;
private static final int MAX_HEADER_SIZE = 1024;
@BeforeEach
public void before()
{
_decoder = new QpackDecoder(_decoderHandler, MAX_HEADER_SIZE);
_encoder = new QpackEncoder(_encoderHandler, MAX_BLOCKED_STREAMS)
_decoder = new QpackDecoder(_decoderHandler);
_decoder.setMaxHeadersSize(1024);
_decoder.setMaxTableCapacity(4 * 1024);
_encoder = new QpackEncoder(_encoderHandler)
{
@Override
protected boolean shouldHuffmanEncode(HttpField httpField)
@ -49,12 +49,14 @@ public class EvictionTest
return false;
}
};
_encoder.setMaxTableCapacity(4 * 1024);
_encoder.setTableCapacity(4 * 1024);
_encoder.setMaxBlockedStreams(5);
}
@Test
public void test() throws Exception
{
_encoder.setCapacity(1024);
ByteBuffer encodedFields = ByteBuffer.allocate(1024);
for (int i = 0; i < 10000; i++)

View File

@ -25,7 +25,7 @@ public class TestDecoderHandler implements QpackDecoder.Handler, Instruction.Han
private final LinkedList<Instruction> _instructionList = new LinkedList<>();
@Override
public void onMetaData(long streamId, MetaData metadata)
public void onMetaData(long streamId, MetaData metadata, boolean wasBlocked)
{
_metadataList.add(metadata);
}

View File

@ -17,6 +17,7 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
@ -27,7 +28,7 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP3SessionServer.class);
public HTTP3SessionServer(ServerHTTP3Session session, Session.Server.Listener listener)
public HTTP3SessionServer(ServerHTTP3Session session, Server.Listener listener)
{
super(session, listener);
}
@ -58,7 +59,7 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
if (frame.getMetaData().isRequest())
{
@ -71,10 +72,19 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
}
else
{
super.onHeaders(streamId, frame);
super.onHeaders(streamId, frame, wasBlocked);
}
}
@Override
public void onSettings(SettingsFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this);
getProtocolSession().onSettings(frame);
super.onSettings(frame);
}
@Override
public void writeControlFrame(Frame frame, Callback callback)
{
@ -95,26 +105,6 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
return super.newGoAwayFrame(graceful);
}
@Override
protected void onSettingMaxTableCapacity(long value)
{
getProtocolSession().getQpackEncoder().setCapacity((int)value);
}
@Override
protected void onSettingMaxFieldSectionSize(long value)
{
getProtocolSession().getQpackDecoder().setMaxHeaderSize((int)value);
}
@Override
protected void onSettingMaxBlockedStreams(long value)
{
ServerHTTP3Session session = getProtocolSession();
session.getQpackDecoder().setMaxBlockedStreams((int)value);
session.getQpackEncoder().setMaxBlockedStreams((int)value);
}
private void notifyAccept()
{
Server.Listener listener = getListener();

View File

@ -46,16 +46,14 @@ public class HttpChannelOverHTTP3 extends HttpChannel
private final AutoLock lock = new AutoLock();
private final HTTP3Stream stream;
private final ServerHTTP3StreamConnection connection;
private HttpInput.Content content;
private boolean expect100Continue;
private boolean delayedUntilContent;
public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP3 transport, HTTP3Stream stream, ServerHTTP3StreamConnection connection)
public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP3 transport, HTTP3Stream stream)
{
super(connector, configuration, endPoint, transport);
this.stream = stream;
this.connection = connection;
}
@Override
@ -142,8 +140,6 @@ public class HttpChannelOverHTTP3 extends HttpChannel
// demand for content, so when it arrives we can dispatch.
if (delayedUntilContent)
stream.demand();
else
connection.setApplicationMode(true);
}
if (LOG.isDebugEnabled())
@ -195,9 +191,6 @@ public class HttpChannelOverHTTP3 extends HttpChannel
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
if (wasDelayed)
connection.setApplicationMode(true);
return wasDelayed || woken ? this : null;
}
@ -222,9 +215,6 @@ public class HttpChannelOverHTTP3 extends HttpChannel
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
if (wasDelayed)
connection.setApplicationMode(true);
return wasDelayed || handle ? this : null;
}
@ -233,9 +223,6 @@ public class HttpChannelOverHTTP3 extends HttpChannel
boolean wasDelayed = delayedUntilContent;
delayedUntilContent = false;
if (wasDelayed)
connection.setApplicationMode(true);
getHttpTransport().onIdleTimeout(failure);
boolean neverDispatched = getState().isIdle();

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.server.internal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -43,6 +44,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ServerHTTP3Session.class);
private final HTTP3Configuration configuration;
private final HTTP3SessionServer session;
private final QpackEncoder encoder;
private final QpackDecoder decoder;
@ -52,7 +54,8 @@ public class ServerHTTP3Session extends ServerProtocolSession
public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession quicSession, Session.Server.Listener listener)
{
super(quicSession);
this.session = new HTTP3SessionServer(this, listener);
this.configuration = configuration;
session = new HTTP3SessionServer(this, listener);
addBean(session);
session.setStreamIdleTimeout(configuration.getStreamIdleTimeout());
@ -62,7 +65,8 @@ public class ServerHTTP3Session extends ServerProtocolSession
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = openInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams());
encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher));
encoder.setMaxHeadersSize(configuration.getMaxResponseHeadersSize());
addBean(encoder);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
@ -70,19 +74,19 @@ public class ServerHTTP3Session extends ServerProtocolSession
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = openInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxRequestHeadersSize());
decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher));
addBean(decoder);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = openControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, configuration.isUseOutputDirectByteBuffers());
controlFlusher = new ControlFlusher(quicSession, controlEndPoint, configuration.isUseOutputDirectByteBuffers());
addBean(controlFlusher);
if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers());
messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.isUseOutputDirectByteBuffers());
addBean(messageFlusher);
}
@ -104,16 +108,86 @@ public class ServerHTTP3Session extends ServerProtocolSession
@Override
protected void onStart()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = session.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
settings = settings != null ? new HashMap<>(settings) : new HashMap<>();
settings.compute(SettingsFrame.MAX_TABLE_CAPACITY, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxDecoderTableCapacity();
if (v == 0)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_FIELD_SECTION_SIZE, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxRequestHeadersSize();
if (v <= 0)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_BLOCKED_STREAMS, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxBlockedStreams();
if (v == 0)
v = null;
}
return v;
});
if (LOG.isDebugEnabled())
LOG.debug("configuring decoder {} on {}", settings, this);
settings.forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
decoder.setMaxTableCapacity(value.intValue());
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
decoder.setMaxHeadersSize(value.intValue());
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
decoder.setMaxBlockedStreams(value.intValue());
});
// Queue the mandatory SETTINGS frame.
SettingsFrame frame = new SettingsFrame(settings);
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::failControlStream)))
controlFlusher.iterate();
}
public void onSettings(SettingsFrame frame)
{
Map<Long, Long> settings = frame.getSettings();
if (LOG.isDebugEnabled())
LOG.debug("configuring encoder {} on {}", settings, this);
settings.forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
{
int maxTableCapacity = value.intValue();
encoder.setMaxTableCapacity(maxTableCapacity);
encoder.setTableCapacity(Math.min(maxTableCapacity, configuration.getInitialEncoderTableCapacity()));
}
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
{
// Must cap the maxHeaderSize to avoid large allocations.
int maxHeadersSize = Math.min(value.intValue(), configuration.getMaxResponseHeadersSize());
encoder.setMaxHeadersSize(maxHeadersSize);
}
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
{
int maxBlockedStreams = value.intValue();
encoder.setMaxBlockedStreams(maxBlockedStreams);
}
});
}
private void failControlStream(Throwable failure)
{
long error = HTTP3ErrorCode.CLOSED_CRITICAL_STREAM_ERROR.code();

View File

@ -46,7 +46,7 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection
public Runnable onRequest(HTTP3StreamServer stream, HeadersFrame frame)
{
HttpTransportOverHTTP3 transport = new HttpTransportOverHTTP3(stream);
HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this);
HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream);
stream.setAttachment(channel);
return channel.onRequest(frame);
}

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient;
@ -42,9 +43,12 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ClientServerTest extends AbstractClientServerTest
@ -135,15 +139,15 @@ public class ClientServerTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
HTTP3SessionServer serverSession = serverSessionRef.get();
assertEquals(maxTableCapacity.getValue(), serverSession.getProtocolSession().getQpackEncoder().getCapacity());
assertEquals(maxTableCapacity.getValue(), serverSession.getProtocolSession().getQpackEncoder().getMaxTableCapacity());
assertEquals(maxBlockedStreams.getValue(), serverSession.getProtocolSession().getQpackEncoder().getMaxBlockedStreams());
assertEquals(maxBlockedStreams.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxBlockedStreams());
assertEquals(maxHeaderSize.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxHeaderSize());
assertEquals(maxHeaderSize.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxHeadersSize());
assertEquals(maxTableCapacity.getValue(), clientSession.getProtocolSession().getQpackEncoder().getCapacity());
assertEquals(maxTableCapacity.getValue(), clientSession.getProtocolSession().getQpackEncoder().getMaxTableCapacity());
assertEquals(maxBlockedStreams.getValue(), clientSession.getProtocolSession().getQpackEncoder().getMaxBlockedStreams());
assertEquals(maxBlockedStreams.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxBlockedStreams());
assertEquals(maxHeaderSize.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxHeaderSize());
assertEquals(maxHeaderSize.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxHeadersSize());
}
@Test
@ -370,7 +374,11 @@ public class ClientServerTest extends AbstractClientServerTest
});
int maxRequestHeadersSize = 128;
http3Client.getHTTP3Configuration().setMaxRequestHeadersSize(maxRequestHeadersSize);
HTTP3Configuration http3Configuration = http3Client.getHTTP3Configuration();
http3Configuration.setMaxRequestHeadersSize(maxRequestHeadersSize);
// Disable the dynamic table, otherwise the large header
// is sent as string literal on the encoder stream.
http3Configuration.setInitialEncoderTableCapacity(0);
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch requestFailureLatch = new CountDownLatch(1);
@ -406,10 +414,17 @@ public class ClientServerTest extends AbstractClientServerTest
public void testResponseHeadersTooLarge() throws Exception
{
int maxResponseHeadersSize = 128;
CountDownLatch settingsLatch = new CountDownLatch(2);
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch responseFailureLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
@ -441,9 +456,22 @@ public class ClientServerTest extends AbstractClientServerTest
});
AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
assertNotNull(h3);
h3.getHTTP3Configuration().setMaxResponseHeadersSize(maxResponseHeadersSize);
HTTP3Configuration http3Configuration = h3.getHTTP3Configuration();
// Disable the dynamic table, otherwise the large header
// is sent as string literal on the encoder stream.
http3Configuration.setInitialEncoderTableCapacity(0);
http3Configuration.setMaxResponseHeadersSize(maxResponseHeadersSize);
Session.Client clientSession = newSession(new Session.Client.Listener() {});
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
});
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
CountDownLatch streamFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/large"), true), new Stream.Client.Listener()
@ -474,5 +502,118 @@ public class ClientServerTest extends AbstractClientServerTest
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
// TODO: write a test calling readData() from onRequest() (not from onDataAvailable()).
@Test
public void testHeadersThenTrailers() throws Exception
{
CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch trailerLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
requestLatch.countDown();
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream.Server stream)
{
// TODO: we should not be needing this!!!
Stream.Data data = stream.readData();
assertNull(data);
stream.demand();
}
@Override
public void onTrailer(Stream.Server stream, HeadersFrame frame)
{
trailerLatch.countDown();
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true));
}
};
}
});
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch responseLatch = new CountDownLatch(1);
Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/large"), false), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(HttpStatus.OK_200, response.getStatus());
responseLatch.countDown();
}
})
.get(5, TimeUnit.SECONDS);
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
clientStream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true));
assertTrue(trailerLatch.await(5, TimeUnit.SECONDS));
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testReadDataFromOnRequest() throws Exception
{
CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch data1Latch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
requestLatch.countDown();
Stream.Data data = await().atMost(5, TimeUnit.SECONDS).until(stream::readData, notNullValue());
data.complete();
stream.demand();
data1Latch.countDown();
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream.Server stream)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
}
data.complete();
if (!data.isLast())
{
stream.demand();
return;
}
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true));
}
};
}
});
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch responseLatch = new CountDownLatch(1);
Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/large"), false), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
responseLatch.countDown();
}
})
.get(5, TimeUnit.SECONDS);
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
clientStream.data(new DataFrame(ByteBuffer.allocate(1024), false));
assertTrue(data1Latch.await(555, TimeUnit.SECONDS));
clientStream.data(new DataFrame(ByteBuffer.allocate(512), true));
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -1177,6 +1177,9 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
// Wait a bit more to allow the unidirectional streams to be setup.
Thread.sleep(1000);
// Stopping the HttpClient will also stop the HTTP3Client.
httpClient.stop();
@ -1234,6 +1237,9 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
// Wait a bit more to allow the unidirectional streams to be setup.
Thread.sleep(1000);
server.stop();
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));