Issue #6728 - QUIC and HTTP/3

- Removed dataFrame queue in HTTP3StreamConnection.
- Made readData() idempotent.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-20 12:42:08 +02:00
parent 21464f85ff
commit 5ca88d3231
5 changed files with 90 additions and 33 deletions

View File

@ -33,7 +33,7 @@ public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpC
protected AbstractConnectorHttpClientTransport(ClientConnector connector)
{
this.connector = Objects.requireNonNull(connector);
addBean(connector);
addBean(connector, false);
}
public ClientConnector getClientConnector()

View File

@ -16,8 +16,6 @@ package org.eclipse.jetty.http3.internal;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import org.eclipse.jetty.http.MetaData;
@ -42,7 +40,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private static final ByteBuffer EMPTY_DATA_FRAME = ByteBuffer.allocate(2);
private final AutoLock lock = new AutoLock();
private final Queue<DataFrame> dataFrames = new ArrayDeque<>();
private final RetainableByteBufferPool buffers;
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
@ -50,7 +47,9 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private boolean dataMode;
private boolean dataDemand;
private boolean dataStalled;
private DataFrame dataFrame;
private boolean dataLast;
private boolean noData;
private boolean remotelyClosed;
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
@ -120,7 +119,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{
while (true)
{
if (parseAndFill() == MessageParser.Result.NO_FRAME)
if (parseAndFill(true) == MessageParser.Result.NO_FRAME)
break;
// TODO: we should also exit if the connection was closed due to errors.
@ -168,11 +167,12 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (hasDemand())
throw new IllegalStateException("invalid call to readData(): outstanding demand");
switch (parseAndFill())
switch (parseAndFill(false))
{
case FRAME:
{
DataFrame frame = dataFrames.poll();
DataFrame frame = dataFrame;
dataFrame = null;
if (LOG.isDebugEnabled())
LOG.debug("read data {} on {}", frame, this);
if (frame == null)
@ -193,6 +193,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
case NO_FRAME:
{
if (LOG.isDebugEnabled())
LOG.debug("read no data on {}", this);
return null;
}
default:
@ -211,9 +213,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public void demand()
{
boolean interested;
boolean process = false;
try (AutoLock l = lock.lock())
{
interested = noData;
dataDemand = true;
if (dataStalled)
{
@ -225,6 +229,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
LOG.debug("demand, wasStalled={} on {}", process, this);
if (process)
processDataDemand();
else if (interested)
fillInterested();
}
public boolean hasDemand()
@ -243,6 +249,14 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
private void setNoData(boolean noData)
{
try (AutoLock l = lock.lock())
{
this.noData = noData;
}
}
private void processDataDemand()
{
while (true)
@ -275,16 +289,18 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
public MessageParser.Result parseAndFill()
public MessageParser.Result parseAndFill(boolean setFillInterest)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("parse+fill on {} with buffer {}", this, buffer);
LOG.debug("parse+fill interest={} on {} with buffer {}", setFillInterest, this, buffer);
if (buffer == null)
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
setNoData(false);
while (true)
{
ByteBuffer byteBuffer = buffer.getBuffer();
@ -320,7 +336,9 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
{
buffer.release();
buffer = null;
fillInterested();
setNoData(true);
if (setFillInterest)
fillInterested();
break;
}
else
@ -355,11 +373,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
public DataFrame pollContent()
{
return dataFrames.poll();
}
@Override
public String toConnectionString()
{
@ -397,9 +410,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
@Override
public void onData(long streamId, DataFrame frame)
{
remotelyClosed = frame.isLast();
if (dataFrame != null)
throw new IllegalStateException();
dataFrame = frame;
dataLast = frame.isLast();
dataFrames.offer(frame);
remotelyClosed = frame.isLast();
super.onData(streamId, frame);
}
}

View File

@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
public class HttpChannelOverHTTP3 extends HttpChannel
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP3.class);
private static final HttpInput.Content NULL_CONTENT = new NullContent();
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION);
@ -215,9 +214,6 @@ public class HttpChannelOverHTTP3 extends HttpChannel
@Override
public boolean needContent()
{
if (content == NULL_CONTENT)
content = null;
if (content != null)
return true;
@ -230,8 +226,6 @@ public class HttpChannelOverHTTP3 extends HttpChannel
{
if (content != null)
{
if (content == NULL_CONTENT)
return null;
HttpInput.Content result = content;
if (!result.isSpecial())
content = null;
@ -244,10 +238,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel
if (LOG.isDebugEnabled())
LOG.debug("read {} on {}", data, this);
if (data == null)
{
content = NULL_CONTENT;
return null;
}
content = new HttpInput.Content(data.getByteBuffer())
{
@ -305,8 +296,4 @@ public class HttpChannelOverHTTP3 extends HttpChannel
{
return false;
}
private static class NullContent extends HttpInput.SpecialContent
{
}
}

View File

@ -511,4 +511,56 @@ public class DataDemandTest extends AbstractClientServerTest
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testReadDataIdempotent() throws Exception
{
CountDownLatch nullDataLatch = new CountDownLatch(1);
CountDownLatch lastDataLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.demand();
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
if (data == null)
{
// Second attempt to read still has no data, should be idempotent.
assertNull(stream.readData());
stream.demand();
nullDataLatch.countDown();
}
else
{
data.complete();
if (data.isLast())
lastDataLatch.countDown();
else
stream.demand();
}
}
};
}
});
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
// Send a first chunk to trigger reads.
stream.data(new DataFrame(ByteBuffer.allocate(16), false));
assertTrue(nullDataLatch.await(555, TimeUnit.SECONDS));
stream.data(new DataFrame(ByteBuffer.allocate(4096), true));
assertTrue(lastDataLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -94,9 +94,12 @@ public abstract class ProtocolSession extends ContainerLifeCycle
List<Long> readableStreamIds = session.getReadableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {}", readableStreamIds);
return readableStreamIds.stream()
.map(this::onReadable)
.reduce(false, (result, readable) -> result || readable);
boolean result = false;
for (long readableStreamId : readableStreamIds)
{
result = result || onReadable(readableStreamId);
}
return result;
}
protected abstract boolean onReadable(long readableStreamId);