From d2dff9a758e69619bf092310dd421cc72eb0d629 Mon Sep 17 00:00:00 2001
From: Ludovic Orban
Date: Tue, 26 Sep 2023 10:13:35 +0200
Subject: [PATCH] Fixes #10483 - Improve BufferedResponseHandler
* Changed default buffer size from 2 GB to 16 KB.
* Make max buffer size configurable.
* Introduce `BufferedContentSink` with all the buffering logic, doing only one buffer copy instead of two, starting with a small buffer and growing it if needed.
* Refactor `BufferedResponseHandler` to delegate all buffering work to `BufferedContentSink`
* Introduced `ByteBufferAggregator` to aggregate ByteBuffers into a single ByteBuffer.
Signed-off-by: Ludovic Orban
Co-authored-by: gregw
---
jetty-core/jetty-io/pom.xml | 5 +
.../jetty/io/ByteBufferAggregator.java | 138 +++++
.../java/org/eclipse/jetty/io/Content.java | 17 +
.../jetty/io/content/BufferedContentSink.java | 277 +++++++++
.../jetty/io/BufferedContentSinkTest.java | 553 ++++++++++++++++++
.../jetty/io/ByteBufferAggregatorTest.java | 88 +++
.../org/eclipse/jetty/server/Response.java | 36 ++
.../handler/BufferedResponseHandler.java | 152 ++---
.../handler/BufferedResponseHandlerTest.java | 10 +-
9 files changed, 1161 insertions(+), 115 deletions(-)
create mode 100644 jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
create mode 100644 jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
create mode 100644 jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
create mode 100644 jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAggregatorTest.java
diff --git a/jetty-core/jetty-io/pom.xml b/jetty-core/jetty-io/pom.xml
index 4437a759321..e57f528229a 100644
--- a/jetty-core/jetty-io/pom.xml
+++ b/jetty-core/jetty-io/pom.xml
@@ -49,5 +49,10 @@
jetty-test-helper
test
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
new file mode 100644
index 00000000000..3d929d2d5f2
--- /dev/null
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
@@ -0,0 +1,138 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.nio.ByteBuffer;
+
+import org.eclipse.jetty.util.BufferUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Aggregates data into a single ByteBuffer of a specified maximum size.
+ * The buffer automatically grows as data is written to it, up until it reaches the specified maximum size.
+ * Once the buffer is full, the aggregator will not aggregate any more bytes until its buffer is taken out,
+ * after which a new aggregate/take buffer cycle can start.
+ * The buffers are taken from the supplied {@link ByteBufferPool} or freshly allocated if one is not supplied.
+ */
+public class ByteBufferAggregator
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ByteBufferAggregator.class);
+
+ private final ByteBufferPool _bufferPool;
+ private final boolean _direct;
+ private final int _maxSize;
+ private RetainableByteBuffer _retainableByteBuffer;
+ private int _aggregatedSize;
+ private int _currentSize;
+
+ /**
+ * Creates a ByteBuffer aggregator.
+ * @param bufferPool The {@link ByteBufferPool} from which to acquire the buffers
+ * @param direct whether to get direct buffers
+ * @param startSize the starting size of the buffer
+ * @param maxSize the maximum size of the buffer which must be greater than {@code startSize}
+ */
+ public ByteBufferAggregator(ByteBufferPool bufferPool, boolean direct, int startSize, int maxSize)
+ {
+ if (maxSize <= 0)
+ throw new IllegalArgumentException("maxSize must be > 0, was: " + maxSize);
+ if (startSize <= 0)
+ throw new IllegalArgumentException("startSize must be > 0, was: " + startSize);
+ if (startSize > maxSize)
+ throw new IllegalArgumentException("maxSize (" + maxSize + ") must be >= startSize (" + startSize + ")");
+ _bufferPool = (bufferPool == null) ? new ByteBufferPool.NonPooling() : bufferPool;
+ _direct = direct;
+ _maxSize = maxSize;
+ _currentSize = startSize;
+ }
+
+ /**
+ * Aggregates the given ByteBuffer. This copies bytes up to the specified maximum size, at which
+ * time this method returns {@code true} and {@link #takeRetainableByteBuffer()} must be called
+ * for this method to accept aggregating again.
+ * @param buffer the buffer to copy into this aggregator; its position is updated according to
+ * the number of aggregated bytes
+ * @return true if the aggregator's buffer is full and should be taken, false otherwise
+ */
+ public boolean aggregate(ByteBuffer buffer)
+ {
+ tryExpandBufferCapacity(buffer.remaining());
+ if (_retainableByteBuffer == null)
+ {
+ _retainableByteBuffer = _bufferPool.acquire(_currentSize, _direct);
+ BufferUtil.flipToFill(_retainableByteBuffer.getByteBuffer());
+ }
+ int copySize = Math.min(_currentSize - _aggregatedSize, buffer.remaining());
+
+ ByteBuffer byteBuffer = _retainableByteBuffer.getByteBuffer();
+ byteBuffer.put(byteBuffer.position(), buffer, buffer.position(), copySize);
+ byteBuffer.position(byteBuffer.position() + copySize);
+ buffer.position(buffer.position() + copySize);
+ _aggregatedSize += copySize;
+ return _aggregatedSize == _maxSize;
+ }
+
+ private void tryExpandBufferCapacity(int remaining)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("tryExpandBufferCapacity remaining: {} _currentSize: {} _accumulatedSize={}", remaining, _currentSize, _aggregatedSize);
+ if (_currentSize == _maxSize)
+ return;
+ int capacityLeft = _currentSize - _aggregatedSize;
+ if (remaining <= capacityLeft)
+ return;
+ int need = remaining - capacityLeft;
+ _currentSize = Math.min(_maxSize, ceilToNextPowerOfTwo(_currentSize + need));
+
+ if (_retainableByteBuffer != null)
+ {
+ BufferUtil.flipToFlush(_retainableByteBuffer.getByteBuffer(), 0);
+ RetainableByteBuffer newBuffer = _bufferPool.acquire(_currentSize, _direct);
+ BufferUtil.flipToFill(newBuffer.getByteBuffer());
+ newBuffer.getByteBuffer().put(_retainableByteBuffer.getByteBuffer());
+ _retainableByteBuffer.release();
+ _retainableByteBuffer = newBuffer;
+ }
+ }
+
+ private static int ceilToNextPowerOfTwo(int val)
+ {
+ int result = 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(val - 1));
+ return result > 0 ? result : Integer.MAX_VALUE;
+ }
+
+ /**
+ * Takes the buffer out of the aggregator. Once the buffer has been taken out,
+ * the aggregator resets itself and a new buffer will be acquired from the pool
+ * during the next {@link #aggregate(ByteBuffer)} call.
+ * @return the aggregated buffer, or null if nothing has been buffered yet
+ */
+ public RetainableByteBuffer takeRetainableByteBuffer()
+ {
+ if (_retainableByteBuffer == null)
+ return null;
+ BufferUtil.flipToFlush(_retainableByteBuffer.getByteBuffer(), 0);
+ RetainableByteBuffer result = _retainableByteBuffer;
+ _retainableByteBuffer = null;
+ _aggregatedSize = 0;
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "%s@%x{a=%d c=%d m=%d b=%s}".formatted(getClass().getSimpleName(), hashCode(), _aggregatedSize, _currentSize, _maxSize, _retainableByteBuffer);
+ }
+}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
index 09ae16bdb94..02e86b47e53 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
+import org.eclipse.jetty.io.content.BufferedContentSink;
import org.eclipse.jetty.io.content.ContentSinkOutputStream;
import org.eclipse.jetty.io.content.ContentSinkSubscriber;
import org.eclipse.jetty.io.content.ContentSourceInputStream;
@@ -411,6 +412,22 @@ public class Content
*/
public interface Sink
{
+ /**
+ * Wraps the given content sink with a buffering sink.
+ *
+ * @param sink the sink to write to
+ * @param bufferPool the {@link ByteBufferPool} to use
+ * @param direct true to use direct buffers, false to use heap buffers
+ * @param maxAggregationSize the maximum size that can be buffered in a single write;
+ * any size above this threshold triggers a buffer flush
+ * @param maxBufferSize the maximum size of the buffer
+ * @return a Sink that writes to the given content sink
+ */
+ static Sink asBuffered(Sink sink, ByteBufferPool bufferPool, boolean direct, int maxAggregationSize, int maxBufferSize)
+ {
+ return new BufferedContentSink(sink, bufferPool, direct, maxAggregationSize, maxBufferSize);
+ }
+
/**
* Wraps the given content sink with an {@link OutputStream}.
*
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
new file mode 100644
index 00000000000..d835716e7b1
--- /dev/null
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
@@ -0,0 +1,277 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io.content;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritePendingException;
+
+import org.eclipse.jetty.io.ByteBufferAggregator;
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.Content;
+import org.eclipse.jetty.io.RetainableByteBuffer;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.IteratingCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Content.Sink} backed by another {@link Content.Sink}.
+ * Any content written to this {@link Content.Sink} is buffered,
+ * then written to the delegate using
+ * {@link Content.Sink#write(boolean, ByteBuffer, Callback)}.
+ */
+public class BufferedContentSink implements Content.Sink
+{
+ private static final Logger LOG = LoggerFactory.getLogger(BufferedContentSink.class);
+
+ private static final int START_BUFFER_SIZE = 1024;
+
+ private final Content.Sink _delegate;
+ private final ByteBufferPool _bufferPool;
+ private final boolean _direct;
+ private final int _maxBufferSize;
+ private final int _maxAggregationSize;
+ private final Flusher _flusher;
+ private ByteBufferAggregator _aggregator;
+ private boolean _firstWrite = true;
+ private boolean _lastWritten;
+
+ public BufferedContentSink(Content.Sink delegate, ByteBufferPool bufferPool, boolean direct, int maxAggregationSize, int maxBufferSize)
+ {
+ if (maxBufferSize <= 0)
+ throw new IllegalArgumentException("maxBufferSize must be > 0, was: " + maxBufferSize);
+ if (maxAggregationSize <= 0)
+ throw new IllegalArgumentException("maxAggregationSize must be > 0, was: " + maxAggregationSize);
+ if (maxBufferSize < maxAggregationSize)
+ throw new IllegalArgumentException("maxBufferSize (" + maxBufferSize + ") must be >= maxAggregationSize (" + maxAggregationSize + ")");
+ _delegate = delegate;
+ _bufferPool = (bufferPool == null) ? new ByteBufferPool.NonPooling() : bufferPool;
+ _direct = direct;
+ _maxBufferSize = maxBufferSize;
+ _maxAggregationSize = maxAggregationSize;
+ _flusher = new Flusher(delegate);
+ }
+
+ @Override
+ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("writing last={} {}", last, BufferUtil.toDetailString(byteBuffer));
+
+ if (_lastWritten)
+ {
+ callback.failed(new IOException("complete"));
+ return;
+ }
+ _lastWritten = last;
+ if (_firstWrite)
+ {
+ _firstWrite = false;
+ if (last)
+ {
+ // No need to buffer if this is both the first and the last write.
+ _delegate.write(true, byteBuffer, callback);
+ return;
+ }
+ }
+
+ ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER;
+ if (current.remaining() <= _maxAggregationSize)
+ {
+ // current buffer can be aggregated
+ if (_aggregator == null)
+ _aggregator = new ByteBufferAggregator(_bufferPool, _direct, Math.min(START_BUFFER_SIZE, _maxBufferSize), _maxBufferSize);
+ aggregateAndFlush(last, current, callback);
+ }
+ else
+ {
+ // current buffer is greater than the max aggregation size
+ flush(last, current, callback);
+ }
+ }
+
+ /**
+ * Flushes the aggregated buffer if something was aggregated, then flushes the
+ * given buffer, bypassing the aggregator.
+ */
+ private void flush(boolean last, ByteBuffer currentBuffer, Callback callback)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("given buffer is greater than _maxBufferSize");
+
+ RetainableByteBuffer aggregatedBuffer = _aggregator == null ? null : _aggregator.takeRetainableByteBuffer();
+ if (aggregatedBuffer == null)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("nothing aggregated, flushing current buffer {}", currentBuffer);
+ _flusher.offer(last, currentBuffer, callback);
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("flushing aggregated buffer {}", aggregatedBuffer);
+ _flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
+ {
+ @Override
+ public void succeeded()
+ {
+ super.succeeded();
+ if (LOG.isDebugEnabled())
+ LOG.debug("succeeded writing aggregated buffer, flushing current buffer {}", currentBuffer);
+ _flusher.offer(last, currentBuffer, callback);
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("failure writing aggregated buffer", x);
+ super.failed(x);
+ callback.failed(x);
+ }
+ });
+ }
+ }
+
+ /**
+ * Aggregates the given buffer, flushing the aggregated buffer if necessary.
+ */
+ private void aggregateAndFlush(boolean last, ByteBuffer currentBuffer, Callback callback)
+ {
+ boolean full = _aggregator.aggregate(currentBuffer);
+ boolean complete = last && !currentBuffer.hasRemaining();
+ if (LOG.isDebugEnabled())
+ LOG.debug("aggregated current buffer, full={}, complete={}, bytes left={}, aggregator={}", full, complete, currentBuffer.remaining(), _aggregator);
+ if (complete)
+ {
+ RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer();
+ if (aggregatedBuffer != null)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("complete; writing aggregated buffer as the last one: {} bytes", aggregatedBuffer.remaining());
+ _flusher.offer(true, aggregatedBuffer.getByteBuffer(), Callback.from(callback, aggregatedBuffer::release));
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("complete; no aggregated buffer, writing last empty buffer");
+ _flusher.offer(true, BufferUtil.EMPTY_BUFFER, callback);
+ }
+ }
+ else if (full)
+ {
+ RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer();
+ if (LOG.isDebugEnabled())
+ LOG.debug("writing aggregated buffer: {} bytes", aggregatedBuffer.remaining());
+ _flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
+ {
+ @Override
+ public void succeeded()
+ {
+ super.succeeded();
+ if (LOG.isDebugEnabled())
+ LOG.debug("written aggregated buffer, writing remaining of current: {} bytes{}", currentBuffer.remaining(), (last ? " (last write)" : ""));
+ if (last)
+ _flusher.offer(true, currentBuffer, callback);
+ else
+ aggregateAndFlush(false, currentBuffer, callback);
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("failure writing aggregated buffer", x);
+ super.failed(x);
+ callback.failed(x);
+ }
+ });
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("buffer fully aggregated, delaying writing - aggregator: {}", _aggregator);
+ _flusher.offer(callback);
+ }
+ }
+
+ private static class Flusher extends IteratingCallback
+ {
+ private static final ByteBuffer COMPLETE_CALLBACK = BufferUtil.allocate(0);
+
+ private final Content.Sink _sink;
+ private boolean _last;
+ private ByteBuffer _buffer;
+ private Callback _callback;
+ private boolean _lastWritten;
+
+ Flusher(Content.Sink sink)
+ {
+ _sink = sink;
+ }
+
+ void offer(Callback callback)
+ {
+ offer(false, COMPLETE_CALLBACK, callback);
+ }
+
+ void offer(boolean last, ByteBuffer byteBuffer, Callback callback)
+ {
+ if (_callback != null)
+ throw new WritePendingException();
+ _last = last;
+ _buffer = byteBuffer;
+ _callback = callback;
+ iterate();
+ }
+
+ @Override
+ protected Action process()
+ {
+ if (_lastWritten)
+ return Action.SUCCEEDED;
+ if (_callback == null)
+ return Action.IDLE;
+ if (_buffer != COMPLETE_CALLBACK)
+ {
+ _lastWritten = _last;
+ _sink.write(_last, _buffer, this);
+ }
+ else
+ {
+ succeeded();
+ }
+ return Action.SCHEDULED;
+ }
+
+ @Override
+ public void succeeded()
+ {
+ _buffer = null;
+ Callback callback = _callback;
+ _callback = null;
+ callback.succeeded();
+ super.succeeded();
+ }
+
+ @Override
+ protected void onCompleteFailure(Throwable cause)
+ {
+ _buffer = null;
+ _callback.failed(cause);
+ }
+ }
+}
diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
new file mode 100644
index 00000000000..2d1b2bbaf81
--- /dev/null
+++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
@@ -0,0 +1,553 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.eclipse.jetty.io.content.AsyncContent;
+import org.eclipse.jetty.io.content.BufferedContentSink;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BufferedContentSinkTest
+{
+ private ArrayByteBufferPool.Tracking _bufferPool;
+
+ @BeforeEach
+ public void setUp()
+ {
+ _bufferPool = new ArrayByteBufferPool.Tracking();
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ assertThat("Leaks: " + _bufferPool.dumpLeaks(), _bufferPool.getLeaks().size(), is(0));
+ }
+
+ @Test
+ public void testConstructor()
+ {
+ assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 1, 0));
+ assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 1, -1));
+ assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 0, 1));
+ assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, -1, 1));
+ assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 0, 0));
+ assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, -1, -1));
+ assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 2, 1));
+ }
+
+ @Test
+ public void testWriteInvokesDemandCallback() throws Exception
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ async.demand(latch::countDown);
+ assertFalse(latch.await(250, TimeUnit.MILLISECONDS));
+
+ buffered.write(true, UTF_8.encode("one"), Callback.NOOP);
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ Content.Chunk chunk = async.read();
+ assertNotNull(chunk);
+ chunk.release();
+ }
+ }
+
+ @Test
+ public void testChunkReleaseSucceedsWriteCallback()
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
+
+ AtomicInteger successCounter = new AtomicInteger();
+ AtomicReference failureRef = new AtomicReference<>();
+
+ buffered.write(true, ByteBuffer.wrap(new byte[1]), Callback.from(successCounter::incrementAndGet, failureRef::set));
+
+ Content.Chunk chunk = async.read();
+ assertThat(successCounter.get(), is(0));
+ chunk.retain();
+ assertThat(chunk.release(), is(false));
+ assertThat(successCounter.get(), is(0));
+ assertThat(chunk.release(), is(true));
+ assertThat(successCounter.get(), is(1));
+ assertThat(failureRef.get(), is(nullValue()));
+ }
+ }
+
+ @Test
+ public void testEmptyChunkReadSucceedsWriteCallback()
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
+
+ AtomicInteger successCounter = new AtomicInteger();
+ AtomicReference failureRef = new AtomicReference<>();
+
+ buffered.write(true, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set));
+
+ Content.Chunk chunk = async.read();
+ assertThat(successCounter.get(), is(1));
+ assertThat(chunk.isLast(), is(true));
+ assertThat(chunk.hasRemaining(), is(false));
+ assertThat(chunk.release(), is(true));
+ assertThat(successCounter.get(), is(1));
+ assertThat(failureRef.get(), is(nullValue()));
+ }
+ }
+
+ @Test
+ public void testWriteAfterWriteLast()
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
+
+ AtomicInteger successCounter = new AtomicInteger();
+ AtomicReference failureRef = new AtomicReference<>();
+
+ buffered.write(true, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set));
+
+ Content.Chunk chunk = async.read();
+ assertThat(chunk.isLast(), is(true));
+ assertThat(chunk.release(), is(true));
+
+ assertThat(successCounter.get(), is(1));
+ assertThat(failureRef.get(), is(nullValue()));
+
+ buffered.write(false, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set));
+ assertThat(successCounter.get(), is(1));
+ assertThat(failureRef.get(), instanceOf(IOException.class));
+ }
+ }
+
+ @Test
+ public void testLargeBuffer()
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
+
+ buffered.write(false, ByteBuffer.wrap("one ".getBytes(UTF_8)), Callback.NOOP);
+ Content.Chunk chunk = async.read();
+ assertThat(chunk, nullValue());
+
+ buffered.write(false, ByteBuffer.wrap("two".getBytes(UTF_8)), Callback.NOOP);
+ chunk = async.read();
+ assertThat(chunk, nullValue());
+
+ buffered.write(true, null, Callback.NOOP);
+ chunk = async.read();
+ assertThat(chunk.isLast(), is(true));
+ assertThat(BufferUtil.toString(chunk.getByteBuffer(), UTF_8), is("one two"));
+ assertThat(chunk.release(), is(true));
+ }
+ }
+
+ @Test
+ public void testSmallBuffer()
+ {
+ int maxBufferSize = 16;
+ byte[] input1 = new byte[1024];
+ Arrays.fill(input1, (byte)'1');
+ byte[] input2 = new byte[1023];
+ Arrays.fill(input2, (byte)'2');
+
+ ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096);
+ BufferUtil.flipToFill(accumulatingBuffer);
+
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxBufferSize, maxBufferSize);
+
+ buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
+ buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP)));
+
+ int loopCount = 0;
+ while (true)
+ {
+ loopCount++;
+ Content.Chunk chunk = async.read();
+ assertThat(chunk, notNullValue());
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ if (chunk.isLast())
+ break;
+ }
+ assertThat(loopCount, is(2));
+
+ BufferUtil.flipToFlush(accumulatingBuffer, 0);
+ assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length));
+ for (byte b : input1)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ for (byte b : input2)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ }
+ }
+
+ @Test
+ public void testMaxAggregationSizeExceeded()
+ {
+ int maxBufferSize = 1024;
+ int maxAggregationSize = 128;
+ byte[] input1 = new byte[512];
+ Arrays.fill(input1, (byte)'1');
+ byte[] input2 = new byte[128];
+ Arrays.fill(input2, (byte)'2');
+
+ ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096);
+ BufferUtil.flipToFill(accumulatingBuffer);
+
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize);
+
+ buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
+ buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP)));
+
+ Content.Chunk chunk = async.read();
+ assertThat(chunk, notNullValue());
+ assertThat(chunk.remaining(), is(512));
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ assertThat(chunk.isLast(), is(false));
+
+ chunk = async.read();
+ assertThat(chunk, notNullValue());
+ assertThat(chunk.remaining(), is(128));
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ assertThat(chunk.isLast(), is(true));
+
+ BufferUtil.flipToFlush(accumulatingBuffer, 0);
+ assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length));
+ for (byte b : input1)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ for (byte b : input2)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ }
+ }
+
+ @Test
+ public void testMaxAggregationSizeExceededAfterBuffering()
+ {
+ int maxBufferSize = 1024;
+ int maxAggregationSize = 128;
+ byte[] input1 = new byte[128];
+ Arrays.fill(input1, (byte)'1');
+ byte[] input2 = new byte[512];
+ Arrays.fill(input2, (byte)'2');
+
+ ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096);
+ BufferUtil.flipToFill(accumulatingBuffer);
+
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize);
+
+ buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
+ buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP)));
+
+ Content.Chunk chunk = async.read();
+ assertThat(chunk, notNullValue());
+ assertThat(chunk.remaining(), is(128));
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ assertThat(chunk.isLast(), is(false));
+
+ chunk = async.read();
+ assertThat(chunk, notNullValue());
+ assertThat(chunk.remaining(), is(512));
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ assertThat(chunk.isLast(), is(true));
+
+ BufferUtil.flipToFlush(accumulatingBuffer, 0);
+ assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length));
+ for (byte b : input1)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ for (byte b : input2)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ }
+ }
+
+ @Test
+ public void testMaxAggregationSizeRespected()
+ {
+ int maxBufferSize = 1024;
+ int maxAggregationSize = 128;
+ byte[] input1 = new byte[128];
+ Arrays.fill(input1, (byte)'1');
+ byte[] input2 = new byte[128];
+ Arrays.fill(input2, (byte)'2');
+
+ ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096);
+ BufferUtil.flipToFill(accumulatingBuffer);
+
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize);
+
+ buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
+ buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP)));
+
+ Content.Chunk chunk = async.read();
+ assertThat(chunk, notNullValue());
+ assertThat(chunk.remaining(), is(256));
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ assertThat(chunk.isLast(), is(true));
+
+ BufferUtil.flipToFlush(accumulatingBuffer, 0);
+ assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length));
+ for (byte b : input1)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ for (byte b : input2)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ }
+ }
+
+ @Test
+ public void testBufferGrowth()
+ {
+ byte[] input1 = new byte[4000];
+ Arrays.fill(input1, (byte)'1');
+ byte[] input2 = new byte[4000];
+ Arrays.fill(input2, (byte)'2');
+ byte[] input3 = new byte[2000];
+ Arrays.fill(input3, (byte)'3');
+
+ ByteBuffer accumulatingBuffer = BufferUtil.allocate(16384);
+ BufferUtil.flipToFill(accumulatingBuffer);
+
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
+
+ buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
+ buffered.write(false, ByteBuffer.wrap(input2), Callback.from(() ->
+ buffered.write(true, ByteBuffer.wrap(input3), Callback.NOOP)))));
+
+ // We expect 3 buffer flushes: 4096b + 4096b + 1808b == 10_000b.
+ Content.Chunk chunk = async.read();
+ assertThat(chunk, notNullValue());
+ assertThat(chunk.remaining(), is(4096));
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ assertThat(chunk.isLast(), is(false));
+
+ chunk = async.read();
+ assertThat(chunk, notNullValue());
+ assertThat(chunk.remaining(), is(4096));
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ assertThat(chunk.isLast(), is(false));
+
+ chunk = async.read();
+ assertThat(chunk, notNullValue());
+ assertThat(chunk.remaining(), is(1808));
+ accumulatingBuffer.put(chunk.getByteBuffer());
+ assertThat(chunk.release(), is(true));
+ assertThat(chunk.isLast(), is(true));
+
+ BufferUtil.flipToFlush(accumulatingBuffer, 0);
+ assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length + input3.length));
+ for (byte b : input1)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ for (byte b : input2)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ for (byte b : input3)
+ {
+ assertThat(accumulatingBuffer.get(), is(b));
+ }
+ }
+ }
+
+ @Test
+ public void testByteByByteRecursion() throws Exception
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
+ AtomicInteger count = new AtomicInteger(8192);
+ CountDownLatch complete = new CountDownLatch(1);
+ Callback callback = new Callback()
+ {
+ @Override
+ public void succeeded()
+ {
+ int c = count.decrementAndGet();
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{(byte)c});
+ if (c >= 0)
+ buffered.write(c == 0, byteBuffer, this);
+ else
+ complete.countDown();
+ }
+ };
+
+ callback.succeeded();
+
+ Content.Chunk read = async.read();
+ assertThat(read.isLast(), is(false));
+ assertThat(read.remaining(), is(4096));
+ assertThat(read.release(), is(true));
+
+ read = async.read();
+ assertThat(read.isLast(), is(true));
+ assertThat(read.remaining(), is(4096));
+ assertThat(read.release(), is(true));
+
+ assertTrue(complete.await(5, TimeUnit.SECONDS));
+ assertThat(count.get(), is(-1));
+ }
+ }
+
+ @Test
+ public void testByteByByteAsync() throws Exception
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 1024, 1024);
+ AtomicInteger count = new AtomicInteger(2048);
+ CountDownLatch complete = new CountDownLatch(1);
+ Callback callback = new Callback()
+ {
+ @Override
+ public void succeeded()
+ {
+ int c = count.decrementAndGet();
+ if (c >= 0)
+ {
+ Callback cb = this;
+ new Thread(() ->
+ {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{(byte)c});
+ buffered.write(c == 0, byteBuffer, cb);
+ }).start();
+ }
+ else
+ {
+ complete.countDown();
+ }
+ }
+ };
+
+ callback.succeeded();
+
+ Content.Chunk read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull);
+ assertThat(read.isLast(), is(false));
+ assertThat(read.remaining(), is(1024));
+ assertThat(read.release(), is(true));
+
+ read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull);
+ assertThat(read.isLast(), is(true));
+ assertThat(read.remaining(), is(1024));
+ assertThat(read.release(), is(true));
+
+ assertTrue(complete.await(5, TimeUnit.SECONDS));
+ assertThat(count.get(), is(-1));
+ }
+ }
+
+ @Test
+ public void testSmallThenLargeWritesRecursion() throws Exception
+ {
+ try (AsyncContent async = new AsyncContent())
+ {
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 1, 4096);
+ AtomicInteger count = new AtomicInteger(8192);
+ CountDownLatch complete = new CountDownLatch(1);
+ Callback callback = new Callback()
+ {
+ @Override
+ public void succeeded()
+ {
+ int c = count.decrementAndGet();
+ ByteBuffer byteBuffer = (c % 2 == 0) ? ByteBuffer.wrap(new byte[512]) : ByteBuffer.wrap(new byte[]{(byte)c});
+ if (c >= 0)
+ buffered.write(c == 0, byteBuffer, this);
+ else
+ complete.countDown();
+ }
+ };
+
+ callback.succeeded();
+
+ for (int i = 0; i < 4096; i++)
+ {
+ Content.Chunk read = async.read();
+ assertThat(read.isLast(), is(false));
+ assertThat(read.remaining(), is(1));
+ assertThat(read.release(), is(true));
+
+ read = async.read();
+ assertThat(read.isLast(), is(i == 4095));
+ assertThat(read.remaining(), is(512));
+ assertThat(read.release(), is(true));
+ }
+
+ assertTrue(complete.await(5, TimeUnit.SECONDS));
+ assertThat(count.get(), is(-1));
+ }
+ }
+}
diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAggregatorTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAggregatorTest.java
new file mode 100644
index 00000000000..80509a36452
--- /dev/null
+++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAggregatorTest.java
@@ -0,0 +1,88 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.nio.ByteBuffer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ByteBufferAggregatorTest
+{
+ private ArrayByteBufferPool.Tracking bufferPool;
+
+ @BeforeEach
+ public void before()
+ {
+ bufferPool = new ArrayByteBufferPool.Tracking();
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ assertThat("Leaks: " + bufferPool.dumpLeaks(), bufferPool.getLeaks().size(), is(0));
+ }
+
+ @Test
+ public void testConstructor()
+ {
+ assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, 0));
+ assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, 1));
+ assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 1, 0));
+ assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, -1));
+ assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, -1, 0));
+ assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 2, 1));
+ }
+
+ @Test
+ public void testFullInSingleShot()
+ {
+ ByteBufferAggregator aggregator = new ByteBufferAggregator(bufferPool, true, 1, 16);
+
+ ByteBuffer byteBuffer1 = ByteBuffer.wrap(new byte[16]);
+ assertThat(aggregator.aggregate(byteBuffer1), is(true));
+ assertThat(byteBuffer1.remaining(), is(0));
+
+ ByteBuffer byteBuffer2 = ByteBuffer.wrap(new byte[16]);
+ assertThat(aggregator.aggregate(byteBuffer2), is(true));
+ assertThat(byteBuffer2.remaining(), is(16));
+
+ RetainableByteBuffer retainableByteBuffer = aggregator.takeRetainableByteBuffer();
+ assertThat(retainableByteBuffer.getByteBuffer().remaining(), is(16));
+ assertThat(retainableByteBuffer.release(), is(true));
+ }
+
+ @Test
+ public void testFullInMultipleShots()
+ {
+ ByteBufferAggregator aggregator = new ByteBufferAggregator(bufferPool, true, 1, 16);
+
+ ByteBuffer byteBuffer1 = ByteBuffer.wrap(new byte[15]);
+ assertThat(aggregator.aggregate(byteBuffer1), is(false));
+ assertThat(byteBuffer1.remaining(), is(0));
+
+ ByteBuffer byteBuffer2 = ByteBuffer.wrap(new byte[16]);
+ assertThat(aggregator.aggregate(byteBuffer2), is(true));
+ assertThat(byteBuffer2.remaining(), is(15));
+
+ RetainableByteBuffer retainableByteBuffer = aggregator.takeRetainableByteBuffer();
+ assertThat(retainableByteBuffer.getByteBuffer().remaining(), is(16));
+ assertThat(retainableByteBuffer.release(), is(true));
+ }
+}
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
index 4239026e1d9..470e3d68173 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
@@ -13,6 +13,7 @@
package org.eclipse.jetty.server;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ListIterator;
import java.util.concurrent.CompletableFuture;
@@ -30,6 +31,7 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.Trailers;
+import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.server.handler.ErrorHandler;
@@ -530,6 +532,40 @@ public interface Response extends Content.Sink
return -1;
}
+ /**
+ * Wraps a {@link Response} as a {@link OutputStream} that performs buffering. The necessary
+ * {@link ByteBufferPool} is taken from the request's connector while the size and direction of the buffer
+ * is read from the request's {@link HttpConfiguration}.
+ * This is equivalent to:
+ * {@code Content.Sink.asOutputStream(Response.asBufferedSink(request, response))}
+ * @param request the request from which to get the buffering sink's settings
+ * @param response the response to wrap
+ * @return a buffering {@link OutputStream}
+ */
+ static OutputStream asBufferedOutputStream(Request request, Response response)
+ {
+ return Content.Sink.asOutputStream(Response.asBufferedSink(request, response));
+ }
+
+ /**
+ * Wraps a {@link Response} as a {@link Content.Sink} that performs buffering. The necessary
+ * {@link ByteBufferPool} is taken from the request's connector while the size, direction of the buffer
+ * and commit size are read from the request's {@link HttpConfiguration}.
+ * @param request the request from which to get the buffering sink's settings
+ * @param response the response to wrap
+ * @return a buffering {@link Content.Sink}
+ */
+ static Content.Sink asBufferedSink(Request request, Response response)
+ {
+ ConnectionMetaData connectionMetaData = request.getConnectionMetaData();
+ ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool();
+ HttpConfiguration httpConfiguration = connectionMetaData.getHttpConfiguration();
+ int bufferSize = httpConfiguration.getOutputBufferSize();
+ boolean useOutputDirectByteBuffers = httpConfiguration.isUseOutputDirectByteBuffers();
+ int outputAggregationSize = httpConfiguration.getOutputAggregationSize();
+ return Content.Sink.asBuffered(response, bufferPool, useOutputDirectByteBuffers, outputAggregationSize, bufferSize);
+ }
+
class Wrapper implements Response
{
private final Request _request;
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java
index d2cc992b85f..c2ca86df24b 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java
@@ -19,17 +19,15 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MimeTypes;
-import org.eclipse.jetty.io.ByteBufferAccumulator;
import org.eclipse.jetty.io.ByteBufferPool;
-import org.eclipse.jetty.io.RetainableByteBuffer;
+import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.ConnectionMetaData;
import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
-import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IncludeExclude;
-import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,14 +45,25 @@ import org.slf4j.LoggerFactory;
* decision to buffer or not.
*
*
- * Note also that there are no memory limits to the size of the buffer, thus
- * this handler can represent an unbounded memory commitment if the content
- * generated can also be unbounded.
+ * Note also that the size of the buffer can be controlled by setting the
+ * {@link #BUFFER_SIZE_ATTRIBUTE_NAME} request attribute to an integer;
+ * in the absence of such header, the {@link HttpConfiguration#getOutputBufferSize()}
+ * config setting is used, while the maximum aggregation size can be controlled
+ * by setting the {@link #MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME} request attribute to an integer,
+ * in the absence of such header, the {@link HttpConfiguration#getOutputAggregationSize()}
+ * config setting is used.
*
*/
public class BufferedResponseHandler extends ConditionalHandler.Abstract
{
+ /**
+ * The name of the request attribute used to control the buffer size of a particular request.
+ */
public static final String BUFFER_SIZE_ATTRIBUTE_NAME = BufferedResponseHandler.class.getName() + ".buffer-size";
+ /**
+ * The name of the request attribute used to control the max aggregation size of a particular request.
+ */
+ public static final String MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME = BufferedResponseHandler.class.getName() + ".max-aggregation-size";
private static final Logger LOG = LoggerFactory.getLogger(BufferedResponseHandler.class);
@@ -159,8 +168,9 @@ public class BufferedResponseHandler extends ConditionalHandler.Abstract
private class BufferedResponse extends Response.Wrapper implements Callback
{
private final Callback _callback;
- private CountingByteBufferAccumulator _accumulator;
+ private Content.Sink _bufferedContentSink;
private boolean _firstWrite = true;
+ private boolean _lastWritten;
private BufferedResponse(Request request, Response response, Callback callback)
{
@@ -173,130 +183,44 @@ public class BufferedResponseHandler extends ConditionalHandler.Abstract
{
if (_firstWrite)
{
- if (shouldBuffer(this, last))
- {
- ConnectionMetaData connectionMetaData = getRequest().getConnectionMetaData();
- ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool();
- boolean useOutputDirectByteBuffers = connectionMetaData.getHttpConfiguration().isUseOutputDirectByteBuffers();
- _accumulator = new CountingByteBufferAccumulator(bufferPool, useOutputDirectByteBuffers, getBufferSize());
- }
_firstWrite = false;
+ if (shouldBuffer(this, last))
+ _bufferedContentSink = createBufferedSink();
}
-
- if (_accumulator != null)
- {
- ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER;
- IteratingNestedCallback writer = new IteratingNestedCallback(callback)
- {
- private boolean complete;
-
- @Override
- protected Action process()
- {
- if (complete)
- return Action.SUCCEEDED;
- boolean write = _accumulator.copyBuffer(current);
- complete = last && !current.hasRemaining();
- if (write || complete)
- {
- RetainableByteBuffer buffer = _accumulator.takeRetainableByteBuffer();
- BufferedResponse.super.write(complete, buffer.getByteBuffer(), Callback.from(this, buffer::release));
- return Action.SCHEDULED;
- }
- return Action.SUCCEEDED;
- }
- };
- writer.iterate();
- }
- else
- {
- super.write(last, byteBuffer, callback);
- }
+ _lastWritten |= last;
+ Content.Sink destSink = _bufferedContentSink != null ? _bufferedContentSink : getWrapped();
+ destSink.write(last, byteBuffer, callback);
}
- private int getBufferSize()
+ private Content.Sink createBufferedSink()
{
- Object attribute = getRequest().getAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME);
- return attribute instanceof Integer ? (int)attribute : Integer.MAX_VALUE;
+ Request request = getRequest();
+ ConnectionMetaData connectionMetaData = request.getConnectionMetaData();
+ ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool();
+ HttpConfiguration httpConfiguration = connectionMetaData.getHttpConfiguration();
+ Object attribute = request.getAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME);
+ int bufferSize = attribute instanceof Integer ? (int)attribute : httpConfiguration.getOutputBufferSize();
+ attribute = request.getAttribute(BufferedResponseHandler.MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME);
+ int maxAggregationSize = attribute instanceof Integer ? (int)attribute : httpConfiguration.getOutputAggregationSize();
+ boolean direct = httpConfiguration.isUseOutputDirectByteBuffers();
+ return Content.Sink.asBuffered(getWrapped(), bufferPool, direct, maxAggregationSize, bufferSize);
}
@Override
public void succeeded()
{
- // TODO pass all accumulated buffers as an array instead of allocating & copying into a single one.
- if (_accumulator != null)
- {
- RetainableByteBuffer buffer = _accumulator.takeRetainableByteBuffer();
- super.write(true, buffer.getByteBuffer(), Callback.from(_callback, () ->
- {
- buffer.release();
- _accumulator.close();
- }));
- }
+ if (_bufferedContentSink != null && !_lastWritten)
+ _bufferedContentSink.write(true, null, _callback);
else
- {
_callback.succeeded();
- }
}
@Override
public void failed(Throwable x)
{
- if (_accumulator != null)
- _accumulator.close();
+ if (_bufferedContentSink != null && !_lastWritten)
+ _bufferedContentSink.write(true, null, Callback.NOOP);
_callback.failed(x);
}
}
-
- private static class CountingByteBufferAccumulator implements AutoCloseable
- {
- private final ByteBufferAccumulator _accumulator;
- private final int _maxSize;
- private int _accumulatedCount;
-
- private CountingByteBufferAccumulator(ByteBufferPool bufferPool, boolean direct, int maxSize)
- {
- if (maxSize <= 0)
- throw new IllegalArgumentException("maxSize must be > 0, was: " + maxSize);
- _maxSize = maxSize;
- _accumulator = new ByteBufferAccumulator(bufferPool, direct);
- }
-
- private boolean copyBuffer(ByteBuffer buffer)
- {
- int remainingCapacity = space();
- if (buffer.remaining() >= remainingCapacity)
- {
- _accumulatedCount += remainingCapacity;
- int end = buffer.position() + remainingCapacity;
- _accumulator.copyBuffer(buffer.duplicate().limit(end));
- buffer.position(end);
- return true;
- }
- else
- {
- _accumulatedCount += buffer.remaining();
- _accumulator.copyBuffer(buffer);
- return false;
- }
- }
-
- private int space()
- {
- return _maxSize - _accumulatedCount;
- }
-
- private RetainableByteBuffer takeRetainableByteBuffer()
- {
- _accumulatedCount = 0;
- return _accumulator.takeRetainableByteBuffer();
- }
-
- @Override
- public void close()
- {
- _accumulatedCount = 0;
- _accumulator.close();
- }
- }
}
diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/BufferedResponseHandlerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/BufferedResponseHandlerTest.java
index 378bce1048a..acebc7ee926 100644
--- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/BufferedResponseHandlerTest.java
+++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/BufferedResponseHandlerTest.java
@@ -87,11 +87,13 @@ public class BufferedResponseHandlerTest
@Test
public void testIncluded() throws Exception
{
+ _test._bufferSize = 2048;
String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Write: 0"));
assertThat(response, containsString("Write: 9"));
assertThat(response, containsString("Written: true"));
+ assertThat(response, containsString("Content-Length: "));
}
@Test
@@ -124,6 +126,7 @@ public class BufferedResponseHandlerTest
public void testFlushed() throws Exception
{
_test._flush = true;
+ _test._bufferSize = 2048;
String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Write: 0"));
@@ -134,6 +137,7 @@ public class BufferedResponseHandlerTest
@Test
public void testBufferSizeSmall() throws Exception
{
+ _test._aggregationSize = 16;
_test._bufferSize = 16;
String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
assertThat(response, containsString(" 200 OK"));
@@ -187,6 +191,7 @@ public class BufferedResponseHandlerTest
public void testReset() throws Exception
{
_test._reset = true;
+ _test._bufferSize = 2048;
String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("Write: 0"));
@@ -197,6 +202,7 @@ public class BufferedResponseHandlerTest
public static class TestHandler extends Handler.Abstract
{
+ int _aggregationSize = -1;
int _bufferSize = -1;
String _mimeType;
byte[] _content = new byte[128];
@@ -217,6 +223,8 @@ public class BufferedResponseHandlerTest
if (_bufferSize > 0)
request.setAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME, _bufferSize);
+ if (_aggregationSize > 0)
+ request.setAttribute(BufferedResponseHandler.MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME, _aggregationSize);
if (_mimeType != null)
response.getHeaders().put(HttpHeader.CONTENT_TYPE, _mimeType);
@@ -231,8 +239,8 @@ public class BufferedResponseHandlerTest
outputStream.flush();
}
response.getHeaders().add("Written", "true");
- callback.succeeded();
}
+ callback.succeeded();
return true;
}