Issue #6728 - QUIC and HTTP/3

- Fixed parsing of HEADERS frames.
- Fixed locking in QpackEncoder.
- Fixed creation of QuicStreamEndPoints.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-11-05 18:03:35 +01:00
parent cd1343fd6c
commit a7ec4ff525
5 changed files with 103 additions and 89 deletions

View File

@ -255,7 +255,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{
buffer.release();
if (LOG.isDebugEnabled())
LOG.debug("retained released {}", buffer);
LOG.debug("released retained {}", buffer);
}
public void demand()

View File

@ -23,7 +23,6 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3ErrorCode;
import org.eclipse.jetty.http3.qpack.QpackDecoder;
import org.eclipse.jetty.http3.qpack.QpackException;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -72,7 +71,9 @@ public class HeadersBodyParser extends BodyParser
{
// Copy and accumulate the buffer.
length -= remaining;
ByteBuffer copy = BufferUtil.copy(buffer);
ByteBuffer copy = buffer.isDirect() ? ByteBuffer.allocateDirect(remaining) : ByteBuffer.allocate(remaining);
copy.put(buffer);
copy.flip();
byteBuffers.add(copy);
return Result.NO_FRAME;
}
@ -96,6 +97,7 @@ public class HeadersBodyParser extends BodyParser
byteBuffers.add(slice);
int capacity = byteBuffers.stream().mapToInt(ByteBuffer::remaining).sum();
encoded = byteBuffers.stream().reduce(ByteBuffer.allocate(capacity), ByteBuffer::put);
encoded.flip();
byteBuffers.clear();
}

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.http3.qpack.internal.table.Entry;
import org.eclipse.jetty.http3.qpack.internal.util.NBitIntegerEncoder;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -86,6 +87,7 @@ public class QpackEncoder implements Dumpable
HttpHeader.SET_COOKIE,
HttpHeader.SET_COOKIE2);
private final AutoLock lock = new AutoLock();
private final List<Instruction> _instructions = new ArrayList<>();
private final Instruction.Handler _handler;
private final QpackContext _context;
@ -129,83 +131,86 @@ public class QpackEncoder implements Dumpable
*/
public void encode(ByteBuffer buffer, long streamId, MetaData metadata) throws QpackException
{
if (LOG.isDebugEnabled())
LOG.debug("Encoding: streamId={}, metadata={}", streamId, metadata);
// Verify that we can encode without errors.
if (metadata.getFields() != null)
try (AutoLock l = lock.lock())
{
for (HttpField field : metadata.getFields())
if (LOG.isDebugEnabled())
LOG.debug("Encoding: streamId={}, metadata={}", streamId, metadata);
// Verify that we can encode without errors.
if (metadata.getFields() != null)
{
String name = field.getName();
char firstChar = name.charAt(0);
if (firstChar <= ' ')
throw new QpackException.StreamException(H3_GENERAL_PROTOCOL_ERROR, String.format("Invalid header name: '%s'", name));
}
}
List<EncodableEntry> encodableEntries = new ArrayList<>();
DynamicTable dynamicTable = _context.getDynamicTable();
// We need to remember what fields were referenced for each stream for multiple reasons:
// 1. We can only (potentially) block up to SETTINGS_QPACK_BLOCKED_STREAMS by referencing entries which may not have arrived.
// 2. We need to remember reference counts to each entry which are then acknowledged by the remote decoder, this
// allows us to know when we can evict an entry (when it has no un-acknowledged references).
StreamInfo streamInfo = _streamInfoMap.get(streamId);
if (streamInfo == null)
{
streamInfo = new StreamInfo(streamId);
_streamInfoMap.put(streamId, streamInfo);
}
StreamInfo.SectionInfo sectionInfo = new StreamInfo.SectionInfo();
streamInfo.add(sectionInfo);
try
{
int requiredInsertCount = 0;
for (HttpField field : new Http3Fields(metadata))
{
EncodableEntry entry = encode(streamInfo, field);
encodableEntries.add(entry);
// Update the required InsertCount.
int entryRequiredInsertCount = entry.getRequiredInsertCount();
if (entryRequiredInsertCount > requiredInsertCount)
requiredInsertCount = entryRequiredInsertCount;
for (HttpField field : metadata.getFields())
{
String name = field.getName();
char firstChar = name.charAt(0);
if (firstChar <= ' ')
throw new QpackException.StreamException(H3_GENERAL_PROTOCOL_ERROR, String.format("Invalid header name: '%s'", name));
}
}
sectionInfo.setRequiredInsertCount(requiredInsertCount);
int base = dynamicTable.getBase();
int encodedInsertCount = encodeInsertCount(requiredInsertCount, dynamicTable.getCapacity());
boolean signBit = base < requiredInsertCount;
int deltaBase = signBit ? requiredInsertCount - base - 1 : base - requiredInsertCount;
List<EncodableEntry> encodableEntries = new ArrayList<>();
DynamicTable dynamicTable = _context.getDynamicTable();
// Encode the Field Section Prefix into the ByteBuffer.
NBitIntegerEncoder.encode(buffer, 8, encodedInsertCount);
buffer.put(signBit ? (byte)0x80 : (byte)0x00);
NBitIntegerEncoder.encode(buffer, 7, deltaBase);
// Encode the field lines into the ByteBuffer.
for (EncodableEntry entry : encodableEntries)
// We need to remember what fields were referenced for each stream for multiple reasons:
// 1. We can only (potentially) block up to SETTINGS_QPACK_BLOCKED_STREAMS by referencing entries which may not have arrived.
// 2. We need to remember reference counts to each entry which are then acknowledged by the remote decoder, this
// allows us to know when we can evict an entry (when it has no un-acknowledged references).
StreamInfo streamInfo = _streamInfoMap.get(streamId);
if (streamInfo == null)
{
entry.encode(buffer, base);
streamInfo = new StreamInfo(streamId);
_streamInfoMap.put(streamId, streamInfo);
}
StreamInfo.SectionInfo sectionInfo = new StreamInfo.SectionInfo();
streamInfo.add(sectionInfo);
notifyInstructionHandler();
}
catch (BufferOverflowException e)
{
// TODO: We have already added to the dynamic table so we need to send the instructions to maintain correct state.
// Can we prevent adding to the table until we know the buffer has enough space?
notifyInstructionHandler();
streamInfo.remove(sectionInfo);
sectionInfo.release();
throw new QpackException.StreamException(H3_GENERAL_PROTOCOL_ERROR, "buffer_space_exceeded", e);
}
catch (Throwable t)
{
// We are failing the whole Session so don't need to send instructions back.
throw new QpackException.SessionException(H3_GENERAL_PROTOCOL_ERROR, "compression_error", t);
try
{
int requiredInsertCount = 0;
for (HttpField field : new Http3Fields(metadata))
{
EncodableEntry entry = encode(streamInfo, field);
encodableEntries.add(entry);
// Update the required InsertCount.
int entryRequiredInsertCount = entry.getRequiredInsertCount();
if (entryRequiredInsertCount > requiredInsertCount)
requiredInsertCount = entryRequiredInsertCount;
}
sectionInfo.setRequiredInsertCount(requiredInsertCount);
int base = dynamicTable.getBase();
int encodedInsertCount = encodeInsertCount(requiredInsertCount, dynamicTable.getCapacity());
boolean signBit = base < requiredInsertCount;
int deltaBase = signBit ? requiredInsertCount - base - 1 : base - requiredInsertCount;
// Encode the Field Section Prefix into the ByteBuffer.
NBitIntegerEncoder.encode(buffer, 8, encodedInsertCount);
buffer.put(signBit ? (byte)0x80 : (byte)0x00);
NBitIntegerEncoder.encode(buffer, 7, deltaBase);
// Encode the field lines into the ByteBuffer.
for (EncodableEntry entry : encodableEntries)
{
entry.encode(buffer, base);
}
notifyInstructionHandler();
}
catch (BufferOverflowException e)
{
// TODO: We have already added to the dynamic table so we need to send the instructions to maintain correct state.
// Can we prevent adding to the table until we know the buffer has enough space?
notifyInstructionHandler();
streamInfo.remove(sectionInfo);
sectionInfo.release();
throw new QpackException.StreamException(H3_GENERAL_PROTOCOL_ERROR, "buffer_space_exceeded", e);
}
catch (Throwable t)
{
// We are failing the whole Session so don't need to send instructions back.
throw new QpackException.SessionException(H3_GENERAL_PROTOCOL_ERROR, "compression_error", t);
}
}
}
@ -219,7 +224,7 @@ public class QpackEncoder implements Dumpable
*/
public void parseInstructions(ByteBuffer buffer) throws QpackException
{
try
try (AutoLock l = lock.lock())
{
while (BufferUtil.hasContent(buffer))
{

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@ -379,17 +380,22 @@ public abstract class QuicSession extends ContainerLifeCycle
public QuicStreamEndPoint getOrCreateStreamEndPoint(long streamId, Consumer<QuicStreamEndPoint> consumer)
{
QuicStreamEndPoint endPoint = endPoints.compute(streamId, (id, quicStreamEndPoint) ->
AtomicBoolean created = new AtomicBoolean();
QuicStreamEndPoint endPoint = endPoints.computeIfAbsent(streamId, id ->
{
if (quicStreamEndPoint == null)
{
if (LOG.isDebugEnabled())
LOG.debug("creating endpoint for stream #{} for {}", id, this);
quicStreamEndPoint = newQuicStreamEndPoint(streamId);
consumer.accept(quicStreamEndPoint);
}
return quicStreamEndPoint;
if (LOG.isDebugEnabled())
LOG.debug("creating endpoint for stream #{} for {}", id, this);
QuicStreamEndPoint result = newQuicStreamEndPoint(id);
created.set(true);
return result;
});
// The consumer must be executed outside the Map.compute() above,
// since it may take a long time and it may be re-entrant, causing the
// creation of two QuicStreamEndPoint objects for the same stream id.
if (created.get())
consumer.accept(endPoint);
if (LOG.isDebugEnabled())
LOG.debug("returning {} for {}", endPoint, this);
return endPoint;

View File

@ -96,7 +96,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
}
// Re-run after warmup
iterations = 1_000;
iterations = 500;
for (int i = 0; i < runs; ++i)
{
run(transport, iterations);
@ -140,7 +140,7 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
});
int runs = 1;
int iterations = 256;
int iterations = 64;
IntStream.range(0, 16).parallel().forEach(i ->
IntStream.range(0, runs).forEach(j ->
run(transport, iterations)));
@ -155,13 +155,14 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
// Dumps the state of the client if the test takes too long
Thread testThread = Thread.currentThread();
long maxTime = Math.max(5000, (long)iterations * factor);
Scheduler.Task task = scenario.client.getScheduler().schedule(() ->
{
logger.warn("Interrupting test, it is taking too long{}{}{}{}",
logger.warn("Interrupting test, it is taking too long (maxTime={} ms){}{}{}{}", maxTime,
System.lineSeparator(), scenario.server.dump(),
System.lineSeparator(), scenario.client.dump());
testThread.interrupt();
}, Math.max(5000, (long)iterations * factor), TimeUnit.MILLISECONDS);
}, maxTime, TimeUnit.MILLISECONDS);
long begin = System.nanoTime();
for (int i = 0; i < iterations; ++i)
@ -169,7 +170,6 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
test(latch, failures);
// test("http", "localhost", "GET", false, false, 64 * 1024, false, latch, failures);
}
assertTrue(await(latch, iterations, TimeUnit.SECONDS));
long end = System.nanoTime();
task.cancel();
long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin);
@ -282,9 +282,10 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
latch.countDown();
}
});
if (!await(requestLatch, 5, TimeUnit.SECONDS))
int maxTime = 5000;
if (!await(requestLatch, maxTime, TimeUnit.MILLISECONDS))
{
logger.warn("Request {} took too long{}{}{}{}", requestId,
logger.warn("Request {} took too long (maxTime={} ms){}{}{}{}", requestId, maxTime,
System.lineSeparator(), scenario.server.dump(),
System.lineSeparator(), scenario.client.dump());
}