diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java index 05b5c412692..b111d3bbd43 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java @@ -20,7 +20,6 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; @@ -104,10 +103,12 @@ public class QpackEncoder private final Handler _handler; private final QpackContext _context; private final int _maxBlockedStreams; - private final Map _blockedStreams = new HashMap<>(); private boolean _validateEncoding = true; private int _knownInsertCount; + private int _blockedStreams = 0; + private final Map _streamInfoMap = new HashMap<>(); + public QpackEncoder(Handler handler, int maxBlockedStreams) { this(handler, maxBlockedStreams, new NullByteBufferPool()); @@ -122,32 +123,6 @@ public class QpackEncoder _knownInsertCount = 0; } - private boolean acquireBlockedStream(int streamId) - { - AtomicInteger atomicInteger = _blockedStreams.get(streamId); - if (atomicInteger == null && (_blockedStreams.size() > _maxBlockedStreams)) - return false; - - if (atomicInteger == null) - { - atomicInteger = new AtomicInteger(); - _blockedStreams.put(streamId, atomicInteger); - } - - atomicInteger.incrementAndGet(); - return true; - } - - private void releaseBlockedStream(int streamId) - { - AtomicInteger atomicInteger = _blockedStreams.get(streamId); - if (atomicInteger == null) - throw new IllegalArgumentException("Invalid Stream ID"); - - if (atomicInteger.decrementAndGet() == 0) - _blockedStreams.remove(streamId); - } - public void setCapacity(int capacity) { _context.getDynamicTable().setCapacity(capacity); @@ -159,14 +134,26 @@ public class QpackEncoder _knownInsertCount += increment; } - public void sectionAcknowledgement(int streamId) + public void sectionAcknowledgement(int streamId) throws QpackException { - // TODO: Implement. + StreamInfo streamInfo = _streamInfoMap.get(streamId); + if (streamInfo == null) + throw new QpackException.StreamException("No StreamInfo for " + streamId); + + // The KnownInsertCount should be updated to the earliest sent RequiredInsertCount on that stream. + StreamInfo.SectionInfo sectionInfo = streamInfo.acknowledge(); + _knownInsertCount = Math.max(_knownInsertCount, sectionInfo.getRequiredInsertCount()); + + // If we have no more outstanding section acknowledgments remove the StreamInfo. + if (streamInfo.isEmpty()) + _streamInfoMap.remove(streamId); } - public void streamCancellation(int streamId) + public void streamCancellation(int streamId) throws QpackException { - // TODO: Implement. + StreamInfo streamInfo = _streamInfoMap.remove(streamId); + if (streamInfo == null) + throw new QpackException.StreamException("No StreamInfo for " + streamId); } public QpackContext getQpackContext() @@ -184,7 +171,7 @@ public class QpackEncoder _validateEncoding = validateEncoding; } - public boolean referenceEntry(Entry entry, int streamId) + public boolean referenceEntry(Entry entry, StreamInfo streamInfo) { if (entry == null) return false; @@ -196,17 +183,28 @@ public class QpackEncoder if (inEvictionZone) return false; + // If they have already acknowledged this entry we can reference it straight away. if (_knownInsertCount >= entry.getIndex() + 1) { entry.reference(); return true; } - // We might be able to risk blocking the decoder stream and reference this immediately. - boolean riskBlockedStream = acquireBlockedStream(streamId); - if (riskBlockedStream) - entry.reference(); - return riskBlockedStream; + // We may need to risk blocking the stream in order to reference it. + if (streamInfo.isBlocked()) + { + streamInfo.getCurrentSectionInfo().block(); + return true; + } + + if (_blockedStreams < _maxBlockedStreams) + { + _blockedStreams++; + streamInfo.getCurrentSectionInfo().block(); + return true; + } + + return false; } public static boolean shouldIndex(HttpField httpField) @@ -233,13 +231,22 @@ public class QpackEncoder } } + StreamInfo streamInfo = _streamInfoMap.get(streamId); + if (streamInfo == null) + { + streamInfo = new StreamInfo(streamId); + _streamInfoMap.put(streamId, streamInfo); + } + StreamInfo.SectionInfo sectionInfo = new StreamInfo.SectionInfo(); + streamInfo.add(sectionInfo); + int requiredInsertCount = 0; List encodableEntries = new ArrayList<>(); if (httpFields != null) { for (HttpField field : httpFields) { - EncodableEntry entry = encode(streamId, field); + EncodableEntry entry = encode(streamInfo, field); encodableEntries.add(entry); // Update the required InsertCount. @@ -274,7 +281,7 @@ public class QpackEncoder return buffer; } - private EncodableEntry encode(int streamId, HttpField field) + private EncodableEntry encode(StreamInfo streamInfo, HttpField field) { DynamicTable dynamicTable = _context.getDynamicTable(); @@ -288,7 +295,7 @@ public class QpackEncoder boolean canCreateEntry = shouldIndex(field) && (Entry.getSize(field) <= dynamicTable.getSpace()); Entry entry = _context.get(field); - if (entry != null && referenceEntry(entry, streamId)) + if (entry != null && referenceEntry(entry, streamInfo)) { return new EncodableEntry(entry); } @@ -302,14 +309,14 @@ public class QpackEncoder _handler.onInstruction(new DuplicateInstruction(entry.getIndex())); // Should we reference this entry and risk blocking. - if (referenceEntry(newEntry, streamId)) + if (referenceEntry(newEntry, streamInfo)) return new EncodableEntry(newEntry); } } boolean huffman = shouldHuffmanEncode(field); Entry nameEntry = _context.get(field.getName()); - if (nameEntry != null && referenceEntry(nameEntry, streamId)) + if (nameEntry != null && referenceEntry(nameEntry, streamInfo)) { // Should we copy this entry if (canCreateEntry) @@ -319,7 +326,7 @@ public class QpackEncoder _handler.onInstruction(new IndexedNameEntryInstruction(!nameEntry.isStatic(), nameEntry.getIndex(), huffman, field.getValue())); // Should we reference this entry and risk blocking. - if (referenceEntry(newEntry, streamId)) + if (referenceEntry(newEntry, streamInfo)) return new EncodableEntry(newEntry); } @@ -334,7 +341,7 @@ public class QpackEncoder _handler.onInstruction(new LiteralNameEntryInstruction(huffman, field.getName(), huffman, field.getValue())); // Should we reference this entry and risk blocking. - if (referenceEntry(newEntry, streamId)) + if (referenceEntry(newEntry, streamInfo)) return new EncodableEntry(newEntry); } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/StreamInfo.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/StreamInfo.java new file mode 100644 index 00000000000..d124ffc6336 --- /dev/null +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/StreamInfo.java @@ -0,0 +1,90 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http3.qpack; + +import java.util.LinkedList; +import java.util.Queue; + +public class StreamInfo +{ + private final int _streamId; + private final Queue _sectionInfos = new LinkedList<>(); + + public StreamInfo(int streamId) + { + _streamId = streamId; + } + + public int getStreamId() + { + return _streamId; + } + + public void add(SectionInfo sectionInfo) + { + _sectionInfos.add(sectionInfo); + } + + public SectionInfo getCurrentSectionInfo() + { + return _sectionInfos.peek(); + } + + public SectionInfo acknowledge() + { + return _sectionInfos.poll(); + } + + public boolean isEmpty() + { + return _sectionInfos.isEmpty(); + } + + public boolean isBlocked() + { + for (SectionInfo info : _sectionInfos) + { + if (info.isBlocking()) + return true; + } + + return false; + } + + public static class SectionInfo + { + private int _requiredInsertCount; + private boolean _block = false; + + public void block() + { + _block = true; + } + + public boolean isBlocking() + { + return _block; + } + + public void setRequiredInsertCount(int requiredInsertCount) + { + _requiredInsertCount = requiredInsertCount; + } + + public int getRequiredInsertCount() + { + return _requiredInsertCount; + } + } +} diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/parser/DecoderInstructionParser.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/parser/DecoderInstructionParser.java index ec132957459..ad9ff98df21 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/parser/DecoderInstructionParser.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/parser/DecoderInstructionParser.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.qpack.parser; import java.nio.ByteBuffer; import org.eclipse.jetty.http3.qpack.QpackEncoder; +import org.eclipse.jetty.http3.qpack.QpackException; /** * Receives instructions coming from the remote Encoder as a sequence of unframed instructions. @@ -40,11 +41,11 @@ public class DecoderInstructionParser public interface Handler { - void onSectionAcknowledgement(int streamId); + void onSectionAcknowledgement(int streamId) throws QpackException; - void onStreamCancellation(int streamId); + void onStreamCancellation(int streamId) throws QpackException; - void onInsertCountIncrement(int increment); + void onInsertCountIncrement(int increment) throws QpackException; } public static class EncoderHandler implements Handler @@ -57,19 +58,19 @@ public class DecoderInstructionParser } @Override - public void onSectionAcknowledgement(int streamId) + public void onSectionAcknowledgement(int streamId) throws QpackException { _encoder.sectionAcknowledgement(streamId); } @Override - public void onStreamCancellation(int streamId) + public void onStreamCancellation(int streamId) throws QpackException { _encoder.streamCancellation(streamId); } @Override - public void onInsertCountIncrement(int increment) + public void onInsertCountIncrement(int increment) throws QpackException { _encoder.insertCountIncrement(increment); } @@ -86,7 +87,7 @@ public class DecoderInstructionParser _integerParser = new NBitIntegerParser(); } - public void parse(ByteBuffer buffer) + public void parse(ByteBuffer buffer) throws QpackException { if (buffer == null || !buffer.hasRemaining()) return; @@ -133,7 +134,7 @@ public class DecoderInstructionParser } } - private void parseSectionAcknowledgment(ByteBuffer buffer) + private void parseSectionAcknowledgment(ByteBuffer buffer) throws QpackException { int streamId = _integerParser.decode(buffer); if (streamId >= 0) @@ -143,7 +144,7 @@ public class DecoderInstructionParser } } - private void parseStreamCancellation(ByteBuffer buffer) + private void parseStreamCancellation(ByteBuffer buffer) throws QpackException { int streamId = _integerParser.decode(buffer); if (streamId >= 0) @@ -153,7 +154,7 @@ public class DecoderInstructionParser } } - private void parseInsertCountIncrement(ByteBuffer buffer) + private void parseInsertCountIncrement(ByteBuffer buffer) throws QpackException { int increment = _integerParser.decode(buffer); if (increment >= 0) diff --git a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/DecoderInstructionParserTest.java b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/DecoderInstructionParserTest.java index 4fa95f2399e..e52136ecada 100644 --- a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/DecoderInstructionParserTest.java +++ b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/DecoderInstructionParserTest.java @@ -60,7 +60,7 @@ public class DecoderInstructionParserTest } @Test - public void testSectionAcknowledgement() + public void testSectionAcknowledgement() throws Exception { DebugHandler debugHandler = new DebugHandler(); DecoderInstructionParser incomingEncoderStream = new DecoderInstructionParser(debugHandler);