diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java index f7faeee379a..536e8bd4266 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.http3.api; import java.nio.ByteBuffer; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http.MetaData; @@ -258,8 +259,8 @@ public interface Stream public Data(DataFrame frame, Runnable complete) { - this.frame = frame; - this.complete = complete; + this.frame = Objects.requireNonNull(frame); + this.complete = Objects.requireNonNull(complete); } /** diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java index 5025f381a93..e7caad01409 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java @@ -113,7 +113,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection processDataDemand(); if (!parserDataMode) { - if (buffer.hasRemaining()) + if (buffer != null && buffer.hasRemaining()) processNonDataFrames(); else fillInterested(); @@ -124,10 +124,15 @@ public abstract class HTTP3StreamConnection extends AbstractConnection { try { + tryAcquireBuffer(); + while (true) { if (parseAndFill(true) == MessageParser.Result.NO_FRAME) - break; + { + tryReleaseBuffer(false); + return; + } // TODO: we should also exit if the connection was closed due to errors. // There is not yet a isClosed() primitive though. @@ -138,7 +143,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection // However, the last frame may have // caused a write that we need to flush. getEndPoint().getQuicSession().flush(); - break; + tryReleaseBuffer(false); + return; } if (parserDataMode) @@ -161,12 +167,14 @@ public abstract class HTTP3StreamConnection extends AbstractConnection fillInterested(); } } - break; + tryReleaseBuffer(false); + return; } } } catch (Throwable x) { + tryReleaseBuffer(true); long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(); getEndPoint().close(error, x); // Notify the application that a failure happened. @@ -183,6 +191,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection if (LOG.isDebugEnabled()) LOG.debug("reading data on {}", this); + tryAcquireBuffer(); + switch (parseAndFill(false)) { case FRAME: @@ -191,12 +201,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection dataFrame = null; if (LOG.isDebugEnabled()) LOG.debug("read data {} on {}", frame, this); - if (frame == null) - return null; - buffer.retain(); - - return new Stream.Data(frame, buffer::release); + return new Stream.Data(frame, this::completeReadData); } case MODE_SWITCH: { @@ -205,12 +211,14 @@ public abstract class HTTP3StreamConnection extends AbstractConnection dataLast = true; parserDataMode = false; parser.setDataMode(false); + tryReleaseBuffer(false); return null; } case NO_FRAME: { if (LOG.isDebugEnabled()) LOG.debug("read no data on {}", this); + tryReleaseBuffer(false); return null; } default: @@ -222,12 +230,20 @@ public abstract class HTTP3StreamConnection extends AbstractConnection catch (Throwable x) { cancelDemand(); + tryReleaseBuffer(true); getEndPoint().close(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), x); // Rethrow so the application has a chance to handle it. throw x; } } + private void completeReadData() + { + buffer.release(); + if (!buffer.isRetained()) + tryReleaseBuffer(false); + } + public void demand() { boolean hasData; @@ -314,16 +330,39 @@ public abstract class HTTP3StreamConnection extends AbstractConnection } } - public MessageParser.Result parseAndFill(boolean setFillInterest) + private void tryAcquireBuffer() + { + if (buffer == null) + { + buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); + if (LOG.isDebugEnabled()) + LOG.debug("acquired {}", buffer); + } + } + + private void tryReleaseBuffer(boolean force) + { + if (buffer != null) + { + if (buffer.hasRemaining() && force) + buffer.clear(); + if (!buffer.hasRemaining()) + { + buffer.release(); + if (LOG.isDebugEnabled()) + LOG.debug("released {}", buffer); + buffer = null; + } + } + } + + private MessageParser.Result parseAndFill(boolean setFillInterest) { try { if (LOG.isDebugEnabled()) LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, buffer); - if (buffer == null) - buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); - setNoData(false); while (true) @@ -359,29 +398,17 @@ public abstract class HTTP3StreamConnection extends AbstractConnection if (filled == 0) { - buffer.release(); - buffer = null; setNoData(true); if (setFillInterest) fillInterested(); - break; - } - else - { - buffer.release(); - buffer = null; - break; } + return MessageParser.Result.NO_FRAME; } - return MessageParser.Result.NO_FRAME; } catch (Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("parse+fill failure on {}", this, x); - if (buffer != null) - buffer.release(); - buffer = null; throw x; } } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java index b898011621a..8094afb9799 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java @@ -202,15 +202,15 @@ public abstract class QuicConnection extends AbstractConnection private Runnable receiveAndProcess() { + boolean interested = isFillInterested(); + if (LOG.isDebugEnabled()) + LOG.debug("receiveAndProcess() fillInterested={}", interested); + if (interested) + return null; + + ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); try { - boolean interested = isFillInterested(); - if (LOG.isDebugEnabled()) - LOG.debug("receiveAndProcess() fillInterested={}", interested); - if (interested) - return null; - - ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); while (true) { BufferUtil.clear(cipherBuffer); @@ -266,7 +266,10 @@ public abstract class QuicConnection extends AbstractConnection if (LOG.isDebugEnabled()) LOG.debug("processing creation task {} on {}", task, session); if (task != null) + { + byteBufferPool.release(cipherBuffer); return task; + } } else { @@ -282,13 +285,17 @@ public abstract class QuicConnection extends AbstractConnection if (LOG.isDebugEnabled()) LOG.debug("produced session task {} on {}", task, this); if (task != null) + { + byteBufferPool.release(cipherBuffer); return task; + } } } catch (Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("receiveAndProcess() failure", x); + byteBufferPool.release(cipherBuffer); // TODO: close? return null; } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index 7c3fe360ffb..9a59fd85768 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -492,6 +492,8 @@ public abstract class QuicSession extends ContainerLifeCycle Action action = connectionClosed ? Action.SUCCEEDED : Action.IDLE; if (LOG.isDebugEnabled()) LOG.debug("connection draining={} closed={}, action={} on {}", quicheConnection.isDraining(), connectionClosed, action, QuicSession.this); + if (action == Action.IDLE) + byteBufferPool.release(cipherBuffer); return action; } BufferUtil.flipToFlush(cipherBuffer, pos); diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java index 95d604dd8f3..b487a4450cf 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java @@ -161,7 +161,7 @@ public class HttpClientLoadTest extends AbstractTest