Issue #6728 - QUIC and HTTP/3

- Fixed handling of the last frame, exposing a new API in QuicStreamEndPoint.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-16 19:30:36 +02:00
parent a092078668
commit b42bfa214a
4 changed files with 44 additions and 31 deletions

View File

@ -68,14 +68,17 @@ public class HTTP3Flusher extends IteratingCallback
if (LOG.isDebugEnabled())
LOG.debug("flushing {} on {}", entry, this);
generator.generate(lease, entry.endPoint.getStreamId(), entry.frame);
Frame frame = entry.frame;
generator.generate(lease, entry.endPoint.getStreamId(), frame);
boolean last = frame instanceof HeadersFrame && ((HeadersFrame)frame).isLast() ||
frame instanceof DataFrame && ((DataFrame)frame).isLast();
QuicStreamEndPoint endPoint = entry.endPoint;
List<ByteBuffer> buffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("writing {} buffers ({} bytes) for stream #{} on {}", buffers.size(), lease.getTotalLength(), endPoint.getStreamId(), this);
endPoint.write(this, buffers.toArray(ByteBuffer[]::new));
endPoint.write(this, buffers, last);
return Action.SCHEDULED;
}
@ -84,19 +87,6 @@ public class HTTP3Flusher extends IteratingCallback
{
if (LOG.isDebugEnabled())
LOG.debug("succeeded to write {} on {}", entry, this);
// TODO: this is inefficient, as it will write
// an empty DATA frame with the FIN flag.
// Could be coalesced with the write above,
// but needs an additional boolean parameter.
if (entry.last)
{
QuicStreamEndPoint endPoint = entry.endPoint;
if (LOG.isDebugEnabled())
LOG.debug("last frame on stream #{} on {}", endPoint.getStreamId(), this);
endPoint.shutdownOutput();
}
lease.recycle();
entry.callback.succeeded();
entry = null;
@ -122,15 +112,12 @@ public class HTTP3Flusher extends IteratingCallback
private final QuicStreamEndPoint endPoint;
private final Frame frame;
private final Callback callback;
private final boolean last;
private Entry(QuicStreamEndPoint endPoint, Frame frame, Callback callback)
{
this.endPoint = endPoint;
this.frame = frame;
this.callback = callback;
this.last = frame instanceof HeadersFrame && ((HeadersFrame)frame).isLast() ||
frame instanceof DataFrame && ((DataFrame)frame).isLast();
}
@Override

View File

@ -94,6 +94,13 @@ public abstract class ProtocolSession
if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {}", readableStreamIds);
readableStreamIds.forEach(this::onReadable);
// TODO: ExecutionStrategy plug-in point is here.
// this::onReadable() just feeds the decoder and the instruction streams.
// Note that req/rsp streams never eat DATA frame, it's a noop because they pull data
// when they want to read data frames, either via Stream.readData() or ServletInputStream.read().
// Then here we ask decoder for tasks, and have the ExecutionStrategy process them.
return !readableStreamIds.isEmpty();
}

View File

@ -16,11 +16,13 @@ package org.eclipse.jetty.quic.common;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -99,19 +101,17 @@ public class QuicStreamEndPoint extends AbstractEndPoint
}
@Override
public void onClose(Throwable failure)
protected void doClose()
{
if (LOG.isDebugEnabled())
LOG.debug("closing stream {}", streamId);
try
{
session.flushFinished(streamId);
}
catch (IOException e)
{
if (LOG.isDebugEnabled())
LOG.debug("error closing stream {}", streamId, e);
}
doShutdownInput();
doShutdownOutput();
}
@Override
public void onClose(Throwable failure)
{
super.onClose(failure);
session.onClose(streamId);
}
@ -124,8 +124,6 @@ public class QuicStreamEndPoint extends AbstractEndPoint
int pos = BufferUtil.flipToFill(buffer);
int drained = session.fill(streamId, buffer);
BufferUtil.flipToFlush(buffer, pos);
if (drained < 0)
shutdownInput();
return drained;
}
@ -151,6 +149,27 @@ public class QuicStreamEndPoint extends AbstractEndPoint
return true;
}
public void write(Callback callback, List<ByteBuffer> buffers, boolean last)
{
// TODO: writing the last flag after the buffers is inefficient,
// but Quiche supports it, so we need to expose the Quiche API.
write(Callback.from(callback.getInvocationType(), () -> finishWrite(callback, last), callback::failed), buffers.toArray(ByteBuffer[]::new));
}
private void finishWrite(Callback callback, boolean last)
{
try
{
if (last)
session.flushFinished(streamId);
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}
@Override
public Object getTransport()
{

View File

@ -610,7 +610,7 @@ public class QuicheConnection
bool_pointer fin = new bool_pointer();
int read = LibQuiche.INSTANCE.quiche_conn_stream_recv(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), fin).intValue();
if (read == LibQuiche.quiche_error.QUICHE_ERR_DONE)
return fin.getValue() ? -1 : 0;
return isStreamFinished(streamId) ? -1 : 0;
if (read < 0L)
throw new IOException("Quiche failed to read from stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(read));
buffer.position(buffer.position() + read);