Implement SectionAcknowledgment and StreamCancellation

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2021-03-04 14:50:28 +11:00 committed by Simone Bordet
parent c271b70d29
commit f8e2831185
4 changed files with 154 additions and 56 deletions

View File

@ -20,7 +20,6 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
@ -104,10 +103,12 @@ public class QpackEncoder
private final Handler _handler;
private final QpackContext _context;
private final int _maxBlockedStreams;
private final Map<Integer, AtomicInteger> _blockedStreams = new HashMap<>();
private boolean _validateEncoding = true;
private int _knownInsertCount;
private int _blockedStreams = 0;
private final Map<Integer, StreamInfo> _streamInfoMap = new HashMap<>();
public QpackEncoder(Handler handler, int maxBlockedStreams)
{
this(handler, maxBlockedStreams, new NullByteBufferPool());
@ -122,32 +123,6 @@ public class QpackEncoder
_knownInsertCount = 0;
}
private boolean acquireBlockedStream(int streamId)
{
AtomicInteger atomicInteger = _blockedStreams.get(streamId);
if (atomicInteger == null && (_blockedStreams.size() > _maxBlockedStreams))
return false;
if (atomicInteger == null)
{
atomicInteger = new AtomicInteger();
_blockedStreams.put(streamId, atomicInteger);
}
atomicInteger.incrementAndGet();
return true;
}
private void releaseBlockedStream(int streamId)
{
AtomicInteger atomicInteger = _blockedStreams.get(streamId);
if (atomicInteger == null)
throw new IllegalArgumentException("Invalid Stream ID");
if (atomicInteger.decrementAndGet() == 0)
_blockedStreams.remove(streamId);
}
public void setCapacity(int capacity)
{
_context.getDynamicTable().setCapacity(capacity);
@ -159,14 +134,26 @@ public class QpackEncoder
_knownInsertCount += increment;
}
public void sectionAcknowledgement(int streamId)
public void sectionAcknowledgement(int streamId) throws QpackException
{
// TODO: Implement.
StreamInfo streamInfo = _streamInfoMap.get(streamId);
if (streamInfo == null)
throw new QpackException.StreamException("No StreamInfo for " + streamId);
// The KnownInsertCount should be updated to the earliest sent RequiredInsertCount on that stream.
StreamInfo.SectionInfo sectionInfo = streamInfo.acknowledge();
_knownInsertCount = Math.max(_knownInsertCount, sectionInfo.getRequiredInsertCount());
// If we have no more outstanding section acknowledgments remove the StreamInfo.
if (streamInfo.isEmpty())
_streamInfoMap.remove(streamId);
}
public void streamCancellation(int streamId)
public void streamCancellation(int streamId) throws QpackException
{
// TODO: Implement.
StreamInfo streamInfo = _streamInfoMap.remove(streamId);
if (streamInfo == null)
throw new QpackException.StreamException("No StreamInfo for " + streamId);
}
public QpackContext getQpackContext()
@ -184,7 +171,7 @@ public class QpackEncoder
_validateEncoding = validateEncoding;
}
public boolean referenceEntry(Entry entry, int streamId)
public boolean referenceEntry(Entry entry, StreamInfo streamInfo)
{
if (entry == null)
return false;
@ -196,17 +183,28 @@ public class QpackEncoder
if (inEvictionZone)
return false;
// If they have already acknowledged this entry we can reference it straight away.
if (_knownInsertCount >= entry.getIndex() + 1)
{
entry.reference();
return true;
}
// We might be able to risk blocking the decoder stream and reference this immediately.
boolean riskBlockedStream = acquireBlockedStream(streamId);
if (riskBlockedStream)
entry.reference();
return riskBlockedStream;
// We may need to risk blocking the stream in order to reference it.
if (streamInfo.isBlocked())
{
streamInfo.getCurrentSectionInfo().block();
return true;
}
if (_blockedStreams < _maxBlockedStreams)
{
_blockedStreams++;
streamInfo.getCurrentSectionInfo().block();
return true;
}
return false;
}
public static boolean shouldIndex(HttpField httpField)
@ -233,13 +231,22 @@ public class QpackEncoder
}
}
StreamInfo streamInfo = _streamInfoMap.get(streamId);
if (streamInfo == null)
{
streamInfo = new StreamInfo(streamId);
_streamInfoMap.put(streamId, streamInfo);
}
StreamInfo.SectionInfo sectionInfo = new StreamInfo.SectionInfo();
streamInfo.add(sectionInfo);
int requiredInsertCount = 0;
List<EncodableEntry> encodableEntries = new ArrayList<>();
if (httpFields != null)
{
for (HttpField field : httpFields)
{
EncodableEntry entry = encode(streamId, field);
EncodableEntry entry = encode(streamInfo, field);
encodableEntries.add(entry);
// Update the required InsertCount.
@ -274,7 +281,7 @@ public class QpackEncoder
return buffer;
}
private EncodableEntry encode(int streamId, HttpField field)
private EncodableEntry encode(StreamInfo streamInfo, HttpField field)
{
DynamicTable dynamicTable = _context.getDynamicTable();
@ -288,7 +295,7 @@ public class QpackEncoder
boolean canCreateEntry = shouldIndex(field) && (Entry.getSize(field) <= dynamicTable.getSpace());
Entry entry = _context.get(field);
if (entry != null && referenceEntry(entry, streamId))
if (entry != null && referenceEntry(entry, streamInfo))
{
return new EncodableEntry(entry);
}
@ -302,14 +309,14 @@ public class QpackEncoder
_handler.onInstruction(new DuplicateInstruction(entry.getIndex()));
// Should we reference this entry and risk blocking.
if (referenceEntry(newEntry, streamId))
if (referenceEntry(newEntry, streamInfo))
return new EncodableEntry(newEntry);
}
}
boolean huffman = shouldHuffmanEncode(field);
Entry nameEntry = _context.get(field.getName());
if (nameEntry != null && referenceEntry(nameEntry, streamId))
if (nameEntry != null && referenceEntry(nameEntry, streamInfo))
{
// Should we copy this entry
if (canCreateEntry)
@ -319,7 +326,7 @@ public class QpackEncoder
_handler.onInstruction(new IndexedNameEntryInstruction(!nameEntry.isStatic(), nameEntry.getIndex(), huffman, field.getValue()));
// Should we reference this entry and risk blocking.
if (referenceEntry(newEntry, streamId))
if (referenceEntry(newEntry, streamInfo))
return new EncodableEntry(newEntry);
}
@ -334,7 +341,7 @@ public class QpackEncoder
_handler.onInstruction(new LiteralNameEntryInstruction(huffman, field.getName(), huffman, field.getValue()));
// Should we reference this entry and risk blocking.
if (referenceEntry(newEntry, streamId))
if (referenceEntry(newEntry, streamInfo))
return new EncodableEntry(newEntry);
}

View File

@ -0,0 +1,90 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.http3.qpack;
import java.util.LinkedList;
import java.util.Queue;
public class StreamInfo
{
private final int _streamId;
private final Queue<SectionInfo> _sectionInfos = new LinkedList<>();
public StreamInfo(int streamId)
{
_streamId = streamId;
}
public int getStreamId()
{
return _streamId;
}
public void add(SectionInfo sectionInfo)
{
_sectionInfos.add(sectionInfo);
}
public SectionInfo getCurrentSectionInfo()
{
return _sectionInfos.peek();
}
public SectionInfo acknowledge()
{
return _sectionInfos.poll();
}
public boolean isEmpty()
{
return _sectionInfos.isEmpty();
}
public boolean isBlocked()
{
for (SectionInfo info : _sectionInfos)
{
if (info.isBlocking())
return true;
}
return false;
}
public static class SectionInfo
{
private int _requiredInsertCount;
private boolean _block = false;
public void block()
{
_block = true;
}
public boolean isBlocking()
{
return _block;
}
public void setRequiredInsertCount(int requiredInsertCount)
{
_requiredInsertCount = requiredInsertCount;
}
public int getRequiredInsertCount()
{
return _requiredInsertCount;
}
}
}

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.qpack.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http3.qpack.QpackEncoder;
import org.eclipse.jetty.http3.qpack.QpackException;
/**
* Receives instructions coming from the remote Encoder as a sequence of unframed instructions.
@ -40,11 +41,11 @@ public class DecoderInstructionParser
public interface Handler
{
void onSectionAcknowledgement(int streamId);
void onSectionAcknowledgement(int streamId) throws QpackException;
void onStreamCancellation(int streamId);
void onStreamCancellation(int streamId) throws QpackException;
void onInsertCountIncrement(int increment);
void onInsertCountIncrement(int increment) throws QpackException;
}
public static class EncoderHandler implements Handler
@ -57,19 +58,19 @@ public class DecoderInstructionParser
}
@Override
public void onSectionAcknowledgement(int streamId)
public void onSectionAcknowledgement(int streamId) throws QpackException
{
_encoder.sectionAcknowledgement(streamId);
}
@Override
public void onStreamCancellation(int streamId)
public void onStreamCancellation(int streamId) throws QpackException
{
_encoder.streamCancellation(streamId);
}
@Override
public void onInsertCountIncrement(int increment)
public void onInsertCountIncrement(int increment) throws QpackException
{
_encoder.insertCountIncrement(increment);
}
@ -86,7 +87,7 @@ public class DecoderInstructionParser
_integerParser = new NBitIntegerParser();
}
public void parse(ByteBuffer buffer)
public void parse(ByteBuffer buffer) throws QpackException
{
if (buffer == null || !buffer.hasRemaining())
return;
@ -133,7 +134,7 @@ public class DecoderInstructionParser
}
}
private void parseSectionAcknowledgment(ByteBuffer buffer)
private void parseSectionAcknowledgment(ByteBuffer buffer) throws QpackException
{
int streamId = _integerParser.decode(buffer);
if (streamId >= 0)
@ -143,7 +144,7 @@ public class DecoderInstructionParser
}
}
private void parseStreamCancellation(ByteBuffer buffer)
private void parseStreamCancellation(ByteBuffer buffer) throws QpackException
{
int streamId = _integerParser.decode(buffer);
if (streamId >= 0)
@ -153,7 +154,7 @@ public class DecoderInstructionParser
}
}
private void parseInsertCountIncrement(ByteBuffer buffer)
private void parseInsertCountIncrement(ByteBuffer buffer) throws QpackException
{
int increment = _integerParser.decode(buffer);
if (increment >= 0)

View File

@ -60,7 +60,7 @@ public class DecoderInstructionParserTest
}
@Test
public void testSectionAcknowledgement()
public void testSectionAcknowledgement() throws Exception
{
DebugHandler debugHandler = new DebugHandler();
DecoderInstructionParser incomingEncoderStream = new DecoderInstructionParser(debugHandler);