Add synchronization to both Qpack Encoder and Decoder.

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2021-03-05 15:20:25 +11:00 committed by Simone Bordet
parent 7c042b5205
commit f1e46c6127
4 changed files with 159 additions and 184 deletions

View File

@ -20,13 +20,9 @@ import java.util.List;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http3.qpack.generator.DuplicateInstruction;
import org.eclipse.jetty.http3.qpack.generator.IndexedNameEntryInstruction;
import org.eclipse.jetty.http3.qpack.generator.InsertCountIncrementInstruction;
import org.eclipse.jetty.http3.qpack.generator.Instruction;
import org.eclipse.jetty.http3.qpack.generator.LiteralNameEntryInstruction;
import org.eclipse.jetty.http3.qpack.generator.SectionAcknowledgmentInstruction;
import org.eclipse.jetty.http3.qpack.generator.SetCapacityInstruction;
import org.eclipse.jetty.http3.qpack.parser.EncodedFieldSection;
import org.eclipse.jetty.http3.qpack.parser.NBitIntegerParser;
import org.eclipse.jetty.http3.qpack.table.DynamicTable;
@ -69,51 +65,53 @@ public class QpackDecoder
public interface Handler
{
// TODO: should this have the streamId?
void onHttpFields(HttpFields httpFields);
void onHttpFields(int streamId, HttpFields httpFields);
void onInstruction(Instruction instruction);
}
public void decode(int streamId, ByteBuffer buffer) throws QpackException
{
if (LOG.isDebugEnabled())
LOG.debug(String.format("CtxTbl[%x] decoding %d octets", _context.hashCode(), buffer.remaining()));
// If the buffer is big, don't even think about decoding it
if (buffer.remaining() > _builder.getMaxSize())
throw new QpackException.SessionException("431 Request Header Fields too large");
_integerDecoder.setPrefix(8);
int encodedInsertCount = _integerDecoder.decode(buffer);
if (encodedInsertCount < 0)
throw new QpackException.CompressionException("Could not parse Required Insert Count");
_integerDecoder.setPrefix(7);
boolean signBit = (buffer.get(buffer.position()) & 0x80) != 0;
int deltaBase = _integerDecoder.decode(buffer);
if (deltaBase < 0)
throw new QpackException.CompressionException("Could not parse Delta Base");
// Decode the Required Insert Count using the DynamicTable state.
DynamicTable dynamicTable = _context.getDynamicTable();
int insertCount = dynamicTable.getInsertCount();
int maxDynamicTableSize = dynamicTable.getCapacity();
int requiredInsertCount = decodeInsertCount(encodedInsertCount, insertCount, maxDynamicTableSize);
// Parse the buffer into an Encoded Field Section.
int base = signBit ? requiredInsertCount - deltaBase - 1 : requiredInsertCount + deltaBase;
EncodedFieldSection encodedFieldSection = new EncodedFieldSection(streamId, requiredInsertCount, base);
encodedFieldSection.parse(buffer);
if (encodedFieldSection.getRequiredInsertCount() <= insertCount)
synchronized (this)
{
_handler.onHttpFields(encodedFieldSection.decode(_context));
_handler.onInstruction(new SectionAcknowledgmentInstruction(streamId));
}
else
{
_encodedFieldSections.add(encodedFieldSection);
if (LOG.isDebugEnabled())
LOG.debug(String.format("CtxTbl[%x] decoding %d octets", _context.hashCode(), buffer.remaining()));
// If the buffer is big, don't even think about decoding it
if (buffer.remaining() > _builder.getMaxSize())
throw new QpackException.SessionException("431 Request Header Fields too large");
_integerDecoder.setPrefix(8);
int encodedInsertCount = _integerDecoder.decode(buffer);
if (encodedInsertCount < 0)
throw new QpackException.CompressionException("Could not parse Required Insert Count");
_integerDecoder.setPrefix(7);
boolean signBit = (buffer.get(buffer.position()) & 0x80) != 0;
int deltaBase = _integerDecoder.decode(buffer);
if (deltaBase < 0)
throw new QpackException.CompressionException("Could not parse Delta Base");
// Decode the Required Insert Count using the DynamicTable state.
DynamicTable dynamicTable = _context.getDynamicTable();
int insertCount = dynamicTable.getInsertCount();
int maxDynamicTableSize = dynamicTable.getCapacity();
int requiredInsertCount = decodeInsertCount(encodedInsertCount, insertCount, maxDynamicTableSize);
// Parse the buffer into an Encoded Field Section.
int base = signBit ? requiredInsertCount - deltaBase - 1 : requiredInsertCount + deltaBase;
EncodedFieldSection encodedFieldSection = new EncodedFieldSection(streamId, requiredInsertCount, base);
encodedFieldSection.parse(buffer);
if (encodedFieldSection.getRequiredInsertCount() <= insertCount)
{
_handler.onHttpFields(streamId, encodedFieldSection.decode(_context));
_handler.onInstruction(new SectionAcknowledgmentInstruction(streamId));
}
else
{
_encodedFieldSections.add(encodedFieldSection);
}
}
}
@ -124,7 +122,7 @@ public class QpackDecoder
{
if (encodedFieldSection.getRequiredInsertCount() <= insertCount)
{
_handler.onHttpFields(encodedFieldSection.decode(_context));
_handler.onHttpFields(encodedFieldSection.getStreamId(), encodedFieldSection.decode(_context));
_handler.onInstruction(new SectionAcknowledgmentInstruction(encodedFieldSection.getStreamId()));
}
}
@ -132,69 +130,33 @@ public class QpackDecoder
public void setCapacity(int capacity)
{
_context.getDynamicTable().setCapacity(capacity);
synchronized (this)
{
_context.getDynamicTable().setCapacity(capacity);
}
}
public void insert(int index) throws QpackException
{
DynamicTable dynamicTable = _context.getDynamicTable();
Entry entry = dynamicTable.get(index);
// Add the new Entry to the DynamicTable.
dynamicTable.add(entry);
_handler.onInstruction(new InsertCountIncrementInstruction(1));
checkEncodedFieldSections();
}
public void insert(int nameIndex, boolean isDynamicTableIndex, String value) throws QpackException
{
StaticTable staticTable = _context.getStaticTable();
DynamicTable dynamicTable = _context.getDynamicTable();
Entry referencedEntry = isDynamicTableIndex ? dynamicTable.get(nameIndex) : staticTable.get(nameIndex);
// Add the new Entry to the DynamicTable.
Entry entry = new Entry(new HttpField(referencedEntry.getHttpField().getHeader(), referencedEntry.getHttpField().getName(), value));
dynamicTable.add(entry);
_handler.onInstruction(new InsertCountIncrementInstruction(1));
checkEncodedFieldSections();
}
public void insert(String name, String value) throws QpackException
{
DynamicTable dynamicTable = _context.getDynamicTable();
Entry entry = new Entry(new HttpField(name, value));
// Add the new Entry to the DynamicTable.
dynamicTable.add(entry);
_handler.onInstruction(new InsertCountIncrementInstruction(1));
checkEncodedFieldSections();
}
public void onInstruction(Instruction instruction) throws QpackException
{
StaticTable staticTable = _context.getStaticTable();
DynamicTable dynamicTable = _context.getDynamicTable();
if (instruction instanceof SetCapacityInstruction)
synchronized (this)
{
int capacity = ((SetCapacityInstruction)instruction).getCapacity();
dynamicTable.setCapacity(capacity);
}
else if (instruction instanceof DuplicateInstruction)
{
DuplicateInstruction duplicate = (DuplicateInstruction)instruction;
Entry entry = dynamicTable.get(duplicate.getIndex());
DynamicTable dynamicTable = _context.getDynamicTable();
Entry entry = dynamicTable.get(index);
// Add the new Entry to the DynamicTable.
dynamicTable.add(entry);
_handler.onInstruction(new InsertCountIncrementInstruction(1));
checkEncodedFieldSections();
}
else if (instruction instanceof IndexedNameEntryInstruction)
}
public void insert(int nameIndex, boolean isDynamicTableIndex, String value) throws QpackException
{
synchronized (this)
{
IndexedNameEntryInstruction nameEntryInstruction = (IndexedNameEntryInstruction)instruction;
int index = nameEntryInstruction.getIndex();
String value = nameEntryInstruction.getValue();
Entry referencedEntry = nameEntryInstruction.isDynamic() ? dynamicTable.get(index) : staticTable.get(index);
StaticTable staticTable = _context.getStaticTable();
DynamicTable dynamicTable = _context.getDynamicTable();
Entry referencedEntry = isDynamicTableIndex ? dynamicTable.get(nameIndex) : staticTable.get(nameIndex);
// Add the new Entry to the DynamicTable.
Entry entry = new Entry(new HttpField(referencedEntry.getHttpField().getHeader(), referencedEntry.getHttpField().getName(), value));
@ -202,11 +164,13 @@ public class QpackDecoder
_handler.onInstruction(new InsertCountIncrementInstruction(1));
checkEncodedFieldSections();
}
else if (instruction instanceof LiteralNameEntryInstruction)
}
public void insert(String name, String value) throws QpackException
{
synchronized (this)
{
LiteralNameEntryInstruction literalEntryInstruction = (LiteralNameEntryInstruction)instruction;
String name = literalEntryInstruction.getName();
String value = literalEntryInstruction.getValue();
DynamicTable dynamicTable = _context.getDynamicTable();
Entry entry = new Entry(new HttpField(name, value));
// Add the new Entry to the DynamicTable.
@ -214,10 +178,6 @@ public class QpackDecoder
_handler.onInstruction(new InsertCountIncrementInstruction(1));
checkEncodedFieldSections();
}
else
{
throw new IllegalStateException("Invalid Encoder Instruction");
}
}
private static int decodeInsertCount(int encInsertCount, int totalNumInserts, int maxTableCapacity) throws QpackException

View File

@ -103,7 +103,6 @@ public class QpackEncoder
private final Handler _handler;
private final QpackContext _context;
private final int _maxBlockedStreams;
private boolean _validateEncoding = true;
private int _knownInsertCount;
private int _blockedStreams = 0;
@ -123,66 +122,68 @@ public class QpackEncoder
_knownInsertCount = 0;
}
public void setCapacity(int capacity)
{
_context.getDynamicTable().setCapacity(capacity);
_handler.onInstruction(new SetCapacityInstruction(capacity));
}
public void insertCountIncrement(int increment) throws QpackException
{
int insertCount = _context.getDynamicTable().getInsertCount();
if (_knownInsertCount + increment > insertCount)
throw new QpackException.StreamException("KnownInsertCount incremented over InsertCount");
// TODO: release any references to entries which used to insert new entries.
for (Entry entry : _context.getDynamicTable())
{
if (entry.getIndex() > _knownInsertCount)
break;
}
_knownInsertCount += increment;
}
public void sectionAcknowledgement(int streamId) throws QpackException
{
StreamInfo streamInfo = _streamInfoMap.get(streamId);
if (streamInfo == null)
throw new QpackException.StreamException("No StreamInfo for " + streamId);
// The KnownInsertCount should be updated to the earliest sent RequiredInsertCount on that stream.
StreamInfo.SectionInfo sectionInfo = streamInfo.acknowledge();
_knownInsertCount = Math.max(_knownInsertCount, sectionInfo.getRequiredInsertCount());
// If we have no more outstanding section acknowledgments remove the StreamInfo.
if (streamInfo.isEmpty())
_streamInfoMap.remove(streamId);
}
public void streamCancellation(int streamId) throws QpackException
{
StreamInfo streamInfo = _streamInfoMap.remove(streamId);
if (streamInfo == null)
throw new QpackException.StreamException("No StreamInfo for " + streamId);
}
public QpackContext getQpackContext()
{
return _context;
}
public boolean isValidateEncoding()
public void setCapacity(int capacity)
{
return _validateEncoding;
synchronized (this)
{
_context.getDynamicTable().setCapacity(capacity);
_handler.onInstruction(new SetCapacityInstruction(capacity));
}
}
public void setValidateEncoding(boolean validateEncoding)
public void insertCountIncrement(int increment) throws QpackException
{
_validateEncoding = validateEncoding;
synchronized (this)
{
int insertCount = _context.getDynamicTable().getInsertCount();
if (_knownInsertCount + increment > insertCount)
throw new QpackException.StreamException("KnownInsertCount incremented over InsertCount");
// TODO: release any references to entries which used to insert new entries.
for (Entry entry : _context.getDynamicTable())
{
if (entry.getIndex() > _knownInsertCount)
break;
}
_knownInsertCount += increment;
}
}
public boolean referenceEntry(Entry entry)
public void sectionAcknowledgement(int streamId) throws QpackException
{
synchronized (this)
{
StreamInfo streamInfo = _streamInfoMap.get(streamId);
if (streamInfo == null)
throw new QpackException.StreamException("No StreamInfo for " + streamId);
// The KnownInsertCount should be updated to the earliest sent RequiredInsertCount on that stream.
StreamInfo.SectionInfo sectionInfo = streamInfo.acknowledge();
_knownInsertCount = Math.max(_knownInsertCount, sectionInfo.getRequiredInsertCount());
// If we have no more outstanding section acknowledgments remove the StreamInfo.
if (streamInfo.isEmpty())
_streamInfoMap.remove(streamId);
}
}
public void streamCancellation(int streamId) throws QpackException
{
synchronized (this)
{
StreamInfo streamInfo = _streamInfoMap.remove(streamId);
if (streamInfo == null)
throw new QpackException.StreamException("No StreamInfo for " + streamId);
}
}
private boolean referenceEntry(Entry entry)
{
if (entry == null)
return false;
@ -204,7 +205,7 @@ public class QpackEncoder
return false;
}
public boolean referenceEntry(Entry entry, StreamInfo streamInfo)
private boolean referenceEntry(Entry entry, StreamInfo streamInfo)
{
if (referenceEntry(entry))
return true;
@ -226,7 +227,7 @@ public class QpackEncoder
return false;
}
public boolean shouldIndex(HttpField httpField)
protected boolean shouldIndex(HttpField httpField)
{
return !DO_NOT_INDEX.contains(httpField.getHeader());
}
@ -239,7 +240,7 @@ public class QpackEncoder
public ByteBuffer encode(int streamId, HttpFields httpFields) throws QpackException
{
// Verify that we can encode without errors.
if (isValidateEncoding() && httpFields != null)
if (httpFields != null)
{
for (HttpField field : httpFields)
{
@ -250,36 +251,43 @@ public class QpackEncoder
}
}
StreamInfo streamInfo = _streamInfoMap.get(streamId);
if (streamInfo == null)
{
streamInfo = new StreamInfo(streamId);
_streamInfoMap.put(streamId, streamInfo);
}
StreamInfo.SectionInfo sectionInfo = new StreamInfo.SectionInfo();
streamInfo.add(sectionInfo);
int requiredInsertCount = 0;
int base;
int encodedInsertCount;
boolean signBit;
int deltaBase;
List<EncodableEntry> encodableEntries = new ArrayList<>();
if (httpFields != null)
synchronized (this)
{
for (HttpField field : httpFields)
StreamInfo streamInfo = _streamInfoMap.get(streamId);
if (streamInfo == null)
{
EncodableEntry entry = encode(streamInfo, field);
encodableEntries.add(entry);
// Update the required InsertCount.
int entryRequiredInsertCount = entry.getRequiredInsertCount();
if (entryRequiredInsertCount > requiredInsertCount)
requiredInsertCount = entryRequiredInsertCount;
streamInfo = new StreamInfo(streamId);
_streamInfoMap.put(streamId, streamInfo);
}
}
StreamInfo.SectionInfo sectionInfo = new StreamInfo.SectionInfo();
streamInfo.add(sectionInfo);
DynamicTable dynamicTable = _context.getDynamicTable();
int base = dynamicTable.getBase();
int encodedInsertCount = encodeInsertCount(requiredInsertCount, dynamicTable.getCapacity());
boolean signBit = base < requiredInsertCount;
int deltaBase = signBit ? requiredInsertCount - base - 1 : base - requiredInsertCount;
int requiredInsertCount = 0;
if (httpFields != null)
{
for (HttpField field : httpFields)
{
EncodableEntry entry = encode(streamInfo, field);
encodableEntries.add(entry);
// Update the required InsertCount.
int entryRequiredInsertCount = entry.getRequiredInsertCount();
if (entryRequiredInsertCount > requiredInsertCount)
requiredInsertCount = entryRequiredInsertCount;
}
}
DynamicTable dynamicTable = _context.getDynamicTable();
base = dynamicTable.getBase();
encodedInsertCount = encodeInsertCount(requiredInsertCount, dynamicTable.getCapacity());
signBit = base < requiredInsertCount;
deltaBase = signBit ? requiredInsertCount - base - 1 : base - requiredInsertCount;
}
// TODO: Calculate the size required.
ByteBuffer buffer = _bufferPool.acquire(1024, false);
@ -400,7 +408,7 @@ public class QpackEncoder
return true;
}
public static int encodeInsertCount(int reqInsertCount, int maxTableCapacity)
private static int encodeInsertCount(int reqInsertCount, int maxTableCapacity)
{
if (reqInsertCount == 0)
return 0;

View File

@ -13,8 +13,10 @@
package org.eclipse.jetty.http3.qpack.table;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@ -25,9 +27,8 @@ import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http3.qpack.StaticTableHttpField;
import org.eclipse.jetty.util.Index;
public class StaticTable
public class StaticTable implements Iterable<Entry>
{
private static final String EMPTY = "";
public static final String[][] STATIC_TABLE =
{
{":authority", ""},
@ -230,4 +231,10 @@ public class StaticTable
return null;
return _staticTableByHeader[index];
}
@Override
public Iterator<Entry> iterator()
{
return Arrays.stream(_staticTable).map(e -> (Entry)e).iterator();
}
}

View File

@ -25,7 +25,7 @@ public class TestDecoderHandler implements QpackDecoder.Handler
private final Queue<Instruction> _instructionList = new LinkedList<>();
@Override
public void onHttpFields(HttpFields httpFields)
public void onHttpFields(int streamId, HttpFields httpFields)
{
_httpFieldsList.add(httpFields);
}