diff --git a/jetty-core/jetty-io/pom.xml b/jetty-core/jetty-io/pom.xml index 4437a759321..e57f528229a 100644 --- a/jetty-core/jetty-io/pom.xml +++ b/jetty-core/jetty-io/pom.xml @@ -49,5 +49,10 @@ jetty-test-helper test + + org.awaitility + awaitility + test + diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java new file mode 100644 index 00000000000..3d929d2d5f2 --- /dev/null +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java @@ -0,0 +1,138 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.BufferUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

Aggregates data into a single ByteBuffer of a specified maximum size.

+ *

The buffer automatically grows as data is written to it, up until it reaches the specified maximum size. + * Once the buffer is full, the aggregator will not aggregate any more bytes until its buffer is taken out, + * after which a new aggregate/take buffer cycle can start.

+ *

The buffers are taken from the supplied {@link ByteBufferPool} or freshly allocated if one is not supplied.

+ */ +public class ByteBufferAggregator +{ + private static final Logger LOG = LoggerFactory.getLogger(ByteBufferAggregator.class); + + private final ByteBufferPool _bufferPool; + private final boolean _direct; + private final int _maxSize; + private RetainableByteBuffer _retainableByteBuffer; + private int _aggregatedSize; + private int _currentSize; + + /** + * Creates a ByteBuffer aggregator. + * @param bufferPool The {@link ByteBufferPool} from which to acquire the buffers + * @param direct whether to get direct buffers + * @param startSize the starting size of the buffer + * @param maxSize the maximum size of the buffer which must be greater than {@code startSize} + */ + public ByteBufferAggregator(ByteBufferPool bufferPool, boolean direct, int startSize, int maxSize) + { + if (maxSize <= 0) + throw new IllegalArgumentException("maxSize must be > 0, was: " + maxSize); + if (startSize <= 0) + throw new IllegalArgumentException("startSize must be > 0, was: " + startSize); + if (startSize > maxSize) + throw new IllegalArgumentException("maxSize (" + maxSize + ") must be >= startSize (" + startSize + ")"); + _bufferPool = (bufferPool == null) ? new ByteBufferPool.NonPooling() : bufferPool; + _direct = direct; + _maxSize = maxSize; + _currentSize = startSize; + } + + /** + * Aggregates the given ByteBuffer. This copies bytes up to the specified maximum size, at which + * time this method returns {@code true} and {@link #takeRetainableByteBuffer()} must be called + * for this method to accept aggregating again. + * @param buffer the buffer to copy into this aggregator; its position is updated according to + * the number of aggregated bytes + * @return true if the aggregator's buffer is full and should be taken, false otherwise + */ + public boolean aggregate(ByteBuffer buffer) + { + tryExpandBufferCapacity(buffer.remaining()); + if (_retainableByteBuffer == null) + { + _retainableByteBuffer = _bufferPool.acquire(_currentSize, _direct); + BufferUtil.flipToFill(_retainableByteBuffer.getByteBuffer()); + } + int copySize = Math.min(_currentSize - _aggregatedSize, buffer.remaining()); + + ByteBuffer byteBuffer = _retainableByteBuffer.getByteBuffer(); + byteBuffer.put(byteBuffer.position(), buffer, buffer.position(), copySize); + byteBuffer.position(byteBuffer.position() + copySize); + buffer.position(buffer.position() + copySize); + _aggregatedSize += copySize; + return _aggregatedSize == _maxSize; + } + + private void tryExpandBufferCapacity(int remaining) + { + if (LOG.isDebugEnabled()) + LOG.debug("tryExpandBufferCapacity remaining: {} _currentSize: {} _accumulatedSize={}", remaining, _currentSize, _aggregatedSize); + if (_currentSize == _maxSize) + return; + int capacityLeft = _currentSize - _aggregatedSize; + if (remaining <= capacityLeft) + return; + int need = remaining - capacityLeft; + _currentSize = Math.min(_maxSize, ceilToNextPowerOfTwo(_currentSize + need)); + + if (_retainableByteBuffer != null) + { + BufferUtil.flipToFlush(_retainableByteBuffer.getByteBuffer(), 0); + RetainableByteBuffer newBuffer = _bufferPool.acquire(_currentSize, _direct); + BufferUtil.flipToFill(newBuffer.getByteBuffer()); + newBuffer.getByteBuffer().put(_retainableByteBuffer.getByteBuffer()); + _retainableByteBuffer.release(); + _retainableByteBuffer = newBuffer; + } + } + + private static int ceilToNextPowerOfTwo(int val) + { + int result = 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(val - 1)); + return result > 0 ? result : Integer.MAX_VALUE; + } + + /** + * Takes the buffer out of the aggregator. Once the buffer has been taken out, + * the aggregator resets itself and a new buffer will be acquired from the pool + * during the next {@link #aggregate(ByteBuffer)} call. + * @return the aggregated buffer, or null if nothing has been buffered yet + */ + public RetainableByteBuffer takeRetainableByteBuffer() + { + if (_retainableByteBuffer == null) + return null; + BufferUtil.flipToFlush(_retainableByteBuffer.getByteBuffer(), 0); + RetainableByteBuffer result = _retainableByteBuffer; + _retainableByteBuffer = null; + _aggregatedSize = 0; + return result; + } + + @Override + public String toString() + { + return "%s@%x{a=%d c=%d m=%d b=%s}".formatted(getClass().getSimpleName(), hashCode(), _aggregatedSize, _currentSize, _maxSize, _retainableByteBuffer); + } +} diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java index 09ae16bdb94..02e86b47e53 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow; import java.util.function.Consumer; +import org.eclipse.jetty.io.content.BufferedContentSink; import org.eclipse.jetty.io.content.ContentSinkOutputStream; import org.eclipse.jetty.io.content.ContentSinkSubscriber; import org.eclipse.jetty.io.content.ContentSourceInputStream; @@ -411,6 +412,22 @@ public class Content */ public interface Sink { + /** + *

Wraps the given content sink with a buffering sink.

+ * + * @param sink the sink to write to + * @param bufferPool the {@link ByteBufferPool} to use + * @param direct true to use direct buffers, false to use heap buffers + * @param maxAggregationSize the maximum size that can be buffered in a single write; + * any size above this threshold triggers a buffer flush + * @param maxBufferSize the maximum size of the buffer + * @return a Sink that writes to the given content sink + */ + static Sink asBuffered(Sink sink, ByteBufferPool bufferPool, boolean direct, int maxAggregationSize, int maxBufferSize) + { + return new BufferedContentSink(sink, bufferPool, direct, maxAggregationSize, maxBufferSize); + } + /** *

Wraps the given content sink with an {@link OutputStream}.

* diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java new file mode 100644 index 00000000000..d835716e7b1 --- /dev/null +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java @@ -0,0 +1,277 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io.content; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritePendingException; + +import org.eclipse.jetty.io.ByteBufferAggregator; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

A {@link Content.Sink} backed by another {@link Content.Sink}. + * Any content written to this {@link Content.Sink} is buffered, + * then written to the delegate using + * {@link Content.Sink#write(boolean, ByteBuffer, Callback)}.

+ */ +public class BufferedContentSink implements Content.Sink +{ + private static final Logger LOG = LoggerFactory.getLogger(BufferedContentSink.class); + + private static final int START_BUFFER_SIZE = 1024; + + private final Content.Sink _delegate; + private final ByteBufferPool _bufferPool; + private final boolean _direct; + private final int _maxBufferSize; + private final int _maxAggregationSize; + private final Flusher _flusher; + private ByteBufferAggregator _aggregator; + private boolean _firstWrite = true; + private boolean _lastWritten; + + public BufferedContentSink(Content.Sink delegate, ByteBufferPool bufferPool, boolean direct, int maxAggregationSize, int maxBufferSize) + { + if (maxBufferSize <= 0) + throw new IllegalArgumentException("maxBufferSize must be > 0, was: " + maxBufferSize); + if (maxAggregationSize <= 0) + throw new IllegalArgumentException("maxAggregationSize must be > 0, was: " + maxAggregationSize); + if (maxBufferSize < maxAggregationSize) + throw new IllegalArgumentException("maxBufferSize (" + maxBufferSize + ") must be >= maxAggregationSize (" + maxAggregationSize + ")"); + _delegate = delegate; + _bufferPool = (bufferPool == null) ? new ByteBufferPool.NonPooling() : bufferPool; + _direct = direct; + _maxBufferSize = maxBufferSize; + _maxAggregationSize = maxAggregationSize; + _flusher = new Flusher(delegate); + } + + @Override + public void write(boolean last, ByteBuffer byteBuffer, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("writing last={} {}", last, BufferUtil.toDetailString(byteBuffer)); + + if (_lastWritten) + { + callback.failed(new IOException("complete")); + return; + } + _lastWritten = last; + if (_firstWrite) + { + _firstWrite = false; + if (last) + { + // No need to buffer if this is both the first and the last write. + _delegate.write(true, byteBuffer, callback); + return; + } + } + + ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER; + if (current.remaining() <= _maxAggregationSize) + { + // current buffer can be aggregated + if (_aggregator == null) + _aggregator = new ByteBufferAggregator(_bufferPool, _direct, Math.min(START_BUFFER_SIZE, _maxBufferSize), _maxBufferSize); + aggregateAndFlush(last, current, callback); + } + else + { + // current buffer is greater than the max aggregation size + flush(last, current, callback); + } + } + + /** + * Flushes the aggregated buffer if something was aggregated, then flushes the + * given buffer, bypassing the aggregator. + */ + private void flush(boolean last, ByteBuffer currentBuffer, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("given buffer is greater than _maxBufferSize"); + + RetainableByteBuffer aggregatedBuffer = _aggregator == null ? null : _aggregator.takeRetainableByteBuffer(); + if (aggregatedBuffer == null) + { + if (LOG.isDebugEnabled()) + LOG.debug("nothing aggregated, flushing current buffer {}", currentBuffer); + _flusher.offer(last, currentBuffer, callback); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("flushing aggregated buffer {}", aggregatedBuffer); + _flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release)) + { + @Override + public void succeeded() + { + super.succeeded(); + if (LOG.isDebugEnabled()) + LOG.debug("succeeded writing aggregated buffer, flushing current buffer {}", currentBuffer); + _flusher.offer(last, currentBuffer, callback); + } + + @Override + public void failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("failure writing aggregated buffer", x); + super.failed(x); + callback.failed(x); + } + }); + } + } + + /** + * Aggregates the given buffer, flushing the aggregated buffer if necessary. + */ + private void aggregateAndFlush(boolean last, ByteBuffer currentBuffer, Callback callback) + { + boolean full = _aggregator.aggregate(currentBuffer); + boolean complete = last && !currentBuffer.hasRemaining(); + if (LOG.isDebugEnabled()) + LOG.debug("aggregated current buffer, full={}, complete={}, bytes left={}, aggregator={}", full, complete, currentBuffer.remaining(), _aggregator); + if (complete) + { + RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer(); + if (aggregatedBuffer != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("complete; writing aggregated buffer as the last one: {} bytes", aggregatedBuffer.remaining()); + _flusher.offer(true, aggregatedBuffer.getByteBuffer(), Callback.from(callback, aggregatedBuffer::release)); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("complete; no aggregated buffer, writing last empty buffer"); + _flusher.offer(true, BufferUtil.EMPTY_BUFFER, callback); + } + } + else if (full) + { + RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer(); + if (LOG.isDebugEnabled()) + LOG.debug("writing aggregated buffer: {} bytes", aggregatedBuffer.remaining()); + _flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release)) + { + @Override + public void succeeded() + { + super.succeeded(); + if (LOG.isDebugEnabled()) + LOG.debug("written aggregated buffer, writing remaining of current: {} bytes{}", currentBuffer.remaining(), (last ? " (last write)" : "")); + if (last) + _flusher.offer(true, currentBuffer, callback); + else + aggregateAndFlush(false, currentBuffer, callback); + } + + @Override + public void failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("failure writing aggregated buffer", x); + super.failed(x); + callback.failed(x); + } + }); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("buffer fully aggregated, delaying writing - aggregator: {}", _aggregator); + _flusher.offer(callback); + } + } + + private static class Flusher extends IteratingCallback + { + private static final ByteBuffer COMPLETE_CALLBACK = BufferUtil.allocate(0); + + private final Content.Sink _sink; + private boolean _last; + private ByteBuffer _buffer; + private Callback _callback; + private boolean _lastWritten; + + Flusher(Content.Sink sink) + { + _sink = sink; + } + + void offer(Callback callback) + { + offer(false, COMPLETE_CALLBACK, callback); + } + + void offer(boolean last, ByteBuffer byteBuffer, Callback callback) + { + if (_callback != null) + throw new WritePendingException(); + _last = last; + _buffer = byteBuffer; + _callback = callback; + iterate(); + } + + @Override + protected Action process() + { + if (_lastWritten) + return Action.SUCCEEDED; + if (_callback == null) + return Action.IDLE; + if (_buffer != COMPLETE_CALLBACK) + { + _lastWritten = _last; + _sink.write(_last, _buffer, this); + } + else + { + succeeded(); + } + return Action.SCHEDULED; + } + + @Override + public void succeeded() + { + _buffer = null; + Callback callback = _callback; + _callback = null; + callback.succeeded(); + super.succeeded(); + } + + @Override + protected void onCompleteFailure(Throwable cause) + { + _buffer = null; + _callback.failed(cause); + } + } +} diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java new file mode 100644 index 00000000000..2d1b2bbaf81 --- /dev/null +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java @@ -0,0 +1,553 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.io.content.AsyncContent; +import org.eclipse.jetty.io.content.BufferedContentSink; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BufferedContentSinkTest +{ + private ArrayByteBufferPool.Tracking _bufferPool; + + @BeforeEach + public void setUp() + { + _bufferPool = new ArrayByteBufferPool.Tracking(); + } + + @AfterEach + public void tearDown() + { + assertThat("Leaks: " + _bufferPool.dumpLeaks(), _bufferPool.getLeaks().size(), is(0)); + } + + @Test + public void testConstructor() + { + assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 1, 0)); + assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 1, -1)); + assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 0, 1)); + assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, -1, 1)); + assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 0, 0)); + assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, -1, -1)); + assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 2, 1)); + } + + @Test + public void testWriteInvokesDemandCallback() throws Exception + { + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096); + + CountDownLatch latch = new CountDownLatch(1); + async.demand(latch::countDown); + assertFalse(latch.await(250, TimeUnit.MILLISECONDS)); + + buffered.write(true, UTF_8.encode("one"), Callback.NOOP); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + Content.Chunk chunk = async.read(); + assertNotNull(chunk); + chunk.release(); + } + } + + @Test + public void testChunkReleaseSucceedsWriteCallback() + { + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096); + + AtomicInteger successCounter = new AtomicInteger(); + AtomicReference failureRef = new AtomicReference<>(); + + buffered.write(true, ByteBuffer.wrap(new byte[1]), Callback.from(successCounter::incrementAndGet, failureRef::set)); + + Content.Chunk chunk = async.read(); + assertThat(successCounter.get(), is(0)); + chunk.retain(); + assertThat(chunk.release(), is(false)); + assertThat(successCounter.get(), is(0)); + assertThat(chunk.release(), is(true)); + assertThat(successCounter.get(), is(1)); + assertThat(failureRef.get(), is(nullValue())); + } + } + + @Test + public void testEmptyChunkReadSucceedsWriteCallback() + { + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096); + + AtomicInteger successCounter = new AtomicInteger(); + AtomicReference failureRef = new AtomicReference<>(); + + buffered.write(true, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set)); + + Content.Chunk chunk = async.read(); + assertThat(successCounter.get(), is(1)); + assertThat(chunk.isLast(), is(true)); + assertThat(chunk.hasRemaining(), is(false)); + assertThat(chunk.release(), is(true)); + assertThat(successCounter.get(), is(1)); + assertThat(failureRef.get(), is(nullValue())); + } + } + + @Test + public void testWriteAfterWriteLast() + { + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096); + + AtomicInteger successCounter = new AtomicInteger(); + AtomicReference failureRef = new AtomicReference<>(); + + buffered.write(true, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set)); + + Content.Chunk chunk = async.read(); + assertThat(chunk.isLast(), is(true)); + assertThat(chunk.release(), is(true)); + + assertThat(successCounter.get(), is(1)); + assertThat(failureRef.get(), is(nullValue())); + + buffered.write(false, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set)); + assertThat(successCounter.get(), is(1)); + assertThat(failureRef.get(), instanceOf(IOException.class)); + } + } + + @Test + public void testLargeBuffer() + { + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096); + + buffered.write(false, ByteBuffer.wrap("one ".getBytes(UTF_8)), Callback.NOOP); + Content.Chunk chunk = async.read(); + assertThat(chunk, nullValue()); + + buffered.write(false, ByteBuffer.wrap("two".getBytes(UTF_8)), Callback.NOOP); + chunk = async.read(); + assertThat(chunk, nullValue()); + + buffered.write(true, null, Callback.NOOP); + chunk = async.read(); + assertThat(chunk.isLast(), is(true)); + assertThat(BufferUtil.toString(chunk.getByteBuffer(), UTF_8), is("one two")); + assertThat(chunk.release(), is(true)); + } + } + + @Test + public void testSmallBuffer() + { + int maxBufferSize = 16; + byte[] input1 = new byte[1024]; + Arrays.fill(input1, (byte)'1'); + byte[] input2 = new byte[1023]; + Arrays.fill(input2, (byte)'2'); + + ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096); + BufferUtil.flipToFill(accumulatingBuffer); + + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxBufferSize, maxBufferSize); + + buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() -> + buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP))); + + int loopCount = 0; + while (true) + { + loopCount++; + Content.Chunk chunk = async.read(); + assertThat(chunk, notNullValue()); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + if (chunk.isLast()) + break; + } + assertThat(loopCount, is(2)); + + BufferUtil.flipToFlush(accumulatingBuffer, 0); + assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length)); + for (byte b : input1) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + for (byte b : input2) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + } + } + + @Test + public void testMaxAggregationSizeExceeded() + { + int maxBufferSize = 1024; + int maxAggregationSize = 128; + byte[] input1 = new byte[512]; + Arrays.fill(input1, (byte)'1'); + byte[] input2 = new byte[128]; + Arrays.fill(input2, (byte)'2'); + + ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096); + BufferUtil.flipToFill(accumulatingBuffer); + + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize); + + buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() -> + buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP))); + + Content.Chunk chunk = async.read(); + assertThat(chunk, notNullValue()); + assertThat(chunk.remaining(), is(512)); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + assertThat(chunk.isLast(), is(false)); + + chunk = async.read(); + assertThat(chunk, notNullValue()); + assertThat(chunk.remaining(), is(128)); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + assertThat(chunk.isLast(), is(true)); + + BufferUtil.flipToFlush(accumulatingBuffer, 0); + assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length)); + for (byte b : input1) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + for (byte b : input2) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + } + } + + @Test + public void testMaxAggregationSizeExceededAfterBuffering() + { + int maxBufferSize = 1024; + int maxAggregationSize = 128; + byte[] input1 = new byte[128]; + Arrays.fill(input1, (byte)'1'); + byte[] input2 = new byte[512]; + Arrays.fill(input2, (byte)'2'); + + ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096); + BufferUtil.flipToFill(accumulatingBuffer); + + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize); + + buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() -> + buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP))); + + Content.Chunk chunk = async.read(); + assertThat(chunk, notNullValue()); + assertThat(chunk.remaining(), is(128)); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + assertThat(chunk.isLast(), is(false)); + + chunk = async.read(); + assertThat(chunk, notNullValue()); + assertThat(chunk.remaining(), is(512)); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + assertThat(chunk.isLast(), is(true)); + + BufferUtil.flipToFlush(accumulatingBuffer, 0); + assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length)); + for (byte b : input1) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + for (byte b : input2) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + } + } + + @Test + public void testMaxAggregationSizeRespected() + { + int maxBufferSize = 1024; + int maxAggregationSize = 128; + byte[] input1 = new byte[128]; + Arrays.fill(input1, (byte)'1'); + byte[] input2 = new byte[128]; + Arrays.fill(input2, (byte)'2'); + + ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096); + BufferUtil.flipToFill(accumulatingBuffer); + + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize); + + buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() -> + buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP))); + + Content.Chunk chunk = async.read(); + assertThat(chunk, notNullValue()); + assertThat(chunk.remaining(), is(256)); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + assertThat(chunk.isLast(), is(true)); + + BufferUtil.flipToFlush(accumulatingBuffer, 0); + assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length)); + for (byte b : input1) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + for (byte b : input2) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + } + } + + @Test + public void testBufferGrowth() + { + byte[] input1 = new byte[4000]; + Arrays.fill(input1, (byte)'1'); + byte[] input2 = new byte[4000]; + Arrays.fill(input2, (byte)'2'); + byte[] input3 = new byte[2000]; + Arrays.fill(input3, (byte)'3'); + + ByteBuffer accumulatingBuffer = BufferUtil.allocate(16384); + BufferUtil.flipToFill(accumulatingBuffer); + + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096); + + buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() -> + buffered.write(false, ByteBuffer.wrap(input2), Callback.from(() -> + buffered.write(true, ByteBuffer.wrap(input3), Callback.NOOP))))); + + // We expect 3 buffer flushes: 4096b + 4096b + 1808b == 10_000b. + Content.Chunk chunk = async.read(); + assertThat(chunk, notNullValue()); + assertThat(chunk.remaining(), is(4096)); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + assertThat(chunk.isLast(), is(false)); + + chunk = async.read(); + assertThat(chunk, notNullValue()); + assertThat(chunk.remaining(), is(4096)); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + assertThat(chunk.isLast(), is(false)); + + chunk = async.read(); + assertThat(chunk, notNullValue()); + assertThat(chunk.remaining(), is(1808)); + accumulatingBuffer.put(chunk.getByteBuffer()); + assertThat(chunk.release(), is(true)); + assertThat(chunk.isLast(), is(true)); + + BufferUtil.flipToFlush(accumulatingBuffer, 0); + assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length + input3.length)); + for (byte b : input1) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + for (byte b : input2) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + for (byte b : input3) + { + assertThat(accumulatingBuffer.get(), is(b)); + } + } + } + + @Test + public void testByteByByteRecursion() throws Exception + { + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096); + AtomicInteger count = new AtomicInteger(8192); + CountDownLatch complete = new CountDownLatch(1); + Callback callback = new Callback() + { + @Override + public void succeeded() + { + int c = count.decrementAndGet(); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{(byte)c}); + if (c >= 0) + buffered.write(c == 0, byteBuffer, this); + else + complete.countDown(); + } + }; + + callback.succeeded(); + + Content.Chunk read = async.read(); + assertThat(read.isLast(), is(false)); + assertThat(read.remaining(), is(4096)); + assertThat(read.release(), is(true)); + + read = async.read(); + assertThat(read.isLast(), is(true)); + assertThat(read.remaining(), is(4096)); + assertThat(read.release(), is(true)); + + assertTrue(complete.await(5, TimeUnit.SECONDS)); + assertThat(count.get(), is(-1)); + } + } + + @Test + public void testByteByByteAsync() throws Exception + { + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 1024, 1024); + AtomicInteger count = new AtomicInteger(2048); + CountDownLatch complete = new CountDownLatch(1); + Callback callback = new Callback() + { + @Override + public void succeeded() + { + int c = count.decrementAndGet(); + if (c >= 0) + { + Callback cb = this; + new Thread(() -> + { + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{(byte)c}); + buffered.write(c == 0, byteBuffer, cb); + }).start(); + } + else + { + complete.countDown(); + } + } + }; + + callback.succeeded(); + + Content.Chunk read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull); + assertThat(read.isLast(), is(false)); + assertThat(read.remaining(), is(1024)); + assertThat(read.release(), is(true)); + + read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull); + assertThat(read.isLast(), is(true)); + assertThat(read.remaining(), is(1024)); + assertThat(read.release(), is(true)); + + assertTrue(complete.await(5, TimeUnit.SECONDS)); + assertThat(count.get(), is(-1)); + } + } + + @Test + public void testSmallThenLargeWritesRecursion() throws Exception + { + try (AsyncContent async = new AsyncContent()) + { + BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 1, 4096); + AtomicInteger count = new AtomicInteger(8192); + CountDownLatch complete = new CountDownLatch(1); + Callback callback = new Callback() + { + @Override + public void succeeded() + { + int c = count.decrementAndGet(); + ByteBuffer byteBuffer = (c % 2 == 0) ? ByteBuffer.wrap(new byte[512]) : ByteBuffer.wrap(new byte[]{(byte)c}); + if (c >= 0) + buffered.write(c == 0, byteBuffer, this); + else + complete.countDown(); + } + }; + + callback.succeeded(); + + for (int i = 0; i < 4096; i++) + { + Content.Chunk read = async.read(); + assertThat(read.isLast(), is(false)); + assertThat(read.remaining(), is(1)); + assertThat(read.release(), is(true)); + + read = async.read(); + assertThat(read.isLast(), is(i == 4095)); + assertThat(read.remaining(), is(512)); + assertThat(read.release(), is(true)); + } + + assertTrue(complete.await(5, TimeUnit.SECONDS)); + assertThat(count.get(), is(-1)); + } + } +} diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAggregatorTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAggregatorTest.java new file mode 100644 index 00000000000..80509a36452 --- /dev/null +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ByteBufferAggregatorTest.java @@ -0,0 +1,88 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ByteBufferAggregatorTest +{ + private ArrayByteBufferPool.Tracking bufferPool; + + @BeforeEach + public void before() + { + bufferPool = new ArrayByteBufferPool.Tracking(); + } + + @AfterEach + public void tearDown() + { + assertThat("Leaks: " + bufferPool.dumpLeaks(), bufferPool.getLeaks().size(), is(0)); + } + + @Test + public void testConstructor() + { + assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, 0)); + assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, 1)); + assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 1, 0)); + assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, -1)); + assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, -1, 0)); + assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 2, 1)); + } + + @Test + public void testFullInSingleShot() + { + ByteBufferAggregator aggregator = new ByteBufferAggregator(bufferPool, true, 1, 16); + + ByteBuffer byteBuffer1 = ByteBuffer.wrap(new byte[16]); + assertThat(aggregator.aggregate(byteBuffer1), is(true)); + assertThat(byteBuffer1.remaining(), is(0)); + + ByteBuffer byteBuffer2 = ByteBuffer.wrap(new byte[16]); + assertThat(aggregator.aggregate(byteBuffer2), is(true)); + assertThat(byteBuffer2.remaining(), is(16)); + + RetainableByteBuffer retainableByteBuffer = aggregator.takeRetainableByteBuffer(); + assertThat(retainableByteBuffer.getByteBuffer().remaining(), is(16)); + assertThat(retainableByteBuffer.release(), is(true)); + } + + @Test + public void testFullInMultipleShots() + { + ByteBufferAggregator aggregator = new ByteBufferAggregator(bufferPool, true, 1, 16); + + ByteBuffer byteBuffer1 = ByteBuffer.wrap(new byte[15]); + assertThat(aggregator.aggregate(byteBuffer1), is(false)); + assertThat(byteBuffer1.remaining(), is(0)); + + ByteBuffer byteBuffer2 = ByteBuffer.wrap(new byte[16]); + assertThat(aggregator.aggregate(byteBuffer2), is(true)); + assertThat(byteBuffer2.remaining(), is(15)); + + RetainableByteBuffer retainableByteBuffer = aggregator.takeRetainableByteBuffer(); + assertThat(retainableByteBuffer.getByteBuffer().remaining(), is(16)); + assertThat(retainableByteBuffer.release(), is(true)); + } +} diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index 4239026e1d9..470e3d68173 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.server; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ListIterator; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,7 @@ import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.Trailers; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.QuietException; import org.eclipse.jetty.server.handler.ErrorHandler; @@ -530,6 +532,40 @@ public interface Response extends Content.Sink return -1; } + /** + *

Wraps a {@link Response} as a {@link OutputStream} that performs buffering. The necessary + * {@link ByteBufferPool} is taken from the request's connector while the size and direction of the buffer + * is read from the request's {@link HttpConfiguration}.

+ *

This is equivalent to:

+ *

{@code Content.Sink.asOutputStream(Response.asBufferedSink(request, response))}

+ * @param request the request from which to get the buffering sink's settings + * @param response the response to wrap + * @return a buffering {@link OutputStream} + */ + static OutputStream asBufferedOutputStream(Request request, Response response) + { + return Content.Sink.asOutputStream(Response.asBufferedSink(request, response)); + } + + /** + * Wraps a {@link Response} as a {@link Content.Sink} that performs buffering. The necessary + * {@link ByteBufferPool} is taken from the request's connector while the size, direction of the buffer + * and commit size are read from the request's {@link HttpConfiguration}. + * @param request the request from which to get the buffering sink's settings + * @param response the response to wrap + * @return a buffering {@link Content.Sink} + */ + static Content.Sink asBufferedSink(Request request, Response response) + { + ConnectionMetaData connectionMetaData = request.getConnectionMetaData(); + ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool(); + HttpConfiguration httpConfiguration = connectionMetaData.getHttpConfiguration(); + int bufferSize = httpConfiguration.getOutputBufferSize(); + boolean useOutputDirectByteBuffers = httpConfiguration.isUseOutputDirectByteBuffers(); + int outputAggregationSize = httpConfiguration.getOutputAggregationSize(); + return Content.Sink.asBuffered(response, bufferPool, useOutputDirectByteBuffers, outputAggregationSize, bufferSize); + } + class Wrapper implements Response { private final Request _request; diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java index d2cc992b85f..c2ca86df24b 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/BufferedResponseHandler.java @@ -19,17 +19,15 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MimeTypes; -import org.eclipse.jetty.io.ByteBufferAccumulator; import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.Content; import org.eclipse.jetty.server.ConnectionMetaData; import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; -import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IncludeExclude; -import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,14 +45,25 @@ import org.slf4j.LoggerFactory; * decision to buffer or not. *

*

- * Note also that there are no memory limits to the size of the buffer, thus - * this handler can represent an unbounded memory commitment if the content - * generated can also be unbounded. + * Note also that the size of the buffer can be controlled by setting the + * {@link #BUFFER_SIZE_ATTRIBUTE_NAME} request attribute to an integer; + * in the absence of such header, the {@link HttpConfiguration#getOutputBufferSize()} + * config setting is used, while the maximum aggregation size can be controlled + * by setting the {@link #MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME} request attribute to an integer, + * in the absence of such header, the {@link HttpConfiguration#getOutputAggregationSize()} + * config setting is used. *

*/ public class BufferedResponseHandler extends ConditionalHandler.Abstract { + /** + * The name of the request attribute used to control the buffer size of a particular request. + */ public static final String BUFFER_SIZE_ATTRIBUTE_NAME = BufferedResponseHandler.class.getName() + ".buffer-size"; + /** + * The name of the request attribute used to control the max aggregation size of a particular request. + */ + public static final String MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME = BufferedResponseHandler.class.getName() + ".max-aggregation-size"; private static final Logger LOG = LoggerFactory.getLogger(BufferedResponseHandler.class); @@ -159,8 +168,9 @@ public class BufferedResponseHandler extends ConditionalHandler.Abstract private class BufferedResponse extends Response.Wrapper implements Callback { private final Callback _callback; - private CountingByteBufferAccumulator _accumulator; + private Content.Sink _bufferedContentSink; private boolean _firstWrite = true; + private boolean _lastWritten; private BufferedResponse(Request request, Response response, Callback callback) { @@ -173,130 +183,44 @@ public class BufferedResponseHandler extends ConditionalHandler.Abstract { if (_firstWrite) { - if (shouldBuffer(this, last)) - { - ConnectionMetaData connectionMetaData = getRequest().getConnectionMetaData(); - ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool(); - boolean useOutputDirectByteBuffers = connectionMetaData.getHttpConfiguration().isUseOutputDirectByteBuffers(); - _accumulator = new CountingByteBufferAccumulator(bufferPool, useOutputDirectByteBuffers, getBufferSize()); - } _firstWrite = false; + if (shouldBuffer(this, last)) + _bufferedContentSink = createBufferedSink(); } - - if (_accumulator != null) - { - ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER; - IteratingNestedCallback writer = new IteratingNestedCallback(callback) - { - private boolean complete; - - @Override - protected Action process() - { - if (complete) - return Action.SUCCEEDED; - boolean write = _accumulator.copyBuffer(current); - complete = last && !current.hasRemaining(); - if (write || complete) - { - RetainableByteBuffer buffer = _accumulator.takeRetainableByteBuffer(); - BufferedResponse.super.write(complete, buffer.getByteBuffer(), Callback.from(this, buffer::release)); - return Action.SCHEDULED; - } - return Action.SUCCEEDED; - } - }; - writer.iterate(); - } - else - { - super.write(last, byteBuffer, callback); - } + _lastWritten |= last; + Content.Sink destSink = _bufferedContentSink != null ? _bufferedContentSink : getWrapped(); + destSink.write(last, byteBuffer, callback); } - private int getBufferSize() + private Content.Sink createBufferedSink() { - Object attribute = getRequest().getAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME); - return attribute instanceof Integer ? (int)attribute : Integer.MAX_VALUE; + Request request = getRequest(); + ConnectionMetaData connectionMetaData = request.getConnectionMetaData(); + ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool(); + HttpConfiguration httpConfiguration = connectionMetaData.getHttpConfiguration(); + Object attribute = request.getAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME); + int bufferSize = attribute instanceof Integer ? (int)attribute : httpConfiguration.getOutputBufferSize(); + attribute = request.getAttribute(BufferedResponseHandler.MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME); + int maxAggregationSize = attribute instanceof Integer ? (int)attribute : httpConfiguration.getOutputAggregationSize(); + boolean direct = httpConfiguration.isUseOutputDirectByteBuffers(); + return Content.Sink.asBuffered(getWrapped(), bufferPool, direct, maxAggregationSize, bufferSize); } @Override public void succeeded() { - // TODO pass all accumulated buffers as an array instead of allocating & copying into a single one. - if (_accumulator != null) - { - RetainableByteBuffer buffer = _accumulator.takeRetainableByteBuffer(); - super.write(true, buffer.getByteBuffer(), Callback.from(_callback, () -> - { - buffer.release(); - _accumulator.close(); - })); - } + if (_bufferedContentSink != null && !_lastWritten) + _bufferedContentSink.write(true, null, _callback); else - { _callback.succeeded(); - } } @Override public void failed(Throwable x) { - if (_accumulator != null) - _accumulator.close(); + if (_bufferedContentSink != null && !_lastWritten) + _bufferedContentSink.write(true, null, Callback.NOOP); _callback.failed(x); } } - - private static class CountingByteBufferAccumulator implements AutoCloseable - { - private final ByteBufferAccumulator _accumulator; - private final int _maxSize; - private int _accumulatedCount; - - private CountingByteBufferAccumulator(ByteBufferPool bufferPool, boolean direct, int maxSize) - { - if (maxSize <= 0) - throw new IllegalArgumentException("maxSize must be > 0, was: " + maxSize); - _maxSize = maxSize; - _accumulator = new ByteBufferAccumulator(bufferPool, direct); - } - - private boolean copyBuffer(ByteBuffer buffer) - { - int remainingCapacity = space(); - if (buffer.remaining() >= remainingCapacity) - { - _accumulatedCount += remainingCapacity; - int end = buffer.position() + remainingCapacity; - _accumulator.copyBuffer(buffer.duplicate().limit(end)); - buffer.position(end); - return true; - } - else - { - _accumulatedCount += buffer.remaining(); - _accumulator.copyBuffer(buffer); - return false; - } - } - - private int space() - { - return _maxSize - _accumulatedCount; - } - - private RetainableByteBuffer takeRetainableByteBuffer() - { - _accumulatedCount = 0; - return _accumulator.takeRetainableByteBuffer(); - } - - @Override - public void close() - { - _accumulatedCount = 0; - _accumulator.close(); - } - } } diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/BufferedResponseHandlerTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/BufferedResponseHandlerTest.java index 378bce1048a..acebc7ee926 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/BufferedResponseHandlerTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/handler/BufferedResponseHandlerTest.java @@ -87,11 +87,13 @@ public class BufferedResponseHandlerTest @Test public void testIncluded() throws Exception { + _test._bufferSize = 2048; String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); assertThat(response, containsString(" 200 OK")); assertThat(response, containsString("Write: 0")); assertThat(response, containsString("Write: 9")); assertThat(response, containsString("Written: true")); + assertThat(response, containsString("Content-Length: ")); } @Test @@ -124,6 +126,7 @@ public class BufferedResponseHandlerTest public void testFlushed() throws Exception { _test._flush = true; + _test._bufferSize = 2048; String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); assertThat(response, containsString(" 200 OK")); assertThat(response, containsString("Write: 0")); @@ -134,6 +137,7 @@ public class BufferedResponseHandlerTest @Test public void testBufferSizeSmall() throws Exception { + _test._aggregationSize = 16; _test._bufferSize = 16; String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); assertThat(response, containsString(" 200 OK")); @@ -187,6 +191,7 @@ public class BufferedResponseHandlerTest public void testReset() throws Exception { _test._reset = true; + _test._bufferSize = 2048; String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n"); assertThat(response, containsString(" 200 OK")); assertThat(response, containsString("Write: 0")); @@ -197,6 +202,7 @@ public class BufferedResponseHandlerTest public static class TestHandler extends Handler.Abstract { + int _aggregationSize = -1; int _bufferSize = -1; String _mimeType; byte[] _content = new byte[128]; @@ -217,6 +223,8 @@ public class BufferedResponseHandlerTest if (_bufferSize > 0) request.setAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME, _bufferSize); + if (_aggregationSize > 0) + request.setAttribute(BufferedResponseHandler.MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME, _aggregationSize); if (_mimeType != null) response.getHeaders().put(HttpHeader.CONTENT_TYPE, _mimeType); @@ -231,8 +239,8 @@ public class BufferedResponseHandlerTest outputStream.flush(); } response.getHeaders().add("Written", "true"); - callback.succeeded(); } + callback.succeeded(); return true; }