Merged branch 'jetty-11.0.x' into 'jetty-12.0.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-05-15 16:48:06 +02:00
commit 7bbf966949
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
38 changed files with 847 additions and 366 deletions

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.client;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -44,6 +45,7 @@ public class ClientHTTP3Session extends ClientProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ClientHTTP3Session.class);
private final HTTP3Configuration configuration;
private final HTTP3SessionClient session;
private final QpackEncoder encoder;
private final QpackDecoder decoder;
@ -53,7 +55,8 @@ public class ClientHTTP3Session extends ClientProtocolSession
public ClientHTTP3Session(HTTP3Configuration configuration, ClientQuicSession quicSession, Session.Client.Listener listener, Promise<Session.Client> promise)
{
super(quicSession);
this.session = new HTTP3SessionClient(this, listener, promise);
this.configuration = configuration;
session = new HTTP3SessionClient(this, listener, promise);
addBean(session);
session.setStreamIdleTimeout(configuration.getStreamIdleTimeout());
@ -63,7 +66,8 @@ public class ClientHTTP3Session extends ClientProtocolSession
long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = openInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams());
encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher));
encoder.setMaxHeadersSize(configuration.getMaxRequestHeadersSize());
addBean(encoder);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
@ -71,19 +75,19 @@ public class ClientHTTP3Session extends ClientProtocolSession
long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = openInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxResponseHeadersSize());
decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher));
addBean(decoder);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = openControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true);
controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true);
addBean(controlFlusher);
if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxRequestHeadersSize(), configuration.isUseOutputDirectByteBuffers());
messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.isUseOutputDirectByteBuffers());
addBean(messageFlusher);
}
@ -105,16 +109,86 @@ public class ClientHTTP3Session extends ClientProtocolSession
@Override
protected void onStart()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = session.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
settings = settings != null ? new HashMap<>(settings) : new HashMap<>();
settings.compute(SettingsFrame.MAX_TABLE_CAPACITY, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxDecoderTableCapacity();
if (v == 0)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_FIELD_SECTION_SIZE, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxResponseHeadersSize();
if (v <= 0)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_BLOCKED_STREAMS, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxBlockedStreams();
if (v == 0)
v = null;
}
return v;
});
if (LOG.isDebugEnabled())
LOG.debug("configuring local {} on {}", settings, this);
settings.forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
decoder.setMaxTableCapacity(value.intValue());
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
decoder.setMaxHeadersSize(value.intValue());
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
decoder.setMaxBlockedStreams(value.intValue());
});
// Queue the mandatory SETTINGS frame.
SettingsFrame frame = new SettingsFrame(settings);
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::failControlStream)))
controlFlusher.iterate();
}
public void onSettings(SettingsFrame frame)
{
Map<Long, Long> settings = frame.getSettings();
if (LOG.isDebugEnabled())
LOG.debug("configuring encoder {} on {}", settings, this);
settings.forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
{
int maxTableCapacity = value.intValue();
encoder.setMaxTableCapacity(maxTableCapacity);
encoder.setTableCapacity(Math.min(maxTableCapacity, configuration.getInitialEncoderTableCapacity()));
}
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
{
// Must cap the maxHeaderSize to avoid large allocations.
int maxHeadersSize = Math.min(value.intValue(), configuration.getMaxRequestHeadersSize());
encoder.setMaxHeadersSize(maxHeadersSize);
}
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
{
int maxBlockedStreams = value.intValue();
encoder.setMaxBlockedStreams(maxBlockedStreams);
}
});
}
private void failControlStream(Throwable failure)
{
long error = HTTP3ErrorCode.CLOSED_CRITICAL_STREAM_ERROR.code();

View File

@ -22,6 +22,7 @@ import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.quic.common.ProtocolSession;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.quic.common.StreamType;
@ -63,7 +64,7 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
if (frame.getMetaData().isResponse())
{
@ -76,10 +77,19 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
}
else
{
super.onHeaders(streamId, frame);
super.onHeaders(streamId, frame, wasBlocked);
}
}
@Override
public void onSettings(SettingsFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this);
getProtocolSession().onSettings(frame);
super.onSettings(frame);
}
@Override
public CompletableFuture<Stream> newRequest(HeadersFrame frame, Stream.Client.Listener listener)
{
@ -147,24 +157,4 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
return GoAwayFrame.CLIENT_GRACEFUL;
return super.newGoAwayFrame(graceful);
}
@Override
protected void onSettingMaxTableCapacity(long value)
{
getProtocolSession().getQpackEncoder().setCapacity((int)value);
}
@Override
protected void onSettingMaxFieldSectionSize(long value)
{
getProtocolSession().getQpackDecoder().setMaxHeaderSize((int)value);
}
@Override
protected void onSettingMaxBlockedStreams(long value)
{
ClientHTTP3Session session = getProtocolSession();
session.getQpackDecoder().setMaxBlockedStreams((int)value);
session.getQpackEncoder().setMaxBlockedStreams((int)value);
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
@ -28,9 +29,9 @@ public class DecoderStreamConnection extends InstructionStreamConnection
private final QpackEncoder encoder;
public DecoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, QpackEncoder encoder)
public DecoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, QpackEncoder encoder, ParserListener listener)
{
super(endPoint, executor, bufferPool);
super(endPoint, executor, bufferPool, listener);
this.encoder = encoder;
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.ByteBufferPool;
@ -28,9 +29,9 @@ public class EncoderStreamConnection extends InstructionStreamConnection
private final QpackDecoder decoder;
public EncoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, QpackDecoder decoder)
public EncoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, QpackDecoder decoder, ParserListener listener)
{
super(endPoint, executor, bufferPool);
super(endPoint, executor, bufferPool, listener);
this.decoder = decoder;
}

View File

@ -17,7 +17,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
/**
* <p>A record that captures HTTP/3 configuration parameters.</p>
* <p>The HTTP/3 configuration parameters.</p>
*/
@ManagedObject
public class HTTP3Configuration
@ -27,9 +27,11 @@ public class HTTP3Configuration
private int outputBufferSize = 2048;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
private int maxBlockedStreams = 0;
private int maxRequestHeadersSize = 8192;
private int maxResponseHeadersSize = 8192;
private int maxBlockedStreams = 64;
private int maxTableCapacity = 64 * 1024;
private int initialTableCapacity = 64 * 1024;
private int maxRequestHeadersSize = 8 * 1024;
private int maxResponseHeadersSize = 8 * 1024;
@ManagedAttribute("The stream idle timeout in milliseconds")
public long getStreamIdleTimeout()
@ -37,6 +39,13 @@ public class HTTP3Configuration
return streamIdleTimeout;
}
/**
* <p>Sets the stream idle timeout in milliseconds.</p>
* <p>Negative values and zero mean that the stream never times out.</p>
* <p>Default value is {@code 30} seconds.</p>
*
* @param streamIdleTimeout the stream idle timeout in milliseconds
*/
public void setStreamIdleTimeout(long streamIdleTimeout)
{
this.streamIdleTimeout = streamIdleTimeout;
@ -48,6 +57,12 @@ public class HTTP3Configuration
return inputBufferSize;
}
/**
* <p>Sets the size of the buffer used for QUIC network reads.</p>
* <p>Default value is {@code 2048} bytes.</p>
*
* @param inputBufferSize the buffer size in bytes
*/
public void setInputBufferSize(int inputBufferSize)
{
this.inputBufferSize = inputBufferSize;
@ -59,39 +74,105 @@ public class HTTP3Configuration
return outputBufferSize;
}
/**
* <p>Sets the size of the buffer used for QUIC network writes.</p>
* <p>Default value is {@code 2048} bytes.</p>
*
* @param outputBufferSize the buffer size in bytes
*/
public void setOutputBufferSize(int outputBufferSize)
{
this.outputBufferSize = outputBufferSize;
}
@ManagedAttribute("Whether to use direct buffers for input")
@ManagedAttribute("Whether to use direct buffers for network reads")
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
/**
* <p>Sets whether to use direct buffers for QUIC network reads.</p>
* <p>Default value is {@code true}.</p>
*
* @param useInputDirectByteBuffers whether to use direct buffers for network reads
*/
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
@ManagedAttribute("Whether to use direct buffers for output")
@ManagedAttribute("Whether to use direct buffers for network writes")
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
/**
* <p>Sets whether to use direct buffers for QUIC network writes.</p>
* <p>Default value is {@code true}.</p>
*
* @param useOutputDirectByteBuffers whether to use direct buffers for network writes
*/
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
@ManagedAttribute("The local QPACK max decoder dynamic table capacity")
public int getMaxDecoderTableCapacity()
{
return maxTableCapacity;
}
/**
* <p>Sets the local QPACK decoder max dynamic table capacity.</p>
* <p>The default value is {@code 65536} bytes.</p>
* <p>This value is configured on the local QPACK decoder, and then
* communicated to the remote QPACK encoder via the SETTINGS frame.</p>
*
* @param maxTableCapacity the QPACK decoder dynamic table max capacity
* @see #setInitialEncoderTableCapacity(int)
*/
public void setMaxDecoderTableCapacity(int maxTableCapacity)
{
this.maxTableCapacity = maxTableCapacity;
}
@ManagedAttribute("The local QPACK initial encoder dynamic table capacity")
public int getInitialEncoderTableCapacity()
{
return initialTableCapacity;
}
/**
* <p>Sets the local QPACK encoder initial dynamic table capacity.</p>
* <p>The default value is {@code 65536} bytes.</p>
* <p>This value is configured in the local QPACK encoder, and may be
* overwritten by a smaller value received via the SETTINGS frame.</p>
*
* @param initialTableCapacity the QPACK encoder dynamic table initial capacity
* @see #setMaxDecoderTableCapacity(int)
*/
public void setInitialEncoderTableCapacity(int initialTableCapacity)
{
this.initialTableCapacity = initialTableCapacity;
}
@ManagedAttribute("The max number of QPACK blocked streams")
public int getMaxBlockedStreams()
{
return maxBlockedStreams;
}
/**
* <p>Sets the local QPACK decoder max number of blocked streams.</p>
* <p>The default value is {@code 64}.</p>
* <p>This value is configured in the local QPACK decoder, and then
* communicated to the remote QPACK encoder via the SETTINGS frame.</p>
*
* @param maxBlockedStreams the QPACK decoder max blocked streams
*/
public void setMaxBlockedStreams(int maxBlockedStreams)
{
this.maxBlockedStreams = maxBlockedStreams;
@ -103,6 +184,17 @@ public class HTTP3Configuration
return maxRequestHeadersSize;
}
/**
* <p>Sets max request headers size.</p>
* <p>The default value is {@code 8192} bytes.</p>
* <p>This value is configured in the server-side QPACK decoder, and
* then communicated to the client-side QPACK encoder via the SETTINGS
* frame.</p>
* <p>The client-side QPACK encoder uses this value to cap, if necessary,
* the value sent by the server-side QPACK decoder.</p>
*
* @param maxRequestHeadersSize the max request headers size in bytes
*/
public void setMaxRequestHeadersSize(int maxRequestHeadersSize)
{
this.maxRequestHeadersSize = maxRequestHeadersSize;
@ -114,6 +206,17 @@ public class HTTP3Configuration
return maxResponseHeadersSize;
}
/**
* <p>Sets max response headers size.</p>
* <p>The default value is {@code 8192} bytes.</p>
* <p>This value is configured in the client-side QPACK decoder, and
* then communicated to the server-side QPACK encoder via the SETTINGS
* frame.</p>
* <p>The server-side QPACK encoder uses this value to cap, if necessary,
* the value sent by the client-side QPACK decoder.</p>
*
* @param maxResponseHeadersSize the max response headers size
*/
public void setMaxResponseHeadersSize(int maxResponseHeadersSize)
{
this.maxResponseHeadersSize = maxResponseHeadersSize;

View File

@ -127,7 +127,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
boolean failStreams = false;
boolean sendGoAway = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
@ -233,7 +233,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public CompletableFuture<Void> shutdown()
{
CompletableFuture<Void> result;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (shutdown != null)
return shutdown;
@ -289,7 +289,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
private HTTP3Stream newHTTP3Stream(QuicStreamEndPoint endPoint, Consumer<Throwable> fail, boolean local)
{
Throwable failure = null;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (closeState == CloseState.NOT_CLOSED)
streamCount.incrementAndGet();
@ -352,7 +352,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
{
Map<Long, Long> settings = notifyPreface();
if (LOG.isDebugEnabled())
LOG.debug("produced settings {} on {}", settings, this);
LOG.debug("application produced settings {} on {}", settings, this);
return settings;
}
@ -372,34 +372,9 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
@Override
public void onSettings(SettingsFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this);
frame.getSettings().forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
onSettingMaxTableCapacity(value);
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
onSettingMaxFieldSectionSize(value);
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
onSettingMaxBlockedStreams(value);
});
notifySettings(frame);
}
protected void onSettingMaxTableCapacity(long value)
{
}
protected void onSettingMaxFieldSectionSize(long value)
{
}
protected void onSettingMaxBlockedStreams(long value)
{
}
private void notifySettings(SettingsFrame frame)
{
try
@ -438,7 +413,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
@ -475,7 +450,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
LOG.debug("received {} on {}", frame, this);
boolean failStreams = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
@ -586,7 +561,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public boolean onIdleTimeout()
{
boolean notify = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
@ -642,7 +617,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
public void inwardClose(long error, String reason)
{
GoAwayFrame goAwayFrame = null;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
switch (closeState)
{
@ -745,7 +720,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
streamTimeouts.destroy();
// Notify the shutdown completable.
CompletableFuture<Void> shutdown;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
shutdown = this.shutdown;
}
@ -756,7 +731,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
private void tryRunZeroStreamsAction()
{
Runnable action = null;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
long count = streamCount.get();
if (count > 0)
@ -829,7 +804,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
// A close at the QUIC level does not allow any
// data to be sent, update the state and notify.
boolean notifyFailure;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
notifyFailure = closeState == CloseState.NOT_CLOSED;
closeState = CloseState.CLOSED;

View File

@ -45,7 +45,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private boolean useInputDirectByteBuffers = true;
private HTTP3Stream stream;
private RetainableByteBuffer networkBuffer;
private boolean applicationMode;
private boolean remotelyClosed;
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool bufferPool, MessageParser parser)
@ -77,11 +76,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
this.stream = stream;
}
public void setApplicationMode(boolean mode)
{
this.applicationMode = mode;
}
@Override
public void onOpen()
{
@ -123,7 +117,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
switch (result)
{
case NO_FRAME -> tryReleaseBuffer(false);
case MODE_SWITCH ->
case SWITCH_MODE ->
{
parser.setDataMode(false);
processNonDataFrames();
@ -163,17 +157,28 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
tryReleaseBuffer(false);
return;
}
case MODE_SWITCH ->
case BLOCKED_FRAME ->
{
// Return immediately because another thread may
// resume the processing as the stream is unblocked.
tryReleaseBuffer(false);
return;
}
case SWITCH_MODE ->
{
// MODE_SWITCH is only reported when parsing DATA frames.
throw new IllegalStateException();
}
case FRAME ->
{
action.getAndSet(null).run();
Runnable action = this.action.getAndSet(null);
if (action == null)
throw new IllegalStateException();
action.run();
// TODO: we should also exit if the connection was closed due to errors.
// There is not yet a isClosed() primitive though.
// This can be done by overriding relevant methods in MessageListener.
if (remotelyClosed)
{
// We have detected the end of the stream,
@ -185,32 +190,32 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
return;
}
if (parser.isDataMode())
if (!parser.isDataMode())
throw new IllegalStateException();
if (stream.hasDemandOrStall())
{
// TODO: handle applicationMode here?
if (stream.hasDemandOrStall())
if (networkBuffer != null && networkBuffer.hasRemaining())
{
if (networkBuffer != null && networkBuffer.hasRemaining())
{
// There are bytes left in the buffer; if there are not
// enough bytes to parse a DATA frame and call the
// application (so that it can drive), set fill interest.
processDataFrames(true);
}
else
{
// No bytes left in the buffer, but there is demand.
// Set fill interest to call the application when bytes arrive.
fillInterested();
}
// There are bytes left in the buffer; if there are not
// enough bytes to parse a DATA frame and call the
// application (so that it can drive), set fill interest.
processDataFrames(true);
}
else
{
// No bytes left in the buffer, but there is demand.
// Set fill interest to call the application when bytes arrive.
tryReleaseBuffer(false);
fillInterested();
}
// From now on it's the application that drives
// demand, reads, parse+fill and fill interest.
return;
}
// From now on it's the application that drives
// demand, reads, parse+fill and fill interest.
return;
// TODO: do we loop here?
// There might be a trailer, loop around.
}
default -> throw new IllegalStateException("unknown message parser result: " + result);
@ -273,7 +278,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
MessageParser.Result result = parser.parse(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} on {} with buffer {}", result, this, networkBuffer);
if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH)
if (result != MessageParser.Result.NO_FRAME)
return result;
if (networkBuffer.isRetained())
@ -333,6 +338,81 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
private void processHeaders(HeadersFrame frame, boolean wasBlocked, Runnable delegate)
{
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
// Expect DATA frames now.
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=true for request {} on {}", metaData, this);
}
else if (metaData.isResponse())
{
MetaData.Response response = (MetaData.Response)metaData;
if (HttpStatus.isInformational(response.getStatus()))
{
if (LOG.isDebugEnabled())
LOG.debug("staying in dataMode=false for response {} on {}", metaData, this);
}
else
{
// Expect DATA frames now.
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=true for response {} on {}", metaData, this);
}
}
else
{
// Trailer.
if (!frame.isLast())
frame = new HeadersFrame(metaData, true);
}
if (frame.isLast())
shutdownInput();
delegate.run();
if (wasBlocked)
onFillable();
}
private void processData(DataFrame frame, Runnable delegate)
{
if (frame.isLast())
shutdownInput();
Stream.Data data;
if (!frame.getByteBuffer().hasRemaining() && frame.isLast())
{
data = Stream.Data.EOF;
}
else
{
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
data = new StreamData(frame, networkBuffer);
}
delegate.run();
if (LOG.isDebugEnabled())
LOG.debug("notifying {} on {}", data, stream);
stream.onData(data);
}
private void shutdownInput()
{
remotelyClosed = true;
// We want to shutdown the input to avoid "spurious" wakeups where
// zero bytes could be spuriously read from the EndPoint after the
// stream is remotely closed by receiving a frame with last=true.
getEndPoint().shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
}
@Override
public String toConnectionString()
{
@ -376,90 +456,26 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
if (LOG.isDebugEnabled())
LOG.debug("onHeaders #{} {} on {}", streamId, frame, this);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest())
{
// Expect DATA frames now.
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=true for request {} on {}", metaData, this);
}
else if (metaData.isResponse())
{
MetaData.Response response = (MetaData.Response)metaData;
if (HttpStatus.isInformational(response.getStatus()))
{
if (LOG.isDebugEnabled())
LOG.debug("staying in dataMode=false for response {} on {}", metaData, this);
}
else
{
// Expect DATA frames now.
parser.setDataMode(true);
if (LOG.isDebugEnabled())
LOG.debug("switching to dataMode=true for response {} on {}", metaData, this);
}
}
else
{
// Trailer.
if (!frame.isLast())
frame = new HeadersFrame(metaData, true);
}
HeadersFrame headersFrame = frame;
if (headersFrame.isLast())
shutdownInput();
Runnable existing = action.getAndSet(() -> super.onHeaders(streamId, headersFrame));
if (existing != null)
throw new IllegalStateException("existing onHeaders action " + existing);
LOG.debug("received {}#{} wasBlocked={}", frame, streamId, wasBlocked);
Runnable delegate = () -> super.onHeaders(streamId, frame, wasBlocked);
Runnable action = () -> processHeaders(frame, wasBlocked, delegate);
if (wasBlocked)
action.run();
else if (!HTTP3StreamConnection.this.action.compareAndSet(null, action))
throw new IllegalStateException();
}
@Override
public void onData(long streamId, DataFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("onData #{} {} on {}", streamId, frame, this);
if (frame.isLast())
shutdownInput();
Stream.Data data;
if (!frame.getByteBuffer().hasRemaining() && frame.isLast())
{
data = Stream.Data.EOF;
}
else
{
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
data = new StreamData(frame, networkBuffer);
}
Runnable existing = action.getAndSet(() ->
{
super.onData(streamId, frame);
if (LOG.isDebugEnabled())
LOG.debug("notifying {} on {}", data, stream);
stream.onData(data);
});
if (existing != null)
throw new IllegalStateException("existing onData action " + existing);
}
private void shutdownInput()
{
remotelyClosed = true;
// We want to shutdown the input to avoid "spurious" wakeups where
// zero bytes could be spuriously read from the EndPoint after the
// stream is remotely closed by receiving a frame with last=true.
getEndPoint().shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
LOG.debug("received {}#{}", frame, streamId);
Runnable delegate = () -> super.onData(streamId, frame);
if (!HTTP3StreamConnection.this.action.compareAndSet(null, () -> processData(frame, delegate)))
throw new IllegalStateException();
}
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http3.parser.ParserListener;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@ -30,13 +31,15 @@ public abstract class InstructionStreamConnection extends AbstractConnection imp
{
private static final Logger LOG = LoggerFactory.getLogger(InstructionStreamConnection.class);
private final ByteBufferPool bufferPool;
private final ParserListener listener;
private boolean useInputDirectByteBuffers = true;
private RetainableByteBuffer buffer;
public InstructionStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool)
public InstructionStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool bufferPool, ParserListener listener)
{
super(endPoint, executor);
this.bufferPool = bufferPool;
this.listener = listener;
}
public boolean isUseInputDirectByteBuffers()
@ -104,13 +107,34 @@ public abstract class InstructionStreamConnection extends AbstractConnection imp
}
}
}
catch (QpackException.SessionException x)
{
fail(x.getErrorCode(), x.getMessage(), x);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("could not process decoder stream {}", getEndPoint(), x);
buffer.release();
buffer = null;
getEndPoint().close(x);
fail(HTTP3ErrorCode.INTERNAL_ERROR.code(), "internal_error", x);
}
}
private void fail(long errorCode, String message, Throwable failure)
{
buffer.release();
buffer = null;
if (LOG.isDebugEnabled())
LOG.debug("could not process instruction stream {}", getEndPoint(), failure);
notifySessionFailure(errorCode, message, failure);
}
protected void notifySessionFailure(long error, String reason, Throwable failure)
{
try
{
listener.onSessionFailure(error, reason, failure);
}
catch (Throwable x)
{
LOG.info("failure while notifying listener {}", listener, x);
}
}

View File

@ -39,10 +39,10 @@ public class MessageFlusher extends IteratingCallback
private final MessageGenerator generator;
private Entry entry;
public MessageFlusher(ByteBufferPool bufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
public MessageFlusher(ByteBufferPool bufferPool, QpackEncoder encoder, boolean useDirectByteBuffers)
{
this.accumulator = new ByteBufferPool.Accumulator();
this.generator = new MessageGenerator(bufferPool, encoder, maxHeadersLength, useDirectByteBuffers);
this.generator = new MessageGenerator(bufferPool, encoder, useDirectByteBuffers);
}
public boolean offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback)

View File

@ -145,7 +145,7 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
}
else if (streamType == EncoderStreamConnection.STREAM_TYPE)
{
EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), bufferPool, decoder);
EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), bufferPool, decoder, listener);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
@ -154,7 +154,7 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement
}
else if (streamType == DecoderStreamConnection.STREAM_TYPE)
{
DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), bufferPool, encoder);
DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), bufferPool, encoder, listener);
newConnection.setInputBufferSize(getInputBufferSize());
newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())

View File

@ -29,14 +29,12 @@ import org.eclipse.jetty.util.BufferUtil;
public class HeadersGenerator extends FrameGenerator
{
private final QpackEncoder encoder;
private final int maxLength;
private final boolean useDirectByteBuffers;
public HeadersGenerator(ByteBufferPool bufferPool, QpackEncoder encoder, int maxLength, boolean useDirectByteBuffers)
public HeadersGenerator(ByteBufferPool bufferPool, QpackEncoder encoder, boolean useDirectByteBuffers)
{
super(bufferPool);
this.encoder = encoder;
this.maxLength = maxLength;
this.useDirectByteBuffers = useDirectByteBuffers;
}
@ -55,6 +53,7 @@ public class HeadersGenerator extends FrameGenerator
int frameTypeLength = VarLenInt.length(FrameType.HEADERS.type());
int maxHeaderLength = frameTypeLength + VarLenInt.MAX_LENGTH;
// The capacity of the buffer is larger than maxLength, but we need to enforce at most maxLength.
int maxLength = encoder.getMaxHeadersSize();
RetainableByteBuffer buffer = getByteBufferPool().acquire(maxHeaderLength + maxLength, useDirectByteBuffers);
ByteBuffer byteBuffer = buffer.getByteBuffer();
BufferUtil.clearToFill(byteBuffer);

View File

@ -24,10 +24,10 @@ public class MessageGenerator
{
private final FrameGenerator[] generators = new FrameGenerator[FrameType.maxType() + 1];
public MessageGenerator(ByteBufferPool bufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers)
public MessageGenerator(ByteBufferPool bufferPool, QpackEncoder encoder, boolean useDirectByteBuffers)
{
generators[FrameType.DATA.type()] = new DataGenerator(bufferPool, useDirectByteBuffers);
generators[FrameType.HEADERS.type()] = new HeadersGenerator(bufferPool, encoder, maxHeadersLength, useDirectByteBuffers);
generators[FrameType.HEADERS.type()] = new HeadersGenerator(bufferPool, encoder, useDirectByteBuffers);
generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator(bufferPool);
}

View File

@ -125,6 +125,6 @@ public abstract class BodyParser
public enum Result
{
NO_FRAME, FRAGMENT_FRAME, WHOLE_FRAME
NO_FRAME, BLOCKED_FRAME, FRAGMENT_FRAME, WHOLE_FRAME
}
}

View File

@ -105,7 +105,7 @@ public class HeadersBodyParser extends BodyParser
// needs to be parsed, then it's not the last frame.
boolean last = isLast.getAsBoolean() && !buffer.hasRemaining();
return decode(encoded, last) ? Result.WHOLE_FRAME : Result.NO_FRAME;
return decode(encoded, last) ? Result.WHOLE_FRAME : Result.BLOCKED_FRAME;
}
}
default:
@ -121,7 +121,7 @@ public class HeadersBodyParser extends BodyParser
{
try
{
return decoder.decode(streamId, encoded, (streamId, metaData) -> onHeaders(metaData, last));
return decoder.decode(streamId, encoded, (streamId, metaData, wasBlocked) -> onHeaders(metaData, last, wasBlocked));
}
catch (QpackException.StreamException x)
{
@ -144,19 +144,19 @@ public class HeadersBodyParser extends BodyParser
return false;
}
private void onHeaders(MetaData metaData, boolean last)
private void onHeaders(MetaData metaData, boolean last, boolean wasBlocked)
{
HeadersFrame frame = new HeadersFrame(metaData, last);
reset();
notifyHeaders(frame);
notifyHeaders(frame, wasBlocked);
}
protected void notifyHeaders(HeadersFrame frame)
protected void notifyHeaders(HeadersFrame frame, boolean wasBlocked)
{
ParserListener listener = getParserListener();
try
{
listener.onHeaders(streamId, frame);
listener.onHeaders(streamId, frame, wasBlocked);
}
catch (Throwable x)
{

View File

@ -105,7 +105,7 @@ public class MessageParser
state = State.BODY;
// If we are in data mode, but we did not parse a DATA frame, bail out.
if (isDataMode() && headerParser.getFrameType() != FrameType.DATA.type())
return Result.MODE_SWITCH;
return Result.SWITCH_MODE;
}
else
{
@ -149,18 +149,32 @@ public class MessageParser
if (LOG.isDebugEnabled())
LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
reset();
return Result.FRAME;
}
else
{
BodyParser.Result result = bodyParser.parse(buffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} {} body from {}", result, FrameType.from(frameType), BufferUtil.toDetailString(buffer));
// Not enough bytes, there is no frame.
if (result == BodyParser.Result.NO_FRAME)
return Result.NO_FRAME;
if (LOG.isDebugEnabled())
LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer));
// Do not reset() if it is a fragment frame.
if (result == BodyParser.Result.FRAGMENT_FRAME)
return Result.FRAME;
reset();
if (result == BodyParser.Result.BLOCKED_FRAME)
return Result.BLOCKED_FRAME;
if (result == BodyParser.Result.WHOLE_FRAME)
reset();
return Result.FRAME;
throw new IllegalStateException();
}
return Result.FRAME;
}
}
default -> throw new IllegalStateException();
@ -183,7 +197,23 @@ public class MessageParser
public enum Result
{
NO_FRAME, FRAME, MODE_SWITCH
/**
* Indicates that no frame was parsed, either for lack of bytes, or because or errors.
*/
NO_FRAME,
/**
* Indicates that a frame was parsed.
*/
FRAME,
/**
* Indicates that a frame was parsed but its notification was deferred.
* This is the case of HEADERS frames that are waiting to be unblocked.
*/
BLOCKED_FRAME,
/**
* Indicates that a DATA frame was expected, but a HEADERS was found instead.
*/
SWITCH_MODE
}
private enum State

View File

@ -20,7 +20,7 @@ import org.eclipse.jetty.http3.frames.SettingsFrame;
public interface ParserListener
{
public default void onHeaders(long streamId, HeadersFrame frame)
public default void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
}
@ -54,9 +54,9 @@ public interface ParserListener
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
listener.onHeaders(streamId, frame);
listener.onHeaders(streamId, frame, wasBlocked);
}
@Override

View File

@ -55,7 +55,7 @@ public class DataGenerateParseTest
ByteBufferPool.NonPooling bufferPool = new ByteBufferPool.NonPooling();
ByteBufferPool.Accumulator accumulator = new ByteBufferPool.Accumulator();
new MessageGenerator(bufferPool, null, 8192, true).generate(accumulator, 0, input, null);
new MessageGenerator(bufferPool, null, true).generate(accumulator, 0, input, null);
List<DataFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(new ParserListener()

View File

@ -46,17 +46,19 @@ public class HeadersGenerateParseTest
.put("Cookie", "c=d");
HeadersFrame input = new HeadersFrame(new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, fields), true);
QpackEncoder encoder = new QpackEncoder(instructions -> {});
encoder.setMaxHeadersSize(4 * 1024);
ByteBufferPool.NonPooling bufferPool = new ByteBufferPool.NonPooling();
QpackEncoder encoder = new QpackEncoder(instructions -> {}, 100);
ByteBufferPool.Accumulator accumulator = new ByteBufferPool.Accumulator();
new MessageGenerator(bufferPool, encoder, 8192, true).generate(accumulator, 0, input, null);
new MessageGenerator(bufferPool, encoder, true).generate(accumulator, 0, input, null);
QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192);
QpackDecoder decoder = new QpackDecoder(instructions -> {});
decoder.setMaxHeadersSize(4 * 1024);
List<HeadersFrame> frames = new ArrayList<>();
MessageParser parser = new MessageParser(new ParserListener()
{
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
frames.add(frame);
}

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.util.component.Dumpable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.http3.qpack.QpackException.H3_GENERAL_PROTOCOL_ERROR;
import static org.eclipse.jetty.http3.qpack.QpackException.QPACK_DECOMPRESSION_FAILED;
import static org.eclipse.jetty.http3.qpack.QpackException.QPACK_ENCODER_STREAM_ERROR;
@ -55,8 +56,9 @@ public class QpackDecoder implements Dumpable
private final NBitIntegerDecoder _integerDecoder = new NBitIntegerDecoder();
private final InstructionHandler _instructionHandler = new InstructionHandler();
private final Map<Long, AtomicInteger> _blockedStreams = new HashMap<>();
private int _maxHeaderSize;
private int _maxHeadersSize;
private int _maxBlockedStreams;
private int _maxTableCapacity;
private static class MetaDataNotification
{
@ -71,21 +73,17 @@ public class QpackDecoder implements Dumpable
_handler = handler;
}
public void notifyHandler()
public void notifyHandler(boolean wasBlocked)
{
_handler.onMetaData(_streamId, _metaData);
_handler.onMetaData(_streamId, _metaData, wasBlocked);
}
}
/**
* @param maxHeaderSize The maximum allowed size of a headers block, expressed as total of all name and value characters, plus 32 per field
*/
public QpackDecoder(Instruction.Handler handler, int maxHeaderSize)
public QpackDecoder(Instruction.Handler handler)
{
_context = new QpackContext();
_handler = handler;
_parser = new DecoderInstructionParser(_instructionHandler);
_maxHeaderSize = maxHeaderSize;
}
QpackContext getQpackContext()
@ -93,14 +91,17 @@ public class QpackDecoder implements Dumpable
return _context;
}
public int getMaxHeaderSize()
public int getMaxHeadersSize()
{
return _maxHeaderSize;
return _maxHeadersSize;
}
public void setMaxHeaderSize(int maxHeaderSize)
/**
* @param maxHeadersSize The maximum allowed size of a headers block, expressed as total of all name and value characters, plus 32 per field
*/
public void setMaxHeadersSize(int maxHeadersSize)
{
_maxHeaderSize = maxHeaderSize;
_maxHeadersSize = maxHeadersSize;
}
public int getMaxBlockedStreams()
@ -113,9 +114,19 @@ public class QpackDecoder implements Dumpable
_maxBlockedStreams = maxBlockedStreams;
}
public int getMaxTableCapacity()
{
return _maxTableCapacity;
}
public void setMaxTableCapacity(int maxTableCapacity)
{
_maxTableCapacity = maxTableCapacity;
}
public interface Handler
{
void onMetaData(long streamId, MetaData metadata);
void onMetaData(long streamId, MetaData metadata, boolean wasBlocked);
}
/**
@ -137,8 +148,8 @@ public class QpackDecoder implements Dumpable
// If the buffer is big, don't even think about decoding it
// Huffman may double the size, but it will only be a temporary allocation until detected in MetaDataBuilder.emit().
int maxHeaderSize = getMaxHeaderSize();
if (buffer.remaining() > maxHeaderSize)
int maxHeaderSize = getMaxHeadersSize();
if (maxHeaderSize > 0 && buffer.remaining() > maxHeaderSize)
throw new QpackException.SessionException(QPACK_DECOMPRESSION_FAILED, "header_too_large");
_integerDecoder.setPrefix(8);
@ -155,7 +166,7 @@ public class QpackDecoder implements Dumpable
// Decode the Required Insert Count using the DynamicTable state.
DynamicTable dynamicTable = _context.getDynamicTable();
int insertCount = dynamicTable.getInsertCount();
int maxDynamicTableSize = dynamicTable.getCapacity();
int maxDynamicTableSize = getMaxTableCapacity();
int requiredInsertCount = decodeInsertCount(encodedInsertCount, insertCount, maxDynamicTableSize);
try
@ -177,7 +188,7 @@ public class QpackDecoder implements Dumpable
else
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred Decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection);
LOG.debug("Deferred decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection);
AtomicInteger blockedFields = _blockedStreams.computeIfAbsent(streamId, id -> new AtomicInteger(0));
blockedFields.incrementAndGet();
if (_blockedStreams.size() > _maxBlockedStreams)
@ -187,7 +198,7 @@ public class QpackDecoder implements Dumpable
boolean hadMetaData = !_metaDataNotifications.isEmpty();
notifyInstructionHandler();
notifyMetaDataHandler();
notifyMetaDataHandler(false);
return hadMetaData;
}
catch (QpackException.SessionException e)
@ -205,10 +216,13 @@ public class QpackDecoder implements Dumpable
* the Encoder to the Decoder. This method will fully consume the supplied {@link ByteBuffer} and produce instructions
* to update the state of the Decoder and its Dynamic Table.
* @param buffer a buffer containing bytes from the Encoder stream.
* @throws QpackException if there was an error parsing or handling the instructions.
* @throws QpackException.SessionException if there was an error parsing or handling the instructions.
*/
public void parseInstructions(ByteBuffer buffer) throws QpackException
public void parseInstructions(ByteBuffer buffer) throws QpackException.SessionException
{
if (LOG.isDebugEnabled())
LOG.debug("Parsing Instructions {}", BufferUtil.toDetailString(buffer));
try
{
while (BufferUtil.hasContent(buffer))
@ -216,7 +230,7 @@ public class QpackDecoder implements Dumpable
_parser.parse(buffer);
}
notifyInstructionHandler();
notifyMetaDataHandler();
notifyMetaDataHandler(true);
}
catch (QpackException.SessionException e)
{
@ -254,7 +268,7 @@ public class QpackDecoder implements Dumpable
{
iterator.remove();
long streamId = encodedFieldSection.getStreamId();
MetaData metaData = encodedFieldSection.decode(_context, _maxHeaderSize);
MetaData metaData = encodedFieldSection.decode(_context, getMaxHeadersSize());
if (_blockedStreams.get(streamId).decrementAndGet() <= 0)
_blockedStreams.remove(streamId);
if (LOG.isDebugEnabled())
@ -316,13 +330,16 @@ public class QpackDecoder implements Dumpable
_instructions.clear();
}
private void notifyMetaDataHandler()
private void notifyMetaDataHandler(boolean wasBlocked)
{
for (MetaDataNotification notification : _metaDataNotifications)
{
notification.notifyHandler();
}
// Copy the list to avoid re-entrance, where the call to
// notifyHandler() may end up calling again this method.
List<MetaDataNotification> notifications = new ArrayList<>(_metaDataNotifications);
_metaDataNotifications.clear();
for (MetaDataNotification notification : notifications)
{
notification.notifyHandler(wasBlocked);
}
}
InstructionHandler getInstructionHandler()
@ -336,8 +353,10 @@ public class QpackDecoder implements Dumpable
class InstructionHandler implements DecoderInstructionParser.Handler
{
@Override
public void onSetDynamicTableCapacity(int capacity)
public void onSetDynamicTableCapacity(int capacity) throws QpackException
{
if (capacity > getMaxTableCapacity())
throw new QpackException.StreamException(H3_GENERAL_PROTOCOL_ERROR, "DynamicTable capacity exceeds max capacity");
_context.getDynamicTable().setCapacity(capacity);
}

View File

@ -95,14 +95,15 @@ public class QpackEncoder implements Dumpable
private final Map<Long, StreamInfo> _streamInfoMap = new HashMap<>();
private final EncoderInstructionParser _parser;
private final InstructionHandler _instructionHandler = new InstructionHandler();
private int _knownInsertCount = 0;
private int _blockedStreams = 0;
private int _knownInsertCount;
private int _blockedStreams;
private int _maxHeadersSize;
private int _maxTableCapacity;
public QpackEncoder(Instruction.Handler handler, int maxBlockedStreams)
public QpackEncoder(Instruction.Handler handler)
{
_handler = handler;
_context = new QpackContext();
_maxBlockedStreams = maxBlockedStreams;
_parser = new EncoderInstructionParser(_instructionHandler);
}
@ -121,7 +122,30 @@ public class QpackEncoder implements Dumpable
_maxBlockedStreams = maxBlockedStreams;
}
public int getCapacity()
public int getMaxHeadersSize()
{
return _maxHeadersSize;
}
public void setMaxHeadersSize(int maxHeadersSize)
{
_maxHeadersSize = maxHeadersSize;
}
public int getMaxTableCapacity()
{
return _maxTableCapacity;
}
public void setMaxTableCapacity(int maxTableCapacity)
{
_maxTableCapacity = maxTableCapacity;
int capacity = getTableCapacity();
if (capacity > maxTableCapacity)
setTableCapacity(maxTableCapacity);
}
public int getTableCapacity()
{
return _context.getDynamicTable().getCapacity();
}
@ -131,11 +155,16 @@ public class QpackEncoder implements Dumpable
*
* @param capacity the new capacity.
*/
public void setCapacity(int capacity)
public void setTableCapacity(int capacity)
{
_context.getDynamicTable().setCapacity(capacity);
_handler.onInstructions(List.of(new SetCapacityInstruction(capacity)));
notifyInstructionHandler();
try (AutoLock ignored = lock.lock())
{
if (capacity > getMaxTableCapacity())
throw new IllegalArgumentException("DynamicTable capacity exceeds max capacity");
_context.getDynamicTable().setCapacity(capacity);
_handler.onInstructions(List.of(new SetCapacityInstruction(capacity)));
notifyInstructionHandler();
}
}
/**
@ -151,7 +180,7 @@ public class QpackEncoder implements Dumpable
*/
public void encode(ByteBuffer buffer, long streamId, MetaData metadata) throws QpackException
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("Encoding: streamId={}, metadata={}", streamId, metadata);
@ -159,6 +188,9 @@ public class QpackEncoder implements Dumpable
// Verify that we can encode without errors.
if (metadata.getHttpFields() != null)
{
// TODO: enforce that the length of the header is less than maxHeadersSize.
// See RFC 9114, section 4.2.2.
for (HttpField field : metadata.getHttpFields())
{
String name = field.getName();
@ -252,7 +284,7 @@ public class QpackEncoder implements Dumpable
*/
public void parseInstructions(ByteBuffer buffer) throws QpackException
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
while (BufferUtil.hasContent(buffer))
{
@ -279,43 +311,46 @@ public class QpackEncoder implements Dumpable
*/
public boolean insert(HttpField field)
{
DynamicTable dynamicTable = _context.getDynamicTable();
if (field.getValue() == null)
field = new HttpField(field.getHeader(), field.getName(), "");
// If we should not index this entry or there is no room to insert it, then just return false.
boolean canCreateEntry = shouldIndex(field) && dynamicTable.canInsert(field);
if (!canCreateEntry)
return false;
// Can we insert by duplicating an existing entry?
Entry entry = _context.get(field);
if (entry != null)
try (AutoLock ignored = lock.lock())
{
int index = _context.indexOf(entry);
DynamicTable dynamicTable = _context.getDynamicTable();
if (field.getValue() == null)
field = new HttpField(field.getHeader(), field.getName(), "");
// If we should not index this entry or there is no room to insert it, then just return false.
boolean canCreateEntry = shouldIndex(field) && dynamicTable.canInsert(field);
if (!canCreateEntry)
return false;
// Can we insert by duplicating an existing entry?
Entry entry = _context.get(field);
if (entry != null)
{
int index = _context.indexOf(entry);
dynamicTable.add(new Entry(field));
_instructions.add(new DuplicateInstruction(index));
notifyInstructionHandler();
return true;
}
// Can we insert by referencing a name?
boolean huffman = shouldHuffmanEncode(field);
Entry nameEntry = _context.get(field.getName());
if (nameEntry != null)
{
int index = _context.indexOf(nameEntry);
dynamicTable.add(new Entry(field));
_instructions.add(new IndexedNameEntryInstruction(!nameEntry.isStatic(), index, huffman, field.getValue()));
notifyInstructionHandler();
return true;
}
// Add the entry without referencing an existing entry.
dynamicTable.add(new Entry(field));
_instructions.add(new DuplicateInstruction(index));
_instructions.add(new LiteralNameEntryInstruction(field, huffman));
notifyInstructionHandler();
return true;
}
// Can we insert by referencing a name?
boolean huffman = shouldHuffmanEncode(field);
Entry nameEntry = _context.get(field.getName());
if (nameEntry != null)
{
int index = _context.indexOf(nameEntry);
dynamicTable.add(new Entry(field));
_instructions.add(new IndexedNameEntryInstruction(!nameEntry.isStatic(), index, huffman, field.getValue()));
notifyInstructionHandler();
return true;
}
// Add the entry without referencing an existing entry.
dynamicTable.add(new Entry(field));
_instructions.add(new LiteralNameEntryInstruction(field, huffman));
notifyInstructionHandler();
return true;
}
/**
@ -327,8 +362,11 @@ public class QpackEncoder implements Dumpable
*/
public void streamCancellation(long streamId)
{
_instructionHandler.onStreamCancellation(streamId);
notifyInstructionHandler();
try (AutoLock ignored = lock.lock())
{
_instructionHandler.onStreamCancellation(streamId);
notifyInstructionHandler();
}
}
protected boolean shouldIndex(HttpField httpField)

View File

@ -51,6 +51,6 @@ public class DuplicateInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[index=%d]", getClass().getSimpleName(), hashCode(), getIndex());
}
}

View File

@ -86,6 +86,6 @@ public class IndexedNameEntryInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[index=%d,name=%s]", getClass().getSimpleName(), hashCode(), getIndex(), getValue());
}
}

View File

@ -51,6 +51,6 @@ public class InsertCountIncrementInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[increment=%d]", getClass().getSimpleName(), hashCode(), getIncrement());
}
}

View File

@ -96,6 +96,6 @@ public class LiteralNameEntryInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[name=%s,value=%s]", getClass().getSimpleName(), hashCode(), getName(), getValue());
}
}

View File

@ -51,6 +51,6 @@ public class SectionAcknowledgmentInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[stream=%d]", getClass().getSimpleName(), hashCode(), _streamId);
}
}

View File

@ -51,6 +51,6 @@ public class SetCapacityInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[capacity=%d]", getClass().getSimpleName(), hashCode(), getCapacity());
}
}

View File

@ -46,6 +46,6 @@ public class StreamCancellationInstruction implements Instruction
@Override
public String toString()
{
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
return String.format("%s@%x[stream=%d]", getClass().getSimpleName(), hashCode(), _streamId);
}
}

View File

@ -78,7 +78,7 @@ public class MetaDataBuilder
String value = field.getValue();
int fieldSize = name.length() + (value == null ? 0 : value.length());
_size += fieldSize + 32;
if (_size > _maxSize)
if (_maxSize > 0 && _size > _maxSize)
throw new QpackException.SessionException(QpackException.QPACK_DECOMPRESSION_FAILED, String.format("Header size %d > %d", _size, _maxSize));
if (field instanceof StaticTableHttpField)

View File

@ -41,9 +41,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class BlockedStreamsTest
{
private static final int MAX_BLOCKED_STREAMS = 5;
private static final int MAX_HEADER_SIZE = 1024;
private QpackEncoder _encoder;
private QpackDecoder _decoder;
private TestDecoderHandler _decoderHandler;
@ -54,8 +51,8 @@ public class BlockedStreamsTest
{
_encoderHandler = new TestEncoderHandler();
_decoderHandler = new TestDecoderHandler();
_encoder = new QpackEncoder(_encoderHandler, MAX_BLOCKED_STREAMS);
_decoder = new QpackDecoder(_decoderHandler, MAX_HEADER_SIZE);
_encoder = new QpackEncoder(_encoderHandler);
_decoder = new QpackDecoder(_decoderHandler);
}
@Test
@ -67,7 +64,10 @@ public class BlockedStreamsTest
// Set capacity of the encoder & decoder to allow entries to be added to the table.
int capacity = 1024;
_encoder.setCapacity(capacity);
_encoder.setMaxTableCapacity(capacity);
_encoder.setTableCapacity(capacity);
_decoder.setMaxTableCapacity(capacity);
Instruction instruction = _encoderHandler.getInstruction();
assertThat(instruction, instanceOf(SetCapacityInstruction.class));
_decoder.parseInstructions(QpackTestUtil.toBuffer(instruction));
@ -178,7 +178,10 @@ public class BlockedStreamsTest
// Set capacity of the encoder & decoder to allow entries to be added to the table.
int capacity = 1024;
_encoder.setCapacity(capacity);
_encoder.setMaxTableCapacity(capacity);
_encoder.setTableCapacity(capacity);
_decoder.setMaxTableCapacity(capacity);
Instruction instruction = _encoderHandler.getInstruction();
assertThat(instruction, instanceOf(SetCapacityInstruction.class));
_decoder.parseInstructions(QpackTestUtil.toBuffer(instruction));

View File

@ -56,7 +56,7 @@ public class EncodeDecodeTest
{
_encoderHandler = new TestEncoderHandler();
_decoderHandler = new TestDecoderHandler();
_encoder = new QpackEncoder(_encoderHandler, MAX_BLOCKED_STREAMS)
_encoder = new QpackEncoder(_encoderHandler)
{
@Override
protected boolean shouldHuffmanEncode(HttpField httpField)
@ -64,7 +64,7 @@ public class EncodeDecodeTest
return false;
}
};
_decoder = new QpackDecoder(_decoderHandler, MAX_HEADER_SIZE);
_decoder = new QpackDecoder(_decoderHandler);
_encoderInstructionParser = new EncoderInstructionParser(new EncoderParserDebugHandler(_encoder));
_decoderInstructionParser = new DecoderInstructionParser(new DecoderParserDebugHandler(_decoder));
@ -96,7 +96,7 @@ public class EncodeDecodeTest
// B.2. Dynamic Table.
// Set capacity to 220.
_encoder.setCapacity(220);
_encoder.setTableCapacity(220);
Instruction instruction = _encoderHandler.getInstruction();
assertThat(instruction, instanceOf(SetCapacityInstruction.class));
assertThat(((SetCapacityInstruction)instruction).getCapacity(), is(220));

View File

@ -34,14 +34,14 @@ public class EvictionTest
private final TestEncoderHandler _encoderHandler = new TestEncoderHandler();
private final Random random = new Random();
private static final int MAX_BLOCKED_STREAMS = 5;
private static final int MAX_HEADER_SIZE = 1024;
@BeforeEach
public void before()
{
_decoder = new QpackDecoder(_decoderHandler, MAX_HEADER_SIZE);
_encoder = new QpackEncoder(_encoderHandler, MAX_BLOCKED_STREAMS)
_decoder = new QpackDecoder(_decoderHandler);
_decoder.setMaxHeadersSize(1024);
_decoder.setMaxTableCapacity(4 * 1024);
_encoder = new QpackEncoder(_encoderHandler)
{
@Override
protected boolean shouldHuffmanEncode(HttpField httpField)
@ -49,12 +49,14 @@ public class EvictionTest
return false;
}
};
_encoder.setMaxTableCapacity(4 * 1024);
_encoder.setTableCapacity(4 * 1024);
_encoder.setMaxBlockedStreams(5);
}
@Test
public void test() throws Exception
{
_encoder.setCapacity(1024);
ByteBuffer encodedFields = ByteBuffer.allocate(1024);
for (int i = 0; i < 10000; i++)

View File

@ -46,8 +46,8 @@ public class SectionAcknowledgmentTest
{
_encoderHandler = new TestEncoderHandler();
_decoderHandler = new TestDecoderHandler();
_encoder = new QpackEncoder(_encoderHandler, MAX_BLOCKED_STREAMS);
_decoder = new QpackDecoder(_decoderHandler, MAX_HEADER_SIZE);
_encoder = new QpackEncoder(_encoderHandler);
_decoder = new QpackDecoder(_decoderHandler);
}
@Test

View File

@ -25,7 +25,7 @@ public class TestDecoderHandler implements QpackDecoder.Handler, Instruction.Han
private final LinkedList<Instruction> _instructionList = new LinkedList<>();
@Override
public void onMetaData(long streamId, MetaData metadata)
public void onMetaData(long streamId, MetaData metadata, boolean wasBlocked)
{
_metadataList.add(metadata);
}

View File

@ -18,6 +18,7 @@ import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.GoAwayFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
@ -58,7 +59,7 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
}
@Override
public void onHeaders(long streamId, HeadersFrame frame)
public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked)
{
if (frame.getMetaData().isRequest())
{
@ -71,10 +72,19 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
}
else
{
super.onHeaders(streamId, frame);
super.onHeaders(streamId, frame, wasBlocked);
}
}
@Override
public void onSettings(SettingsFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("received {} on {}", frame, this);
getProtocolSession().onSettings(frame);
super.onSettings(frame);
}
@Override
public void writeControlFrame(Frame frame, Callback callback)
{
@ -95,26 +105,6 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
return super.newGoAwayFrame(graceful);
}
@Override
protected void onSettingMaxTableCapacity(long value)
{
getProtocolSession().getQpackEncoder().setCapacity((int)value);
}
@Override
protected void onSettingMaxFieldSectionSize(long value)
{
getProtocolSession().getQpackDecoder().setMaxHeaderSize((int)value);
}
@Override
protected void onSettingMaxBlockedStreams(long value)
{
ServerHTTP3Session session = getProtocolSession();
session.getQpackDecoder().setMaxBlockedStreams((int)value);
session.getQpackEncoder().setMaxBlockedStreams((int)value);
}
private void notifyAccept()
{
Server.Listener listener = getListener();

View File

@ -98,11 +98,6 @@ public class HttpStreamOverHTTP3 implements HttpStream
expects100Continue = fields.contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
boolean connect = requestMetaData instanceof MetaData.ConnectRequest;
if (!connect)
connection.setApplicationMode(true);
if (LOG.isDebugEnabled())
{
LOG.debug("HTTP3 request #{}/{}, {} {} {}{}{}",

View File

@ -13,6 +13,7 @@
package org.eclipse.jetty.http3.server.internal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -43,6 +44,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
{
private static final Logger LOG = LoggerFactory.getLogger(ServerHTTP3Session.class);
private final HTTP3Configuration configuration;
private final HTTP3SessionServer session;
private final QpackEncoder encoder;
private final QpackDecoder decoder;
@ -52,7 +54,8 @@ public class ServerHTTP3Session extends ServerProtocolSession
public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession quicSession, Session.Server.Listener listener)
{
super(quicSession);
this.session = new HTTP3SessionServer(this, listener);
this.configuration = configuration;
session = new HTTP3SessionServer(this, listener);
addBean(session);
session.setStreamIdleTimeout(configuration.getStreamIdleTimeout());
@ -62,7 +65,8 @@ public class ServerHTTP3Session extends ServerProtocolSession
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint encoderEndPoint = openInstructionEndPoint(encoderStreamId);
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams());
encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher));
encoder.setMaxHeadersSize(configuration.getMaxResponseHeadersSize());
addBean(encoder);
if (LOG.isDebugEnabled())
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
@ -70,19 +74,19 @@ public class ServerHTTP3Session extends ServerProtocolSession
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint decoderEndPoint = openInstructionEndPoint(decoderStreamId);
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxRequestHeadersSize());
decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher));
addBean(decoder);
if (LOG.isDebugEnabled())
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
QuicStreamEndPoint controlEndPoint = openControlEndPoint(controlStreamId);
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, configuration.isUseOutputDirectByteBuffers());
controlFlusher = new ControlFlusher(quicSession, controlEndPoint, configuration.isUseOutputDirectByteBuffers());
addBean(controlFlusher);
if (LOG.isDebugEnabled())
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers());
messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.isUseOutputDirectByteBuffers());
addBean(messageFlusher);
}
@ -104,16 +108,86 @@ public class ServerHTTP3Session extends ServerProtocolSession
@Override
protected void onStart()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = session.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
settings = settings != null ? new HashMap<>(settings) : new HashMap<>();
settings.compute(SettingsFrame.MAX_TABLE_CAPACITY, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxDecoderTableCapacity();
if (v == 0)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_FIELD_SECTION_SIZE, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxRequestHeadersSize();
if (v <= 0)
v = null;
}
return v;
});
settings.compute(SettingsFrame.MAX_BLOCKED_STREAMS, (k, v) ->
{
if (v == null)
{
v = (long)configuration.getMaxBlockedStreams();
if (v == 0)
v = null;
}
return v;
});
if (LOG.isDebugEnabled())
LOG.debug("configuring decoder {} on {}", settings, this);
settings.forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
decoder.setMaxTableCapacity(value.intValue());
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
decoder.setMaxHeadersSize(value.intValue());
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
decoder.setMaxBlockedStreams(value.intValue());
});
// Queue the mandatory SETTINGS frame.
SettingsFrame frame = new SettingsFrame(settings);
if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::failControlStream)))
controlFlusher.iterate();
}
public void onSettings(SettingsFrame frame)
{
Map<Long, Long> settings = frame.getSettings();
if (LOG.isDebugEnabled())
LOG.debug("configuring encoder {} on {}", settings, this);
settings.forEach((key, value) ->
{
if (key == SettingsFrame.MAX_TABLE_CAPACITY)
{
int maxTableCapacity = value.intValue();
encoder.setMaxTableCapacity(maxTableCapacity);
encoder.setTableCapacity(Math.min(maxTableCapacity, configuration.getInitialEncoderTableCapacity()));
}
else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE)
{
// Must cap the maxHeaderSize to avoid large allocations.
int maxHeadersSize = Math.min(value.intValue(), configuration.getMaxResponseHeadersSize());
encoder.setMaxHeadersSize(maxHeadersSize);
}
else if (key == SettingsFrame.MAX_BLOCKED_STREAMS)
{
int maxBlockedStreams = value.intValue();
encoder.setMaxBlockedStreams(maxBlockedStreams);
}
});
}
private void failControlStream(Throwable failure)
{
long error = HTTP3ErrorCode.CLOSED_CRITICAL_STREAM_ERROR.code();

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.HTTP3Configuration;
import org.eclipse.jetty.http3.HTTP3ErrorCode;
import org.eclipse.jetty.http3.HTTP3Session;
import org.eclipse.jetty.http3.api.Session;
@ -43,12 +44,13 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
// TODO check that the Awaitility usage is the right thing to do.
public class ClientServerTest extends AbstractClientServerTest
{
@Test
@ -137,15 +139,15 @@ public class ClientServerTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
HTTP3SessionServer serverSession = serverSessionRef.get();
assertEquals(maxTableCapacity.getValue(), serverSession.getProtocolSession().getQpackEncoder().getCapacity());
assertEquals(maxTableCapacity.getValue(), serverSession.getProtocolSession().getQpackEncoder().getMaxTableCapacity());
assertEquals(maxBlockedStreams.getValue(), serverSession.getProtocolSession().getQpackEncoder().getMaxBlockedStreams());
assertEquals(maxBlockedStreams.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxBlockedStreams());
assertEquals(maxHeaderSize.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxHeaderSize());
assertEquals(maxHeaderSize.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxHeadersSize());
assertEquals(maxTableCapacity.getValue(), clientSession.getProtocolSession().getQpackEncoder().getCapacity());
assertEquals(maxTableCapacity.getValue(), clientSession.getProtocolSession().getQpackEncoder().getMaxTableCapacity());
assertEquals(maxBlockedStreams.getValue(), clientSession.getProtocolSession().getQpackEncoder().getMaxBlockedStreams());
assertEquals(maxBlockedStreams.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxBlockedStreams());
assertEquals(maxHeaderSize.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxHeaderSize());
assertEquals(maxHeaderSize.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxHeadersSize());
}
@Test
@ -377,7 +379,11 @@ public class ClientServerTest extends AbstractClientServerTest
});
int maxRequestHeadersSize = 128;
http3Client.getHTTP3Configuration().setMaxRequestHeadersSize(maxRequestHeadersSize);
HTTP3Configuration http3Configuration = http3Client.getHTTP3Configuration();
http3Configuration.setMaxRequestHeadersSize(maxRequestHeadersSize);
// Disable the dynamic table, otherwise the large header
// is sent as string literal on the encoder stream.
http3Configuration.setInitialEncoderTableCapacity(0);
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch requestFailureLatch = new CountDownLatch(1);
@ -413,10 +419,17 @@ public class ClientServerTest extends AbstractClientServerTest
public void testResponseHeadersTooLarge() throws Exception
{
int maxResponseHeadersSize = 128;
CountDownLatch settingsLatch = new CountDownLatch(2);
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch responseFailureLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
@ -448,9 +461,22 @@ public class ClientServerTest extends AbstractClientServerTest
});
AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class);
assertNotNull(h3);
h3.getHTTP3Configuration().setMaxResponseHeadersSize(maxResponseHeadersSize);
HTTP3Configuration http3Configuration = h3.getHTTP3Configuration();
// Disable the dynamic table, otherwise the large header
// is sent as string literal on the encoder stream.
http3Configuration.setInitialEncoderTableCapacity(0);
http3Configuration.setMaxResponseHeadersSize(maxResponseHeadersSize);
Session.Client clientSession = newSession(new Session.Client.Listener() {});
Session.Client clientSession = newSession(new Session.Client.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
});
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
CountDownLatch streamFailureLatch = new CountDownLatch(1);
clientSession.newRequest(new HeadersFrame(newRequest("/large"), true), new Stream.Client.Listener()
@ -481,5 +507,118 @@ public class ClientServerTest extends AbstractClientServerTest
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
// TODO: write a test calling readData() from onRequest() (not from onDataAvailable()).
@Test
public void testHeadersThenTrailers() throws Exception
{
CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch trailerLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
stream.demand();
requestLatch.countDown();
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream.Server stream)
{
// TODO: we should not be needing this!!!
Stream.Data data = stream.readData();
assertNull(data);
stream.demand();
}
@Override
public void onTrailer(Stream.Server stream, HeadersFrame frame)
{
trailerLatch.countDown();
stream.respond(new HeadersFrame(new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_3, HttpFields.EMPTY), true));
}
};
}
});
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch responseLatch = new CountDownLatch(1);
Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/large"), false), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
assertEquals(HttpStatus.OK_200, response.getStatus());
responseLatch.countDown();
}
})
.get(5, TimeUnit.SECONDS);
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
clientStream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true));
assertTrue(trailerLatch.await(5, TimeUnit.SECONDS));
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testReadDataFromOnRequest() throws Exception
{
CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch data1Latch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
requestLatch.countDown();
Stream.Data data = await().atMost(5, TimeUnit.SECONDS).until(stream::readData, notNullValue());
data.release();
stream.demand();
data1Latch.countDown();
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream.Server stream)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
}
data.release();
if (!data.isLast())
{
stream.demand();
return;
}
stream.respond(new HeadersFrame(new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_3, HttpFields.EMPTY), true));
}
};
}
});
Session.Client clientSession = newSession(new Session.Client.Listener() {});
CountDownLatch responseLatch = new CountDownLatch(1);
Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/large"), false), new Stream.Client.Listener()
{
@Override
public void onResponse(Stream.Client stream, HeadersFrame frame)
{
responseLatch.countDown();
}
})
.get(5, TimeUnit.SECONDS);
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
clientStream.data(new DataFrame(ByteBuffer.allocate(1024), false));
assertTrue(data1Latch.await(555, TimeUnit.SECONDS));
clientStream.data(new DataFrame(ByteBuffer.allocate(512), true));
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -1177,6 +1177,9 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
// Wait a bit more to allow the unidirectional streams to be setup.
Thread.sleep(1000);
// Stopping the HttpClient will also stop the HTTP3Client.
httpClient.stop();
@ -1234,6 +1237,9 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(10, TimeUnit.SECONDS));
// Wait a bit more to allow the unidirectional streams to be setup.
Thread.sleep(1000);
server.stop();
assertTrue(clientGoAwayLatch.await(30, TimeUnit.SECONDS));