Issue #7635 - implement maxBlockedStreams logic in QpackDecoder

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2022-02-28 18:47:36 +11:00
parent dcdda2a7f0
commit 39ff60f595
1 changed files with 16 additions and 2 deletions

View File

@ -16,7 +16,11 @@ package org.eclipse.jetty.http3.qpack;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.MetaData;
@ -50,6 +54,7 @@ public class QpackDecoder implements Dumpable
private final List<EncodedFieldSection> _encodedFieldSections = new ArrayList<>();
private final NBitIntegerParser _integerDecoder = new NBitIntegerParser();
private final InstructionHandler _instructionHandler = new InstructionHandler();
private final Map<Long, AtomicInteger> _blockedStreams = new HashMap<>();
private int _maxHeaderSize;
private int _maxBlockedStreams;
@ -100,7 +105,6 @@ public class QpackDecoder implements Dumpable
public int getMaxBlockedStreams()
{
// TODO: implement logic about blocked streams by calling this method.
return _maxBlockedStreams;
}
@ -172,6 +176,10 @@ public class QpackDecoder implements Dumpable
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred Decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection);
AtomicInteger blockedFields = _blockedStreams.computeIfAbsent(streamId, id -> new AtomicInteger(0));
blockedFields.incrementAndGet();
if (_blockedStreams.size() > _maxBlockedStreams)
throw new QpackException.SessionException(QPACK_DECOMPRESSION_FAILED, "exceeded max blocked streams");
_encodedFieldSections.add(encodedFieldSection);
}
@ -226,6 +234,7 @@ public class QpackDecoder implements Dumpable
public void streamCancellation(long streamId)
{
_encodedFieldSections.removeIf(encodedFieldSection -> encodedFieldSection.getStreamId() == streamId);
_blockedStreams.remove(streamId);
_metaDataNotifications.removeIf(notification -> notification._streamId == streamId);
_instructions.add(new StreamCancellationInstruction(streamId));
notifyInstructionHandler();
@ -234,12 +243,17 @@ public class QpackDecoder implements Dumpable
private void checkEncodedFieldSections() throws QpackException
{
int insertCount = _context.getDynamicTable().getInsertCount();
for (EncodedFieldSection encodedFieldSection : _encodedFieldSections)
Iterator<EncodedFieldSection> iterator = _encodedFieldSections.iterator();
while (iterator.hasNext())
{
EncodedFieldSection encodedFieldSection = iterator.next();
if (encodedFieldSection.getRequiredInsertCount() <= insertCount)
{
iterator.remove();
long streamId = encodedFieldSection.getStreamId();
MetaData metaData = encodedFieldSection.decode(_context, _maxHeaderSize);
if (_blockedStreams.get(streamId).decrementAndGet() <= 0)
_blockedStreams.remove(streamId);
if (LOG.isDebugEnabled())
LOG.debug("Decoded: streamId={}, metadata={}", streamId, metaData);