diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackDecoder.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackDecoder.java index 5beff005e92..ca2cecb584f 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackDecoder.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackDecoder.java @@ -16,7 +16,11 @@ package org.eclipse.jetty.http3.qpack; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.MetaData; @@ -50,6 +54,7 @@ public class QpackDecoder implements Dumpable private final List _encodedFieldSections = new ArrayList<>(); private final NBitIntegerParser _integerDecoder = new NBitIntegerParser(); private final InstructionHandler _instructionHandler = new InstructionHandler(); + private final Map _blockedStreams = new HashMap<>(); private int _maxHeaderSize; private int _maxBlockedStreams; @@ -100,7 +105,6 @@ public class QpackDecoder implements Dumpable public int getMaxBlockedStreams() { - // TODO: implement logic about blocked streams by calling this method. return _maxBlockedStreams; } @@ -173,6 +177,10 @@ public class QpackDecoder implements Dumpable { if (LOG.isDebugEnabled()) LOG.debug("Deferred Decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection); + AtomicInteger blockedFields = _blockedStreams.computeIfAbsent(streamId, id -> new AtomicInteger(0)); + blockedFields.incrementAndGet(); + if (_blockedStreams.size() > _maxBlockedStreams) + throw new QpackException.SessionException(QPACK_DECOMPRESSION_FAILED, "exceeded max blocked streams"); _encodedFieldSections.add(encodedFieldSection); } @@ -227,6 +235,7 @@ public class QpackDecoder implements Dumpable public void streamCancellation(long streamId) { _encodedFieldSections.removeIf(encodedFieldSection -> encodedFieldSection.getStreamId() == streamId); + _blockedStreams.remove(streamId); _metaDataNotifications.removeIf(notification -> notification._streamId == streamId); _instructions.add(new StreamCancellationInstruction(streamId)); notifyInstructionHandler(); @@ -235,13 +244,18 @@ public class QpackDecoder implements Dumpable private void checkEncodedFieldSections() throws QpackException { int insertCount = _context.getDynamicTable().getInsertCount(); - for (EncodedFieldSection encodedFieldSection : _encodedFieldSections) + Iterator iterator = _encodedFieldSections.iterator(); + while (iterator.hasNext()) { + EncodedFieldSection encodedFieldSection = iterator.next(); int requiredInsertCount = encodedFieldSection.getRequiredInsertCount(); if (requiredInsertCount <= insertCount) { + iterator.remove(); long streamId = encodedFieldSection.getStreamId(); MetaData metaData = encodedFieldSection.decode(_context, _maxHeaderSize); + if (_blockedStreams.get(streamId).decrementAndGet() <= 0) + _blockedStreams.remove(streamId); if (LOG.isDebugEnabled()) LOG.debug("Decoded: streamId={}, metadata={}", streamId, metaData); diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java index e96ba17767f..d8427466f9c 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java @@ -106,6 +106,11 @@ public class QpackEncoder implements Dumpable _parser = new EncoderInstructionParser(_instructionHandler); } + Map getStreamInfoMap() + { + return _streamInfoMap; + } + public int getMaxBlockedStreams() { return _maxBlockedStreams; @@ -480,9 +485,13 @@ public class QpackEncoder implements Dumpable // The KnownInsertCount should be updated to the earliest sent RequiredInsertCount on that stream. StreamInfo.SectionInfo sectionInfo = streamInfo.acknowledge(); + boolean wasBlocked = sectionInfo.isBlocking(); sectionInfo.release(); _knownInsertCount = Math.max(_knownInsertCount, sectionInfo.getRequiredInsertCount()); + if (wasBlocked && !streamInfo.isBlocked()) + _blockedStreams--; + // If we have no more outstanding section acknowledgments remove the StreamInfo. if (streamInfo.isEmpty()) _streamInfoMap.remove(streamId); diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/StreamInfo.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/StreamInfo.java index 08e7dc09af8..4d229e094b3 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/StreamInfo.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/StreamInfo.java @@ -106,6 +106,7 @@ public class StreamInfo implements Iterable { entry.release(); } + _block = false; _entries.clear(); } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/InsertCountIncrementInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/InsertCountIncrementInstruction.java index 00421c71558..1a57fe1507f 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/InsertCountIncrementInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/InsertCountIncrementInstruction.java @@ -29,6 +29,11 @@ public class InsertCountIncrementInstruction implements Instruction _increment = increment; } + public int getIncrement() + { + return _increment; + } + @Override public void encode(ByteBufferPool.Lease lease) { diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SectionAcknowledgmentInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SectionAcknowledgmentInstruction.java index 24ff83265fc..9d6e916591c 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SectionAcknowledgmentInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SectionAcknowledgmentInstruction.java @@ -29,6 +29,11 @@ public class SectionAcknowledgmentInstruction implements Instruction _streamId = streamId; } + public long getStreamId() + { + return _streamId; + } + @Override public void encode(ByteBufferPool.Lease lease) { diff --git a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/BlockedStreamsTest.java b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/BlockedStreamsTest.java new file mode 100644 index 00000000000..2584c43d52a --- /dev/null +++ b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/BlockedStreamsTest.java @@ -0,0 +1,226 @@ +// +// ======================================================================== +// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.qpack; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http3.qpack.QpackException.SessionException; +import org.eclipse.jetty.http3.qpack.internal.instruction.IndexedNameEntryInstruction; +import org.eclipse.jetty.http3.qpack.internal.instruction.InsertCountIncrementInstruction; +import org.eclipse.jetty.http3.qpack.internal.instruction.LiteralNameEntryInstruction; +import org.eclipse.jetty.http3.qpack.internal.instruction.SectionAcknowledgmentInstruction; +import org.eclipse.jetty.http3.qpack.internal.instruction.SetCapacityInstruction; +import org.eclipse.jetty.util.BufferUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.eclipse.jetty.http3.qpack.QpackTestUtil.encode; +import static org.eclipse.jetty.http3.qpack.QpackTestUtil.toBuffer; +import static org.eclipse.jetty.http3.qpack.QpackTestUtil.toMetaData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BlockedStreamsTest +{ + private static final int MAX_BLOCKED_STREAMS = 5; + private static final int MAX_HEADER_SIZE = 1024; + + private QpackEncoder _encoder; + private QpackDecoder _decoder; + private TestDecoderHandler _decoderHandler; + private TestEncoderHandler _encoderHandler; + + @BeforeEach + public void before() + { + _encoderHandler = new TestEncoderHandler(); + _decoderHandler = new TestDecoderHandler(); + _encoder = new QpackEncoder(_encoderHandler, MAX_BLOCKED_STREAMS); + _decoder = new QpackDecoder(_decoderHandler, MAX_HEADER_SIZE); + } + + @Test + public void testBlockedStreams() throws Exception + { + // These settings are determined by HTTP/3 settings frames. + _encoder.setMaxBlockedStreams(2); + _decoder.setMaxBlockedStreams(2); + + // Set capacity of the encoder & decoder to allow entries to be added to the table. + int capacity = 1024; + _encoder.setCapacity(capacity); + Instruction instruction = _encoderHandler.getInstruction(); + assertThat(instruction, instanceOf(SetCapacityInstruction.class)); + _decoder.parseInstructions(QpackTestUtil.toBuffer(instruction)); + + // Encode a new field, which will be added to table. But do not forward insertion instruction to decoder, + // this will cause decoder to become "blocked" on stream 0 until receives the instruction. + HttpField entry1 = new HttpField("name1", "value1"); + ByteBuffer buffer = encode(_encoder, 0, toMetaData("GET", "/", "http", entry1)); + assertThat(BufferUtil.remaining(buffer), greaterThan(0L)); + Instruction instruction1 = _encoderHandler.getInstruction(); + assertThat(instruction1, instanceOf(LiteralNameEntryInstruction.class)); + assertNull(_encoderHandler.getInstruction()); + + // Decoder will not be able to decode this header until it receives instruction. + boolean decoded = _decoder.decode(0, buffer, _decoderHandler); + assertFalse(decoded); + assertThat(BufferUtil.remaining(buffer), equalTo(0L)); + assertNull(_decoderHandler.getMetaData()); + assertNull(_decoderHandler.getInstruction()); + + // Encode second field with dynamic table, do not forward instruction to decoder. + HttpField entry2 = new HttpField("name1", "value2"); + buffer = encode(_encoder, 1, toMetaData("GET", "/", "http", entry2)); + assertThat(BufferUtil.remaining(buffer), greaterThan(0L)); + Instruction instruction2 = _encoderHandler.getInstruction(); + assertThat(instruction2, instanceOf(IndexedNameEntryInstruction.class)); + assertNull(_encoderHandler.getInstruction()); + + // Decoder will not be able to decode this header until it receives instruction. + decoded = _decoder.decode(1, buffer, _decoderHandler); + assertFalse(decoded); + assertNull(_decoderHandler.getMetaData()); + assertNull(_decoderHandler.getInstruction()); + + // Give first instruction to get first metadata. + _decoder.parseInstructions(QpackTestUtil.toBuffer(instruction1)); + MetaData metaData = _decoderHandler.getMetaData(); + assertThat(metaData.getFields().size(), equalTo(1)); + assertThat(metaData.getFields().get(entry1.getHeader()), equalTo(entry1.getValue())); + + Instruction inc1 = _decoderHandler.getInstruction(); + assertThat(inc1, instanceOf(InsertCountIncrementInstruction.class)); + assertThat(((InsertCountIncrementInstruction)inc1).getIncrement(), equalTo(1)); + + Instruction ack1 = _decoderHandler.getInstruction(); + assertThat(ack1, instanceOf(SectionAcknowledgmentInstruction.class)); + assertThat(((SectionAcknowledgmentInstruction)ack1).getStreamId(), equalTo(0L)); + + assertNull(_decoderHandler.getMetaData()); + assertNull(_decoderHandler.getInstruction()); + + // Give second instruction to get second metadata. + _decoder.parseInstructions(QpackTestUtil.toBuffer(instruction2)); + metaData = _decoderHandler.getMetaData(); + assertThat(metaData.getFields().size(), equalTo(1)); + assertThat(metaData.getFields().get(entry2.getHeader()), equalTo(entry2.getValue())); + + Instruction inc2 = _decoderHandler.getInstruction(); + assertThat(inc2, instanceOf(InsertCountIncrementInstruction.class)); + assertThat(((InsertCountIncrementInstruction)inc2).getIncrement(), equalTo(1)); + + Instruction ack2 = _decoderHandler.getInstruction(); + assertThat(ack2, instanceOf(SectionAcknowledgmentInstruction.class)); + assertThat(((SectionAcknowledgmentInstruction)ack2).getStreamId(), equalTo(1L)); + + assertNull(_decoderHandler.getMetaData()); + assertNull(_decoderHandler.getInstruction()); + + // The encoder hasn't received any InsertCountIncrementInstruction and so it thinks there are two streams blocked. + // It should only encode literal entries to not risk blocking another stream on the decoder. + HttpField entry3 = new HttpField("name3", "value3"); + buffer = encode(_encoder, 3, toMetaData("GET", "/", "http", entry3)); + assertThat(BufferUtil.remaining(buffer), greaterThan(0L)); + instruction = _encoderHandler.getInstruction(); + assertThat(instruction, instanceOf(LiteralNameEntryInstruction.class)); + assertNull(_encoderHandler.getInstruction()); + + // Can decode literal entry immediately without any further instructions. + decoded = _decoder.decode(3, buffer, _decoderHandler); + assertTrue(decoded); + metaData = _decoderHandler.getMetaData(); + assertThat(metaData.getFields().size(), equalTo(1)); + assertThat(metaData.getFields().get(entry3.getHeader()), equalTo(entry3.getValue())); + + // No longer referencing any streams that have been acknowledged. + buffer = toBuffer(inc1, ack1, inc2, ack2); + _encoder.parseInstructions(buffer); + assertThat(BufferUtil.remaining(buffer), equalTo(0L)); + assertThat(_encoder.getStreamInfoMap().size(), equalTo(0)); + + // Encoder can now reference entries not acknowledged by the decoder again. + HttpField entry4 = new HttpField("name4", "value4"); + buffer = encode(_encoder, 4, toMetaData("GET", "/", "http", entry4)); + assertThat(BufferUtil.remaining(buffer), greaterThan(0L)); + instruction = _encoderHandler.getInstruction(); + assertThat(instruction, instanceOf(LiteralNameEntryInstruction.class)); + assertNull(_encoderHandler.getInstruction()); + decoded = _decoder.decode(4, buffer, _decoderHandler); + assertFalse(decoded); + } + + @Test + public void testMaxBlockedStreams() throws Exception + { + // Encoder will risk blocking 1 more stream than the decoder will allow. + _encoder.setMaxBlockedStreams(3); + _decoder.setMaxBlockedStreams(2); + + // Set capacity of the encoder & decoder to allow entries to be added to the table. + int capacity = 1024; + _encoder.setCapacity(capacity); + Instruction instruction = _encoderHandler.getInstruction(); + assertThat(instruction, instanceOf(SetCapacityInstruction.class)); + _decoder.parseInstructions(QpackTestUtil.toBuffer(instruction)); + + // Encode a new field, which will be added to table. But do not forward insertion instruction to decoder, + // this will cause decoder to become "blocked" on stream 0 until receives the instruction. + HttpField entry1 = new HttpField("name1", "value1"); + ByteBuffer buffer = encode(_encoder, 0, toMetaData("GET", "/", "http", entry1)); + assertThat(BufferUtil.remaining(buffer), greaterThan(0L)); + Instruction instruction1 = _encoderHandler.getInstruction(); + assertThat(instruction1, instanceOf(LiteralNameEntryInstruction.class)); + assertNull(_encoderHandler.getInstruction()); + + // Decoder will not be able to decode this header until it receives instruction. + boolean decoded = _decoder.decode(0, buffer, _decoderHandler); + assertFalse(decoded); + assertThat(BufferUtil.remaining(buffer), equalTo(0L)); + assertNull(_decoderHandler.getMetaData()); + assertNull(_decoderHandler.getInstruction()); + + // Encode second field with dynamic table, do not forward instruction to decoder. + HttpField entry2 = new HttpField("name1", "value2"); + buffer = encode(_encoder, 1, toMetaData("GET", "/", "http", entry2)); + assertThat(BufferUtil.remaining(buffer), greaterThan(0L)); + Instruction instruction2 = _encoderHandler.getInstruction(); + assertThat(instruction2, instanceOf(IndexedNameEntryInstruction.class)); + assertNull(_encoderHandler.getInstruction()); + + // Decoder will not be able to decode this header until it receives instruction. + decoded = _decoder.decode(1, buffer, _decoderHandler); + assertFalse(decoded); + assertNull(_decoderHandler.getMetaData()); + assertNull(_decoderHandler.getInstruction()); + + // This entry will block a 3rd stream which the decoder must not allow. + HttpField entry3 = new HttpField("name3", "value3"); + ByteBuffer encodedMetadata = encode(_encoder, 3, toMetaData("GET", "/", "http", entry3)); + assertThat(BufferUtil.remaining(encodedMetadata), greaterThan(0L)); + instruction = _encoderHandler.getInstruction(); + assertThat(instruction, instanceOf(LiteralNameEntryInstruction.class)); + assertNull(_encoderHandler.getInstruction()); + + assertThrows(SessionException.class, () -> _decoder.decode(3, encodedMetadata, _decoderHandler)); + } +} diff --git a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/QpackTestUtil.java b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/QpackTestUtil.java index 78174a1965b..fc12a5906f8 100644 --- a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/QpackTestUtil.java +++ b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/QpackTestUtil.java @@ -99,16 +99,27 @@ public class QpackTestUtil public static MetaData toMetaData(String method, String path, String scheme) { - return toMetaData(method, path, scheme, null); + return toMetaData(method, path, scheme, (HttpField)null); + } + + public static MetaData toMetaData(String method, String path, String scheme, HttpField... fields) + { + HttpFields.Mutable httpFields = HttpFields.build(); + for (HttpField field : fields) + { + httpFields.add(field); + } + + return toMetaData(method, path, scheme, httpFields); } public static MetaData toMetaData(String method, String path, String scheme, HttpFields.Mutable fields) { - if (fields == null) - fields = HttpFields.build(); - fields.put(":scheme", scheme); - fields.put(":method", method); - fields.put(":path", path); + fields = HttpFields.build() + .put(":scheme", scheme) + .put(":method", method) + .put(":path", path) + .add(fields); return new MetaData(HttpVersion.HTTP_3, fields); }