Issue #6728 - QUIC and HTTP/3
- Avoid sending the last=true flag as a separate QUIC frame. Apparently Quiche has problem notifying this frame to the receiving peer. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
161f8e4f82
commit
2d4cb5abbf
|
@ -39,6 +39,16 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
|
|||
*/
|
||||
public interface Stream
|
||||
{
|
||||
/**
|
||||
* @return the stream id
|
||||
*/
|
||||
public long getId();
|
||||
|
||||
/**
|
||||
* @return the session this stream is associated to
|
||||
*/
|
||||
public Session getSession();
|
||||
|
||||
/**
|
||||
* <p>Responds to a request performed via {@link Session.Client#newRequest(HeadersFrame, Listener)},
|
||||
* sending the given HEADERS frame containing the response status code and response headers.</p>
|
||||
|
|
|
@ -19,12 +19,14 @@ public class DataFrame extends Frame
|
|||
{
|
||||
private final ByteBuffer data;
|
||||
private final boolean last;
|
||||
private final int length;
|
||||
|
||||
public DataFrame(ByteBuffer data, boolean last)
|
||||
{
|
||||
super(FrameType.DATA);
|
||||
this.data = data;
|
||||
this.last = last;
|
||||
this.length = data.remaining();
|
||||
}
|
||||
|
||||
public ByteBuffer getByteBuffer()
|
||||
|
@ -40,6 +42,6 @@ public class DataFrame extends Frame
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), getByteBuffer().remaining());
|
||||
return String.format("%s[last=%b,length=%d]", super.toString(), isLast(), length);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ package org.eclipse.jetty.http3.internal;
|
|||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jetty.http3.api.Session;
|
||||
import org.eclipse.jetty.http3.api.Stream;
|
||||
import org.eclipse.jetty.http3.frames.DataFrame;
|
||||
import org.eclipse.jetty.http3.frames.Frame;
|
||||
|
@ -36,6 +37,18 @@ public class HTTP3Stream implements Stream
|
|||
this.endPoint = endPoint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getId()
|
||||
{
|
||||
return endPoint.getStreamId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Session getSession()
|
||||
{
|
||||
return session;
|
||||
}
|
||||
|
||||
public Listener getListener()
|
||||
{
|
||||
return listener;
|
||||
|
@ -92,4 +105,10 @@ public class HTTP3Stream implements Stream
|
|||
session.writeFrame(endPoint.getStreamId(), frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> completable.succeeded(this), completable::failed));
|
||||
return completable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x#%d", getClass().getSimpleName(), hashCode(), getId());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
<parent>
|
||||
<groupId>org.eclipse.jetty.http3</groupId>
|
||||
<artifactId>http3-parent</artifactId>
|
||||
<version>10.0.1-SNAPSHOT</version>
|
||||
<version>10.0.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
|
|
@ -201,7 +201,7 @@ public class HTTP3ClientServerTest extends AbstractHTTP3ClientServerTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(ints = {1024, 128 * 1024, 1024 * 1024})
|
||||
@ValueSource(ints = {1024, 10 * 1024, 100 * 1024, 1000 * 1024})
|
||||
public void testEchoRequestContentAsResponseContent(int length) throws Exception
|
||||
{
|
||||
startServer(new Session.Server.Listener()
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
<parent>
|
||||
<artifactId>jetty-project</artifactId>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<version>10.0.1-SNAPSHOT</version>
|
||||
<version>10.0.7-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
|
|
@ -135,9 +135,9 @@ public abstract class QuicSession
|
|||
return drained;
|
||||
}
|
||||
|
||||
public int flush(long streamId, ByteBuffer buffer) throws IOException
|
||||
public int flush(long streamId, ByteBuffer buffer, boolean last) throws IOException
|
||||
{
|
||||
int flushed = quicheConnection.feedClearTextForStream(streamId, buffer);
|
||||
int flushed = quicheConnection.feedClearTextForStream(streamId, buffer, last);
|
||||
flush();
|
||||
return flushed;
|
||||
}
|
||||
|
@ -153,6 +153,16 @@ public abstract class QuicSession
|
|||
return quicheConnection.isStreamFinished(streamId);
|
||||
}
|
||||
|
||||
public long getWindowCapacity()
|
||||
{
|
||||
return quicheConnection.windowCapacity();
|
||||
}
|
||||
|
||||
public long getWindowCapacity(long streamId) throws IOException
|
||||
{
|
||||
return quicheConnection.windowCapacity(streamId);
|
||||
}
|
||||
|
||||
public void shutdownInput(long streamId) throws IOException
|
||||
{
|
||||
quicheConnection.shutdownStream(streamId, false);
|
||||
|
@ -194,7 +204,12 @@ public abstract class QuicSession
|
|||
// the remote address may change so store it again.
|
||||
this.remoteAddress = remoteAddress;
|
||||
|
||||
quicheConnection.feedCipherText(cipherBufferIn, remoteAddress);
|
||||
int remaining = cipherBufferIn.remaining();
|
||||
int accepted = quicheConnection.feedCipherText(cipherBufferIn, remoteAddress);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("feeding {}/{} cipher bytes to cid={}", accepted, remaining, quicheConnectionId);
|
||||
if (accepted != remaining)
|
||||
throw new IllegalStateException();
|
||||
|
||||
if (quicheConnection.isConnectionEstablished())
|
||||
{
|
||||
|
|
|
@ -17,6 +17,7 @@ import java.io.IOException;
|
|||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.eclipse.jetty.io.AbstractEndPoint;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
|
@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
|
|||
public class QuicStreamEndPoint extends AbstractEndPoint
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class);
|
||||
private static final ByteBuffer LAST_FLAG = ByteBuffer.allocate(0);
|
||||
|
||||
private final QuicSession session;
|
||||
private final long streamId;
|
||||
|
@ -137,13 +139,19 @@ public class QuicStreamEndPoint extends AbstractEndPoint
|
|||
{
|
||||
// TODO: session.flush(streamId, buffer) feeds Quiche and then calls flush().
|
||||
// Can we call flush() only after the for loop below?
|
||||
|
||||
int length = buffers.length;
|
||||
boolean last = buffers[length - 1] == LAST_FLAG;
|
||||
if (last)
|
||||
--length;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("flushing {} buffer(s) to stream {}", buffers.length, streamId);
|
||||
for (ByteBuffer buffer : buffers)
|
||||
LOG.debug("flushing {} buffer(s) to stream {}", length, streamId);
|
||||
for (int i = 0; i < length; ++i)
|
||||
{
|
||||
int flushed = session.flush(streamId, buffer);
|
||||
ByteBuffer buffer = buffers[i];
|
||||
int flushed = session.flush(streamId, buffer, (i == length - 1) && last);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("flushed {} bytes to stream {}", flushed, streamId);
|
||||
LOG.debug("flushed {} bytes to stream {} window={}/{}", flushed, streamId, session.getWindowCapacity(streamId), session.getWindowCapacity());
|
||||
if (buffer.hasRemaining())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -158,23 +166,19 @@ public class QuicStreamEndPoint extends AbstractEndPoint
|
|||
|
||||
public void write(Callback callback, List<ByteBuffer> buffers, boolean last)
|
||||
{
|
||||
// TODO: writing the last flag after the buffers is inefficient,
|
||||
// but Quiche supports it, so we need to expose the Quiche API.
|
||||
write(Callback.from(callback.getInvocationType(), () -> finishWrite(callback, last), callback::failed), buffers.toArray(ByteBuffer[]::new));
|
||||
}
|
||||
|
||||
private void finishWrite(Callback callback, boolean last)
|
||||
{
|
||||
try
|
||||
ByteBuffer[] array;
|
||||
if (last)
|
||||
{
|
||||
if (last)
|
||||
session.flushFinished(streamId);
|
||||
callback.succeeded();
|
||||
int size = buffers.size();
|
||||
array = new ByteBuffer[size + 1];
|
||||
IntStream.range(0, size).forEach(i -> array[i] = buffers.get(i));
|
||||
array[size] = LAST_FLAG;
|
||||
}
|
||||
catch (Throwable x)
|
||||
else
|
||||
{
|
||||
callback.failed(x);
|
||||
array = buffers.toArray(ByteBuffer[]::new);
|
||||
}
|
||||
write(callback, array);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.eclipse.jetty.quic.quiche.ffi.uint32_t_pointer;
|
|||
import org.eclipse.jetty.quic.quiche.ffi.uint64_t;
|
||||
import org.eclipse.jetty.quic.quiche.ffi.uint64_t_pointer;
|
||||
import org.eclipse.jetty.quic.quiche.ffi.uint8_t_pointer;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -559,7 +560,7 @@ public class QuicheConnection
|
|||
{
|
||||
if (quicheConn == null)
|
||||
throw new IOException("Quiche connection was released");
|
||||
int written = LibQuiche.INSTANCE.quiche_conn_stream_send(quicheConn, new uint64_t(streamId), null, new size_t(0), true).intValue();
|
||||
int written = LibQuiche.INSTANCE.quiche_conn_stream_send(quicheConn, new uint64_t(streamId), BufferUtil.EMPTY_BUFFER, new size_t(0), true).intValue();
|
||||
if (written == LibQuiche.quiche_error.QUICHE_ERR_DONE)
|
||||
return;
|
||||
if (written < 0L)
|
||||
|
|
Loading…
Reference in New Issue