diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java index ae8ed30c2ca..58299b3e2a6 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Flusher.java @@ -68,14 +68,17 @@ public class HTTP3Flusher extends IteratingCallback if (LOG.isDebugEnabled()) LOG.debug("flushing {} on {}", entry, this); - generator.generate(lease, entry.endPoint.getStreamId(), entry.frame); + Frame frame = entry.frame; + generator.generate(lease, entry.endPoint.getStreamId(), frame); + boolean last = frame instanceof HeadersFrame && ((HeadersFrame)frame).isLast() || + frame instanceof DataFrame && ((DataFrame)frame).isLast(); QuicStreamEndPoint endPoint = entry.endPoint; List buffers = lease.getByteBuffers(); if (LOG.isDebugEnabled()) LOG.debug("writing {} buffers ({} bytes) for stream #{} on {}", buffers.size(), lease.getTotalLength(), endPoint.getStreamId(), this); - endPoint.write(this, buffers.toArray(ByteBuffer[]::new)); + endPoint.write(this, buffers, last); return Action.SCHEDULED; } @@ -84,19 +87,6 @@ public class HTTP3Flusher extends IteratingCallback { if (LOG.isDebugEnabled()) LOG.debug("succeeded to write {} on {}", entry, this); - - // TODO: this is inefficient, as it will write - // an empty DATA frame with the FIN flag. - // Could be coalesced with the write above, - // but needs an additional boolean parameter. - if (entry.last) - { - QuicStreamEndPoint endPoint = entry.endPoint; - if (LOG.isDebugEnabled()) - LOG.debug("last frame on stream #{} on {}", endPoint.getStreamId(), this); - endPoint.shutdownOutput(); - } - lease.recycle(); entry.callback.succeeded(); entry = null; @@ -122,15 +112,12 @@ public class HTTP3Flusher extends IteratingCallback private final QuicStreamEndPoint endPoint; private final Frame frame; private final Callback callback; - private final boolean last; private Entry(QuicStreamEndPoint endPoint, Frame frame, Callback callback) { this.endPoint = endPoint; this.frame = frame; this.callback = callback; - this.last = frame instanceof HeadersFrame && ((HeadersFrame)frame).isLast() || - frame instanceof DataFrame && ((DataFrame)frame).isLast(); } @Override diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java index a5ac19d7cd1..9c5125e15da 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java @@ -94,6 +94,13 @@ public abstract class ProtocolSession if (LOG.isDebugEnabled()) LOG.debug("readable stream ids: {}", readableStreamIds); readableStreamIds.forEach(this::onReadable); + + // TODO: ExecutionStrategy plug-in point is here. + // this::onReadable() just feeds the decoder and the instruction streams. + // Note that req/rsp streams never eat DATA frame, it's a noop because they pull data + // when they want to read data frames, either via Stream.readData() or ServletInputStream.read(). + // Then here we ask decoder for tasks, and have the ExecutionStrategy process them. + return !readableStreamIds.isEmpty(); } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java index a8722dda168..eb273e4518d 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java @@ -16,11 +16,13 @@ package org.eclipse.jetty.quic.common; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.List; import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,19 +101,17 @@ public class QuicStreamEndPoint extends AbstractEndPoint } @Override - public void onClose(Throwable failure) + protected void doClose() { if (LOG.isDebugEnabled()) LOG.debug("closing stream {}", streamId); - try - { - session.flushFinished(streamId); - } - catch (IOException e) - { - if (LOG.isDebugEnabled()) - LOG.debug("error closing stream {}", streamId, e); - } + doShutdownInput(); + doShutdownOutput(); + } + + @Override + public void onClose(Throwable failure) + { super.onClose(failure); session.onClose(streamId); } @@ -124,8 +124,6 @@ public class QuicStreamEndPoint extends AbstractEndPoint int pos = BufferUtil.flipToFill(buffer); int drained = session.fill(streamId, buffer); BufferUtil.flipToFlush(buffer, pos); - if (drained < 0) - shutdownInput(); return drained; } @@ -151,6 +149,27 @@ public class QuicStreamEndPoint extends AbstractEndPoint return true; } + public void write(Callback callback, List buffers, boolean last) + { + // TODO: writing the last flag after the buffers is inefficient, + // but Quiche supports it, so we need to expose the Quiche API. + write(Callback.from(callback.getInvocationType(), () -> finishWrite(callback, last), callback::failed), buffers.toArray(ByteBuffer[]::new)); + } + + private void finishWrite(Callback callback, boolean last) + { + try + { + if (last) + session.flushFinished(streamId); + callback.succeeded(); + } + catch (Throwable x) + { + callback.failed(x); + } + } + @Override public Object getTransport() { diff --git a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java index 73876977b32..4aa9df1d17a 100644 --- a/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java +++ b/jetty-quic/quic-quiche/src/main/java/org/eclipse/jetty/quic/quiche/QuicheConnection.java @@ -610,7 +610,7 @@ public class QuicheConnection bool_pointer fin = new bool_pointer(); int read = LibQuiche.INSTANCE.quiche_conn_stream_recv(quicheConn, new uint64_t(streamId), buffer, new size_t(buffer.remaining()), fin).intValue(); if (read == LibQuiche.quiche_error.QUICHE_ERR_DONE) - return fin.getValue() ? -1 : 0; + return isStreamFinished(streamId) ? -1 : 0; if (read < 0L) throw new IOException("Quiche failed to read from stream " + streamId + "; err=" + LibQuiche.quiche_error.errToString(read)); buffer.position(buffer.position() + read);