Fixes #10483 - Improve BufferedResponseHandler
* Changed default buffer size from 2 GB to 16 KB. * Make max buffer size configurable. * Introduce `BufferedContentSink` with all the buffering logic, doing only one buffer copy instead of two, starting with a small buffer and growing it if needed. * Refactor `BufferedResponseHandler` to delegate all buffering work to `BufferedContentSink` * Introduced `ByteBufferAggregator` to aggregate ByteBuffers into a single ByteBuffer. Signed-off-by: Ludovic Orban <lorban@bitronix.be> Co-authored-by: gregw <gregw@webtide.com>
This commit is contained in:
parent
53ad119912
commit
d2dff9a758
|
@ -49,5 +49,10 @@
|
|||
<artifactId>jetty-test-helper</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* <p>Aggregates data into a single ByteBuffer of a specified maximum size.</p>
|
||||
* <p>The buffer automatically grows as data is written to it, up until it reaches the specified maximum size.
|
||||
* Once the buffer is full, the aggregator will not aggregate any more bytes until its buffer is taken out,
|
||||
* after which a new aggregate/take buffer cycle can start.</p>
|
||||
* <p>The buffers are taken from the supplied {@link ByteBufferPool} or freshly allocated if one is not supplied.</p>
|
||||
*/
|
||||
public class ByteBufferAggregator
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferAggregator.class);
|
||||
|
||||
private final ByteBufferPool _bufferPool;
|
||||
private final boolean _direct;
|
||||
private final int _maxSize;
|
||||
private RetainableByteBuffer _retainableByteBuffer;
|
||||
private int _aggregatedSize;
|
||||
private int _currentSize;
|
||||
|
||||
/**
|
||||
* Creates a ByteBuffer aggregator.
|
||||
* @param bufferPool The {@link ByteBufferPool} from which to acquire the buffers
|
||||
* @param direct whether to get direct buffers
|
||||
* @param startSize the starting size of the buffer
|
||||
* @param maxSize the maximum size of the buffer which must be greater than {@code startSize}
|
||||
*/
|
||||
public ByteBufferAggregator(ByteBufferPool bufferPool, boolean direct, int startSize, int maxSize)
|
||||
{
|
||||
if (maxSize <= 0)
|
||||
throw new IllegalArgumentException("maxSize must be > 0, was: " + maxSize);
|
||||
if (startSize <= 0)
|
||||
throw new IllegalArgumentException("startSize must be > 0, was: " + startSize);
|
||||
if (startSize > maxSize)
|
||||
throw new IllegalArgumentException("maxSize (" + maxSize + ") must be >= startSize (" + startSize + ")");
|
||||
_bufferPool = (bufferPool == null) ? new ByteBufferPool.NonPooling() : bufferPool;
|
||||
_direct = direct;
|
||||
_maxSize = maxSize;
|
||||
_currentSize = startSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregates the given ByteBuffer. This copies bytes up to the specified maximum size, at which
|
||||
* time this method returns {@code true} and {@link #takeRetainableByteBuffer()} must be called
|
||||
* for this method to accept aggregating again.
|
||||
* @param buffer the buffer to copy into this aggregator; its position is updated according to
|
||||
* the number of aggregated bytes
|
||||
* @return true if the aggregator's buffer is full and should be taken, false otherwise
|
||||
*/
|
||||
public boolean aggregate(ByteBuffer buffer)
|
||||
{
|
||||
tryExpandBufferCapacity(buffer.remaining());
|
||||
if (_retainableByteBuffer == null)
|
||||
{
|
||||
_retainableByteBuffer = _bufferPool.acquire(_currentSize, _direct);
|
||||
BufferUtil.flipToFill(_retainableByteBuffer.getByteBuffer());
|
||||
}
|
||||
int copySize = Math.min(_currentSize - _aggregatedSize, buffer.remaining());
|
||||
|
||||
ByteBuffer byteBuffer = _retainableByteBuffer.getByteBuffer();
|
||||
byteBuffer.put(byteBuffer.position(), buffer, buffer.position(), copySize);
|
||||
byteBuffer.position(byteBuffer.position() + copySize);
|
||||
buffer.position(buffer.position() + copySize);
|
||||
_aggregatedSize += copySize;
|
||||
return _aggregatedSize == _maxSize;
|
||||
}
|
||||
|
||||
private void tryExpandBufferCapacity(int remaining)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("tryExpandBufferCapacity remaining: {} _currentSize: {} _accumulatedSize={}", remaining, _currentSize, _aggregatedSize);
|
||||
if (_currentSize == _maxSize)
|
||||
return;
|
||||
int capacityLeft = _currentSize - _aggregatedSize;
|
||||
if (remaining <= capacityLeft)
|
||||
return;
|
||||
int need = remaining - capacityLeft;
|
||||
_currentSize = Math.min(_maxSize, ceilToNextPowerOfTwo(_currentSize + need));
|
||||
|
||||
if (_retainableByteBuffer != null)
|
||||
{
|
||||
BufferUtil.flipToFlush(_retainableByteBuffer.getByteBuffer(), 0);
|
||||
RetainableByteBuffer newBuffer = _bufferPool.acquire(_currentSize, _direct);
|
||||
BufferUtil.flipToFill(newBuffer.getByteBuffer());
|
||||
newBuffer.getByteBuffer().put(_retainableByteBuffer.getByteBuffer());
|
||||
_retainableByteBuffer.release();
|
||||
_retainableByteBuffer = newBuffer;
|
||||
}
|
||||
}
|
||||
|
||||
private static int ceilToNextPowerOfTwo(int val)
|
||||
{
|
||||
int result = 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(val - 1));
|
||||
return result > 0 ? result : Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the buffer out of the aggregator. Once the buffer has been taken out,
|
||||
* the aggregator resets itself and a new buffer will be acquired from the pool
|
||||
* during the next {@link #aggregate(ByteBuffer)} call.
|
||||
* @return the aggregated buffer, or null if nothing has been buffered yet
|
||||
*/
|
||||
public RetainableByteBuffer takeRetainableByteBuffer()
|
||||
{
|
||||
if (_retainableByteBuffer == null)
|
||||
return null;
|
||||
BufferUtil.flipToFlush(_retainableByteBuffer.getByteBuffer(), 0);
|
||||
RetainableByteBuffer result = _retainableByteBuffer;
|
||||
_retainableByteBuffer = null;
|
||||
_aggregatedSize = 0;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "%s@%x{a=%d c=%d m=%d b=%s}".formatted(getClass().getSimpleName(), hashCode(), _aggregatedSize, _currentSize, _maxSize, _retainableByteBuffer);
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.Flow;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.io.content.BufferedContentSink;
|
||||
import org.eclipse.jetty.io.content.ContentSinkOutputStream;
|
||||
import org.eclipse.jetty.io.content.ContentSinkSubscriber;
|
||||
import org.eclipse.jetty.io.content.ContentSourceInputStream;
|
||||
|
@ -411,6 +412,22 @@ public class Content
|
|||
*/
|
||||
public interface Sink
|
||||
{
|
||||
/**
|
||||
* <p>Wraps the given content sink with a buffering sink.</p>
|
||||
*
|
||||
* @param sink the sink to write to
|
||||
* @param bufferPool the {@link ByteBufferPool} to use
|
||||
* @param direct true to use direct buffers, false to use heap buffers
|
||||
* @param maxAggregationSize the maximum size that can be buffered in a single write;
|
||||
* any size above this threshold triggers a buffer flush
|
||||
* @param maxBufferSize the maximum size of the buffer
|
||||
* @return a Sink that writes to the given content sink
|
||||
*/
|
||||
static Sink asBuffered(Sink sink, ByteBufferPool bufferPool, boolean direct, int maxAggregationSize, int maxBufferSize)
|
||||
{
|
||||
return new BufferedContentSink(sink, bufferPool, direct, maxAggregationSize, maxBufferSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Wraps the given content sink with an {@link OutputStream}.</p>
|
||||
*
|
||||
|
|
|
@ -0,0 +1,277 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.io.content;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.WritePendingException;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferAggregator;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.RetainableByteBuffer;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* <p>A {@link Content.Sink} backed by another {@link Content.Sink}.
|
||||
* Any content written to this {@link Content.Sink} is buffered,
|
||||
* then written to the delegate using
|
||||
* {@link Content.Sink#write(boolean, ByteBuffer, Callback)}. </p>
|
||||
*/
|
||||
public class BufferedContentSink implements Content.Sink
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BufferedContentSink.class);
|
||||
|
||||
private static final int START_BUFFER_SIZE = 1024;
|
||||
|
||||
private final Content.Sink _delegate;
|
||||
private final ByteBufferPool _bufferPool;
|
||||
private final boolean _direct;
|
||||
private final int _maxBufferSize;
|
||||
private final int _maxAggregationSize;
|
||||
private final Flusher _flusher;
|
||||
private ByteBufferAggregator _aggregator;
|
||||
private boolean _firstWrite = true;
|
||||
private boolean _lastWritten;
|
||||
|
||||
public BufferedContentSink(Content.Sink delegate, ByteBufferPool bufferPool, boolean direct, int maxAggregationSize, int maxBufferSize)
|
||||
{
|
||||
if (maxBufferSize <= 0)
|
||||
throw new IllegalArgumentException("maxBufferSize must be > 0, was: " + maxBufferSize);
|
||||
if (maxAggregationSize <= 0)
|
||||
throw new IllegalArgumentException("maxAggregationSize must be > 0, was: " + maxAggregationSize);
|
||||
if (maxBufferSize < maxAggregationSize)
|
||||
throw new IllegalArgumentException("maxBufferSize (" + maxBufferSize + ") must be >= maxAggregationSize (" + maxAggregationSize + ")");
|
||||
_delegate = delegate;
|
||||
_bufferPool = (bufferPool == null) ? new ByteBufferPool.NonPooling() : bufferPool;
|
||||
_direct = direct;
|
||||
_maxBufferSize = maxBufferSize;
|
||||
_maxAggregationSize = maxAggregationSize;
|
||||
_flusher = new Flusher(delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("writing last={} {}", last, BufferUtil.toDetailString(byteBuffer));
|
||||
|
||||
if (_lastWritten)
|
||||
{
|
||||
callback.failed(new IOException("complete"));
|
||||
return;
|
||||
}
|
||||
_lastWritten = last;
|
||||
if (_firstWrite)
|
||||
{
|
||||
_firstWrite = false;
|
||||
if (last)
|
||||
{
|
||||
// No need to buffer if this is both the first and the last write.
|
||||
_delegate.write(true, byteBuffer, callback);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER;
|
||||
if (current.remaining() <= _maxAggregationSize)
|
||||
{
|
||||
// current buffer can be aggregated
|
||||
if (_aggregator == null)
|
||||
_aggregator = new ByteBufferAggregator(_bufferPool, _direct, Math.min(START_BUFFER_SIZE, _maxBufferSize), _maxBufferSize);
|
||||
aggregateAndFlush(last, current, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
// current buffer is greater than the max aggregation size
|
||||
flush(last, current, callback);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flushes the aggregated buffer if something was aggregated, then flushes the
|
||||
* given buffer, bypassing the aggregator.
|
||||
*/
|
||||
private void flush(boolean last, ByteBuffer currentBuffer, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("given buffer is greater than _maxBufferSize");
|
||||
|
||||
RetainableByteBuffer aggregatedBuffer = _aggregator == null ? null : _aggregator.takeRetainableByteBuffer();
|
||||
if (aggregatedBuffer == null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("nothing aggregated, flushing current buffer {}", currentBuffer);
|
||||
_flusher.offer(last, currentBuffer, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("flushing aggregated buffer {}", aggregatedBuffer);
|
||||
_flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
super.succeeded();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("succeeded writing aggregated buffer, flushing current buffer {}", currentBuffer);
|
||||
_flusher.offer(last, currentBuffer, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failure writing aggregated buffer", x);
|
||||
super.failed(x);
|
||||
callback.failed(x);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregates the given buffer, flushing the aggregated buffer if necessary.
|
||||
*/
|
||||
private void aggregateAndFlush(boolean last, ByteBuffer currentBuffer, Callback callback)
|
||||
{
|
||||
boolean full = _aggregator.aggregate(currentBuffer);
|
||||
boolean complete = last && !currentBuffer.hasRemaining();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("aggregated current buffer, full={}, complete={}, bytes left={}, aggregator={}", full, complete, currentBuffer.remaining(), _aggregator);
|
||||
if (complete)
|
||||
{
|
||||
RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer();
|
||||
if (aggregatedBuffer != null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("complete; writing aggregated buffer as the last one: {} bytes", aggregatedBuffer.remaining());
|
||||
_flusher.offer(true, aggregatedBuffer.getByteBuffer(), Callback.from(callback, aggregatedBuffer::release));
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("complete; no aggregated buffer, writing last empty buffer");
|
||||
_flusher.offer(true, BufferUtil.EMPTY_BUFFER, callback);
|
||||
}
|
||||
}
|
||||
else if (full)
|
||||
{
|
||||
RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("writing aggregated buffer: {} bytes", aggregatedBuffer.remaining());
|
||||
_flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
super.succeeded();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("written aggregated buffer, writing remaining of current: {} bytes{}", currentBuffer.remaining(), (last ? " (last write)" : ""));
|
||||
if (last)
|
||||
_flusher.offer(true, currentBuffer, callback);
|
||||
else
|
||||
aggregateAndFlush(false, currentBuffer, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("failure writing aggregated buffer", x);
|
||||
super.failed(x);
|
||||
callback.failed(x);
|
||||
}
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("buffer fully aggregated, delaying writing - aggregator: {}", _aggregator);
|
||||
_flusher.offer(callback);
|
||||
}
|
||||
}
|
||||
|
||||
private static class Flusher extends IteratingCallback
|
||||
{
|
||||
private static final ByteBuffer COMPLETE_CALLBACK = BufferUtil.allocate(0);
|
||||
|
||||
private final Content.Sink _sink;
|
||||
private boolean _last;
|
||||
private ByteBuffer _buffer;
|
||||
private Callback _callback;
|
||||
private boolean _lastWritten;
|
||||
|
||||
Flusher(Content.Sink sink)
|
||||
{
|
||||
_sink = sink;
|
||||
}
|
||||
|
||||
void offer(Callback callback)
|
||||
{
|
||||
offer(false, COMPLETE_CALLBACK, callback);
|
||||
}
|
||||
|
||||
void offer(boolean last, ByteBuffer byteBuffer, Callback callback)
|
||||
{
|
||||
if (_callback != null)
|
||||
throw new WritePendingException();
|
||||
_last = last;
|
||||
_buffer = byteBuffer;
|
||||
_callback = callback;
|
||||
iterate();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Action process()
|
||||
{
|
||||
if (_lastWritten)
|
||||
return Action.SUCCEEDED;
|
||||
if (_callback == null)
|
||||
return Action.IDLE;
|
||||
if (_buffer != COMPLETE_CALLBACK)
|
||||
{
|
||||
_lastWritten = _last;
|
||||
_sink.write(_last, _buffer, this);
|
||||
}
|
||||
else
|
||||
{
|
||||
succeeded();
|
||||
}
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
_buffer = null;
|
||||
Callback callback = _callback;
|
||||
_callback = null;
|
||||
callback.succeeded();
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
_buffer = null;
|
||||
_callback.failed(cause);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,553 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.io.content.AsyncContent;
|
||||
import org.eclipse.jetty.io.content.BufferedContentSink;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class BufferedContentSinkTest
|
||||
{
|
||||
private ArrayByteBufferPool.Tracking _bufferPool;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp()
|
||||
{
|
||||
_bufferPool = new ArrayByteBufferPool.Tracking();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown()
|
||||
{
|
||||
assertThat("Leaks: " + _bufferPool.dumpLeaks(), _bufferPool.getLeaks().size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConstructor()
|
||||
{
|
||||
assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 1, 0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 1, -1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 0, 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, -1, 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 0, 0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, -1, -1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new BufferedContentSink(new AsyncContent(), _bufferPool, true, 2, 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteInvokesDemandCallback() throws Exception
|
||||
{
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
async.demand(latch::countDown);
|
||||
assertFalse(latch.await(250, TimeUnit.MILLISECONDS));
|
||||
|
||||
buffered.write(true, UTF_8.encode("one"), Callback.NOOP);
|
||||
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
Content.Chunk chunk = async.read();
|
||||
assertNotNull(chunk);
|
||||
chunk.release();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChunkReleaseSucceedsWriteCallback()
|
||||
{
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
|
||||
|
||||
AtomicInteger successCounter = new AtomicInteger();
|
||||
AtomicReference<Throwable> failureRef = new AtomicReference<>();
|
||||
|
||||
buffered.write(true, ByteBuffer.wrap(new byte[1]), Callback.from(successCounter::incrementAndGet, failureRef::set));
|
||||
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(successCounter.get(), is(0));
|
||||
chunk.retain();
|
||||
assertThat(chunk.release(), is(false));
|
||||
assertThat(successCounter.get(), is(0));
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(successCounter.get(), is(1));
|
||||
assertThat(failureRef.get(), is(nullValue()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyChunkReadSucceedsWriteCallback()
|
||||
{
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
|
||||
|
||||
AtomicInteger successCounter = new AtomicInteger();
|
||||
AtomicReference<Throwable> failureRef = new AtomicReference<>();
|
||||
|
||||
buffered.write(true, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set));
|
||||
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(successCounter.get(), is(1));
|
||||
assertThat(chunk.isLast(), is(true));
|
||||
assertThat(chunk.hasRemaining(), is(false));
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(successCounter.get(), is(1));
|
||||
assertThat(failureRef.get(), is(nullValue()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteAfterWriteLast()
|
||||
{
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
|
||||
|
||||
AtomicInteger successCounter = new AtomicInteger();
|
||||
AtomicReference<Throwable> failureRef = new AtomicReference<>();
|
||||
|
||||
buffered.write(true, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set));
|
||||
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(chunk.isLast(), is(true));
|
||||
assertThat(chunk.release(), is(true));
|
||||
|
||||
assertThat(successCounter.get(), is(1));
|
||||
assertThat(failureRef.get(), is(nullValue()));
|
||||
|
||||
buffered.write(false, ByteBuffer.wrap(new byte[0]), Callback.from(successCounter::incrementAndGet, failureRef::set));
|
||||
assertThat(successCounter.get(), is(1));
|
||||
assertThat(failureRef.get(), instanceOf(IOException.class));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeBuffer()
|
||||
{
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
|
||||
|
||||
buffered.write(false, ByteBuffer.wrap("one ".getBytes(UTF_8)), Callback.NOOP);
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(chunk, nullValue());
|
||||
|
||||
buffered.write(false, ByteBuffer.wrap("two".getBytes(UTF_8)), Callback.NOOP);
|
||||
chunk = async.read();
|
||||
assertThat(chunk, nullValue());
|
||||
|
||||
buffered.write(true, null, Callback.NOOP);
|
||||
chunk = async.read();
|
||||
assertThat(chunk.isLast(), is(true));
|
||||
assertThat(BufferUtil.toString(chunk.getByteBuffer(), UTF_8), is("one two"));
|
||||
assertThat(chunk.release(), is(true));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallBuffer()
|
||||
{
|
||||
int maxBufferSize = 16;
|
||||
byte[] input1 = new byte[1024];
|
||||
Arrays.fill(input1, (byte)'1');
|
||||
byte[] input2 = new byte[1023];
|
||||
Arrays.fill(input2, (byte)'2');
|
||||
|
||||
ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096);
|
||||
BufferUtil.flipToFill(accumulatingBuffer);
|
||||
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxBufferSize, maxBufferSize);
|
||||
|
||||
buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
|
||||
buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP)));
|
||||
|
||||
int loopCount = 0;
|
||||
while (true)
|
||||
{
|
||||
loopCount++;
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
if (chunk.isLast())
|
||||
break;
|
||||
}
|
||||
assertThat(loopCount, is(2));
|
||||
|
||||
BufferUtil.flipToFlush(accumulatingBuffer, 0);
|
||||
assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length));
|
||||
for (byte b : input1)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
for (byte b : input2)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxAggregationSizeExceeded()
|
||||
{
|
||||
int maxBufferSize = 1024;
|
||||
int maxAggregationSize = 128;
|
||||
byte[] input1 = new byte[512];
|
||||
Arrays.fill(input1, (byte)'1');
|
||||
byte[] input2 = new byte[128];
|
||||
Arrays.fill(input2, (byte)'2');
|
||||
|
||||
ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096);
|
||||
BufferUtil.flipToFill(accumulatingBuffer);
|
||||
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize);
|
||||
|
||||
buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
|
||||
buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP)));
|
||||
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
assertThat(chunk.remaining(), is(512));
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(chunk.isLast(), is(false));
|
||||
|
||||
chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
assertThat(chunk.remaining(), is(128));
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(chunk.isLast(), is(true));
|
||||
|
||||
BufferUtil.flipToFlush(accumulatingBuffer, 0);
|
||||
assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length));
|
||||
for (byte b : input1)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
for (byte b : input2)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxAggregationSizeExceededAfterBuffering()
|
||||
{
|
||||
int maxBufferSize = 1024;
|
||||
int maxAggregationSize = 128;
|
||||
byte[] input1 = new byte[128];
|
||||
Arrays.fill(input1, (byte)'1');
|
||||
byte[] input2 = new byte[512];
|
||||
Arrays.fill(input2, (byte)'2');
|
||||
|
||||
ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096);
|
||||
BufferUtil.flipToFill(accumulatingBuffer);
|
||||
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize);
|
||||
|
||||
buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
|
||||
buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP)));
|
||||
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
assertThat(chunk.remaining(), is(128));
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(chunk.isLast(), is(false));
|
||||
|
||||
chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
assertThat(chunk.remaining(), is(512));
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(chunk.isLast(), is(true));
|
||||
|
||||
BufferUtil.flipToFlush(accumulatingBuffer, 0);
|
||||
assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length));
|
||||
for (byte b : input1)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
for (byte b : input2)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxAggregationSizeRespected()
|
||||
{
|
||||
int maxBufferSize = 1024;
|
||||
int maxAggregationSize = 128;
|
||||
byte[] input1 = new byte[128];
|
||||
Arrays.fill(input1, (byte)'1');
|
||||
byte[] input2 = new byte[128];
|
||||
Arrays.fill(input2, (byte)'2');
|
||||
|
||||
ByteBuffer accumulatingBuffer = BufferUtil.allocate(4096);
|
||||
BufferUtil.flipToFill(accumulatingBuffer);
|
||||
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, maxAggregationSize, maxBufferSize);
|
||||
|
||||
buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
|
||||
buffered.write(true, ByteBuffer.wrap(input2), Callback.NOOP)));
|
||||
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
assertThat(chunk.remaining(), is(256));
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(chunk.isLast(), is(true));
|
||||
|
||||
BufferUtil.flipToFlush(accumulatingBuffer, 0);
|
||||
assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length));
|
||||
for (byte b : input1)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
for (byte b : input2)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBufferGrowth()
|
||||
{
|
||||
byte[] input1 = new byte[4000];
|
||||
Arrays.fill(input1, (byte)'1');
|
||||
byte[] input2 = new byte[4000];
|
||||
Arrays.fill(input2, (byte)'2');
|
||||
byte[] input3 = new byte[2000];
|
||||
Arrays.fill(input3, (byte)'3');
|
||||
|
||||
ByteBuffer accumulatingBuffer = BufferUtil.allocate(16384);
|
||||
BufferUtil.flipToFill(accumulatingBuffer);
|
||||
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
|
||||
|
||||
buffered.write(false, ByteBuffer.wrap(input1), Callback.from(() ->
|
||||
buffered.write(false, ByteBuffer.wrap(input2), Callback.from(() ->
|
||||
buffered.write(true, ByteBuffer.wrap(input3), Callback.NOOP)))));
|
||||
|
||||
// We expect 3 buffer flushes: 4096b + 4096b + 1808b == 10_000b.
|
||||
Content.Chunk chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
assertThat(chunk.remaining(), is(4096));
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(chunk.isLast(), is(false));
|
||||
|
||||
chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
assertThat(chunk.remaining(), is(4096));
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(chunk.isLast(), is(false));
|
||||
|
||||
chunk = async.read();
|
||||
assertThat(chunk, notNullValue());
|
||||
assertThat(chunk.remaining(), is(1808));
|
||||
accumulatingBuffer.put(chunk.getByteBuffer());
|
||||
assertThat(chunk.release(), is(true));
|
||||
assertThat(chunk.isLast(), is(true));
|
||||
|
||||
BufferUtil.flipToFlush(accumulatingBuffer, 0);
|
||||
assertThat(accumulatingBuffer.remaining(), is(input1.length + input2.length + input3.length));
|
||||
for (byte b : input1)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
for (byte b : input2)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
for (byte b : input3)
|
||||
{
|
||||
assertThat(accumulatingBuffer.get(), is(b));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteByByteRecursion() throws Exception
|
||||
{
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 4096, 4096);
|
||||
AtomicInteger count = new AtomicInteger(8192);
|
||||
CountDownLatch complete = new CountDownLatch(1);
|
||||
Callback callback = new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
int c = count.decrementAndGet();
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{(byte)c});
|
||||
if (c >= 0)
|
||||
buffered.write(c == 0, byteBuffer, this);
|
||||
else
|
||||
complete.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
callback.succeeded();
|
||||
|
||||
Content.Chunk read = async.read();
|
||||
assertThat(read.isLast(), is(false));
|
||||
assertThat(read.remaining(), is(4096));
|
||||
assertThat(read.release(), is(true));
|
||||
|
||||
read = async.read();
|
||||
assertThat(read.isLast(), is(true));
|
||||
assertThat(read.remaining(), is(4096));
|
||||
assertThat(read.release(), is(true));
|
||||
|
||||
assertTrue(complete.await(5, TimeUnit.SECONDS));
|
||||
assertThat(count.get(), is(-1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteByByteAsync() throws Exception
|
||||
{
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 1024, 1024);
|
||||
AtomicInteger count = new AtomicInteger(2048);
|
||||
CountDownLatch complete = new CountDownLatch(1);
|
||||
Callback callback = new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
int c = count.decrementAndGet();
|
||||
if (c >= 0)
|
||||
{
|
||||
Callback cb = this;
|
||||
new Thread(() ->
|
||||
{
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{(byte)c});
|
||||
buffered.write(c == 0, byteBuffer, cb);
|
||||
}).start();
|
||||
}
|
||||
else
|
||||
{
|
||||
complete.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
callback.succeeded();
|
||||
|
||||
Content.Chunk read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull);
|
||||
assertThat(read.isLast(), is(false));
|
||||
assertThat(read.remaining(), is(1024));
|
||||
assertThat(read.release(), is(true));
|
||||
|
||||
read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull);
|
||||
assertThat(read.isLast(), is(true));
|
||||
assertThat(read.remaining(), is(1024));
|
||||
assertThat(read.release(), is(true));
|
||||
|
||||
assertTrue(complete.await(5, TimeUnit.SECONDS));
|
||||
assertThat(count.get(), is(-1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallThenLargeWritesRecursion() throws Exception
|
||||
{
|
||||
try (AsyncContent async = new AsyncContent())
|
||||
{
|
||||
BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 1, 4096);
|
||||
AtomicInteger count = new AtomicInteger(8192);
|
||||
CountDownLatch complete = new CountDownLatch(1);
|
||||
Callback callback = new Callback()
|
||||
{
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
int c = count.decrementAndGet();
|
||||
ByteBuffer byteBuffer = (c % 2 == 0) ? ByteBuffer.wrap(new byte[512]) : ByteBuffer.wrap(new byte[]{(byte)c});
|
||||
if (c >= 0)
|
||||
buffered.write(c == 0, byteBuffer, this);
|
||||
else
|
||||
complete.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
callback.succeeded();
|
||||
|
||||
for (int i = 0; i < 4096; i++)
|
||||
{
|
||||
Content.Chunk read = async.read();
|
||||
assertThat(read.isLast(), is(false));
|
||||
assertThat(read.remaining(), is(1));
|
||||
assertThat(read.release(), is(true));
|
||||
|
||||
read = async.read();
|
||||
assertThat(read.isLast(), is(i == 4095));
|
||||
assertThat(read.remaining(), is(512));
|
||||
assertThat(read.release(), is(true));
|
||||
}
|
||||
|
||||
assertTrue(complete.await(5, TimeUnit.SECONDS));
|
||||
assertThat(count.get(), is(-1));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class ByteBufferAggregatorTest
|
||||
{
|
||||
private ArrayByteBufferPool.Tracking bufferPool;
|
||||
|
||||
@BeforeEach
|
||||
public void before()
|
||||
{
|
||||
bufferPool = new ArrayByteBufferPool.Tracking();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown()
|
||||
{
|
||||
assertThat("Leaks: " + bufferPool.dumpLeaks(), bufferPool.getLeaks().size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConstructor()
|
||||
{
|
||||
assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, 0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, 1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 1, 0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 0, -1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, -1, 0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new ByteBufferAggregator(bufferPool, true, 2, 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFullInSingleShot()
|
||||
{
|
||||
ByteBufferAggregator aggregator = new ByteBufferAggregator(bufferPool, true, 1, 16);
|
||||
|
||||
ByteBuffer byteBuffer1 = ByteBuffer.wrap(new byte[16]);
|
||||
assertThat(aggregator.aggregate(byteBuffer1), is(true));
|
||||
assertThat(byteBuffer1.remaining(), is(0));
|
||||
|
||||
ByteBuffer byteBuffer2 = ByteBuffer.wrap(new byte[16]);
|
||||
assertThat(aggregator.aggregate(byteBuffer2), is(true));
|
||||
assertThat(byteBuffer2.remaining(), is(16));
|
||||
|
||||
RetainableByteBuffer retainableByteBuffer = aggregator.takeRetainableByteBuffer();
|
||||
assertThat(retainableByteBuffer.getByteBuffer().remaining(), is(16));
|
||||
assertThat(retainableByteBuffer.release(), is(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFullInMultipleShots()
|
||||
{
|
||||
ByteBufferAggregator aggregator = new ByteBufferAggregator(bufferPool, true, 1, 16);
|
||||
|
||||
ByteBuffer byteBuffer1 = ByteBuffer.wrap(new byte[15]);
|
||||
assertThat(aggregator.aggregate(byteBuffer1), is(false));
|
||||
assertThat(byteBuffer1.remaining(), is(0));
|
||||
|
||||
ByteBuffer byteBuffer2 = ByteBuffer.wrap(new byte[16]);
|
||||
assertThat(aggregator.aggregate(byteBuffer2), is(true));
|
||||
assertThat(byteBuffer2.remaining(), is(15));
|
||||
|
||||
RetainableByteBuffer retainableByteBuffer = aggregator.takeRetainableByteBuffer();
|
||||
assertThat(retainableByteBuffer.getByteBuffer().remaining(), is(16));
|
||||
assertThat(retainableByteBuffer.release(), is(true));
|
||||
}
|
||||
}
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
package org.eclipse.jetty.server;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ListIterator;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
@ -30,6 +31,7 @@ import org.eclipse.jetty.http.HttpMethod;
|
|||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.Trailers;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.io.QuietException;
|
||||
import org.eclipse.jetty.server.handler.ErrorHandler;
|
||||
|
@ -530,6 +532,40 @@ public interface Response extends Content.Sink
|
|||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Wraps a {@link Response} as a {@link OutputStream} that performs buffering. The necessary
|
||||
* {@link ByteBufferPool} is taken from the request's connector while the size and direction of the buffer
|
||||
* is read from the request's {@link HttpConfiguration}.</p>
|
||||
* <p>This is equivalent to:</p>
|
||||
* <p>{@code Content.Sink.asOutputStream(Response.asBufferedSink(request, response))}</p>
|
||||
* @param request the request from which to get the buffering sink's settings
|
||||
* @param response the response to wrap
|
||||
* @return a buffering {@link OutputStream}
|
||||
*/
|
||||
static OutputStream asBufferedOutputStream(Request request, Response response)
|
||||
{
|
||||
return Content.Sink.asOutputStream(Response.asBufferedSink(request, response));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a {@link Response} as a {@link Content.Sink} that performs buffering. The necessary
|
||||
* {@link ByteBufferPool} is taken from the request's connector while the size, direction of the buffer
|
||||
* and commit size are read from the request's {@link HttpConfiguration}.
|
||||
* @param request the request from which to get the buffering sink's settings
|
||||
* @param response the response to wrap
|
||||
* @return a buffering {@link Content.Sink}
|
||||
*/
|
||||
static Content.Sink asBufferedSink(Request request, Response response)
|
||||
{
|
||||
ConnectionMetaData connectionMetaData = request.getConnectionMetaData();
|
||||
ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool();
|
||||
HttpConfiguration httpConfiguration = connectionMetaData.getHttpConfiguration();
|
||||
int bufferSize = httpConfiguration.getOutputBufferSize();
|
||||
boolean useOutputDirectByteBuffers = httpConfiguration.isUseOutputDirectByteBuffers();
|
||||
int outputAggregationSize = httpConfiguration.getOutputAggregationSize();
|
||||
return Content.Sink.asBuffered(response, bufferPool, useOutputDirectByteBuffers, outputAggregationSize, bufferSize);
|
||||
}
|
||||
|
||||
class Wrapper implements Response
|
||||
{
|
||||
private final Request _request;
|
||||
|
|
|
@ -19,17 +19,15 @@ import org.eclipse.jetty.http.HttpHeader;
|
|||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.MimeTypes;
|
||||
import org.eclipse.jetty.io.ByteBufferAccumulator;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.RetainableByteBuffer;
|
||||
import org.eclipse.jetty.io.Content;
|
||||
import org.eclipse.jetty.server.ConnectionMetaData;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Response;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IncludeExclude;
|
||||
import org.eclipse.jetty.util.IteratingNestedCallback;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -47,14 +45,25 @@ import org.slf4j.LoggerFactory;
|
|||
* decision to buffer or not.
|
||||
* </p>
|
||||
* <p>
|
||||
* Note also that there are no memory limits to the size of the buffer, thus
|
||||
* this handler can represent an unbounded memory commitment if the content
|
||||
* generated can also be unbounded.
|
||||
* Note also that the size of the buffer can be controlled by setting the
|
||||
* {@link #BUFFER_SIZE_ATTRIBUTE_NAME} request attribute to an integer;
|
||||
* in the absence of such header, the {@link HttpConfiguration#getOutputBufferSize()}
|
||||
* config setting is used, while the maximum aggregation size can be controlled
|
||||
* by setting the {@link #MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME} request attribute to an integer,
|
||||
* in the absence of such header, the {@link HttpConfiguration#getOutputAggregationSize()}
|
||||
* config setting is used.
|
||||
* </p>
|
||||
*/
|
||||
public class BufferedResponseHandler extends ConditionalHandler.Abstract
|
||||
{
|
||||
/**
|
||||
* The name of the request attribute used to control the buffer size of a particular request.
|
||||
*/
|
||||
public static final String BUFFER_SIZE_ATTRIBUTE_NAME = BufferedResponseHandler.class.getName() + ".buffer-size";
|
||||
/**
|
||||
* The name of the request attribute used to control the max aggregation size of a particular request.
|
||||
*/
|
||||
public static final String MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME = BufferedResponseHandler.class.getName() + ".max-aggregation-size";
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BufferedResponseHandler.class);
|
||||
|
||||
|
@ -159,8 +168,9 @@ public class BufferedResponseHandler extends ConditionalHandler.Abstract
|
|||
private class BufferedResponse extends Response.Wrapper implements Callback
|
||||
{
|
||||
private final Callback _callback;
|
||||
private CountingByteBufferAccumulator _accumulator;
|
||||
private Content.Sink _bufferedContentSink;
|
||||
private boolean _firstWrite = true;
|
||||
private boolean _lastWritten;
|
||||
|
||||
private BufferedResponse(Request request, Response response, Callback callback)
|
||||
{
|
||||
|
@ -173,130 +183,44 @@ public class BufferedResponseHandler extends ConditionalHandler.Abstract
|
|||
{
|
||||
if (_firstWrite)
|
||||
{
|
||||
if (shouldBuffer(this, last))
|
||||
{
|
||||
ConnectionMetaData connectionMetaData = getRequest().getConnectionMetaData();
|
||||
ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool();
|
||||
boolean useOutputDirectByteBuffers = connectionMetaData.getHttpConfiguration().isUseOutputDirectByteBuffers();
|
||||
_accumulator = new CountingByteBufferAccumulator(bufferPool, useOutputDirectByteBuffers, getBufferSize());
|
||||
}
|
||||
_firstWrite = false;
|
||||
if (shouldBuffer(this, last))
|
||||
_bufferedContentSink = createBufferedSink();
|
||||
}
|
||||
|
||||
if (_accumulator != null)
|
||||
{
|
||||
ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER;
|
||||
IteratingNestedCallback writer = new IteratingNestedCallback(callback)
|
||||
{
|
||||
private boolean complete;
|
||||
|
||||
@Override
|
||||
protected Action process()
|
||||
{
|
||||
if (complete)
|
||||
return Action.SUCCEEDED;
|
||||
boolean write = _accumulator.copyBuffer(current);
|
||||
complete = last && !current.hasRemaining();
|
||||
if (write || complete)
|
||||
{
|
||||
RetainableByteBuffer buffer = _accumulator.takeRetainableByteBuffer();
|
||||
BufferedResponse.super.write(complete, buffer.getByteBuffer(), Callback.from(this, buffer::release));
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
};
|
||||
writer.iterate();
|
||||
}
|
||||
else
|
||||
{
|
||||
super.write(last, byteBuffer, callback);
|
||||
}
|
||||
_lastWritten |= last;
|
||||
Content.Sink destSink = _bufferedContentSink != null ? _bufferedContentSink : getWrapped();
|
||||
destSink.write(last, byteBuffer, callback);
|
||||
}
|
||||
|
||||
private int getBufferSize()
|
||||
private Content.Sink createBufferedSink()
|
||||
{
|
||||
Object attribute = getRequest().getAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME);
|
||||
return attribute instanceof Integer ? (int)attribute : Integer.MAX_VALUE;
|
||||
Request request = getRequest();
|
||||
ConnectionMetaData connectionMetaData = request.getConnectionMetaData();
|
||||
ByteBufferPool bufferPool = connectionMetaData.getConnector().getByteBufferPool();
|
||||
HttpConfiguration httpConfiguration = connectionMetaData.getHttpConfiguration();
|
||||
Object attribute = request.getAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME);
|
||||
int bufferSize = attribute instanceof Integer ? (int)attribute : httpConfiguration.getOutputBufferSize();
|
||||
attribute = request.getAttribute(BufferedResponseHandler.MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME);
|
||||
int maxAggregationSize = attribute instanceof Integer ? (int)attribute : httpConfiguration.getOutputAggregationSize();
|
||||
boolean direct = httpConfiguration.isUseOutputDirectByteBuffers();
|
||||
return Content.Sink.asBuffered(getWrapped(), bufferPool, direct, maxAggregationSize, bufferSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
// TODO pass all accumulated buffers as an array instead of allocating & copying into a single one.
|
||||
if (_accumulator != null)
|
||||
{
|
||||
RetainableByteBuffer buffer = _accumulator.takeRetainableByteBuffer();
|
||||
super.write(true, buffer.getByteBuffer(), Callback.from(_callback, () ->
|
||||
{
|
||||
buffer.release();
|
||||
_accumulator.close();
|
||||
}));
|
||||
}
|
||||
if (_bufferedContentSink != null && !_lastWritten)
|
||||
_bufferedContentSink.write(true, null, _callback);
|
||||
else
|
||||
{
|
||||
_callback.succeeded();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
if (_accumulator != null)
|
||||
_accumulator.close();
|
||||
if (_bufferedContentSink != null && !_lastWritten)
|
||||
_bufferedContentSink.write(true, null, Callback.NOOP);
|
||||
_callback.failed(x);
|
||||
}
|
||||
}
|
||||
|
||||
private static class CountingByteBufferAccumulator implements AutoCloseable
|
||||
{
|
||||
private final ByteBufferAccumulator _accumulator;
|
||||
private final int _maxSize;
|
||||
private int _accumulatedCount;
|
||||
|
||||
private CountingByteBufferAccumulator(ByteBufferPool bufferPool, boolean direct, int maxSize)
|
||||
{
|
||||
if (maxSize <= 0)
|
||||
throw new IllegalArgumentException("maxSize must be > 0, was: " + maxSize);
|
||||
_maxSize = maxSize;
|
||||
_accumulator = new ByteBufferAccumulator(bufferPool, direct);
|
||||
}
|
||||
|
||||
private boolean copyBuffer(ByteBuffer buffer)
|
||||
{
|
||||
int remainingCapacity = space();
|
||||
if (buffer.remaining() >= remainingCapacity)
|
||||
{
|
||||
_accumulatedCount += remainingCapacity;
|
||||
int end = buffer.position() + remainingCapacity;
|
||||
_accumulator.copyBuffer(buffer.duplicate().limit(end));
|
||||
buffer.position(end);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
_accumulatedCount += buffer.remaining();
|
||||
_accumulator.copyBuffer(buffer);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private int space()
|
||||
{
|
||||
return _maxSize - _accumulatedCount;
|
||||
}
|
||||
|
||||
private RetainableByteBuffer takeRetainableByteBuffer()
|
||||
{
|
||||
_accumulatedCount = 0;
|
||||
return _accumulator.takeRetainableByteBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
_accumulatedCount = 0;
|
||||
_accumulator.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,11 +87,13 @@ public class BufferedResponseHandlerTest
|
|||
@Test
|
||||
public void testIncluded() throws Exception
|
||||
{
|
||||
_test._bufferSize = 2048;
|
||||
String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
assertThat(response, containsString(" 200 OK"));
|
||||
assertThat(response, containsString("Write: 0"));
|
||||
assertThat(response, containsString("Write: 9"));
|
||||
assertThat(response, containsString("Written: true"));
|
||||
assertThat(response, containsString("Content-Length: "));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -124,6 +126,7 @@ public class BufferedResponseHandlerTest
|
|||
public void testFlushed() throws Exception
|
||||
{
|
||||
_test._flush = true;
|
||||
_test._bufferSize = 2048;
|
||||
String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
assertThat(response, containsString(" 200 OK"));
|
||||
assertThat(response, containsString("Write: 0"));
|
||||
|
@ -134,6 +137,7 @@ public class BufferedResponseHandlerTest
|
|||
@Test
|
||||
public void testBufferSizeSmall() throws Exception
|
||||
{
|
||||
_test._aggregationSize = 16;
|
||||
_test._bufferSize = 16;
|
||||
String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
assertThat(response, containsString(" 200 OK"));
|
||||
|
@ -187,6 +191,7 @@ public class BufferedResponseHandlerTest
|
|||
public void testReset() throws Exception
|
||||
{
|
||||
_test._reset = true;
|
||||
_test._bufferSize = 2048;
|
||||
String response = _local.getResponse("GET /ctx/include/path HTTP/1.1\r\nHost: localhost\r\n\r\n");
|
||||
assertThat(response, containsString(" 200 OK"));
|
||||
assertThat(response, containsString("Write: 0"));
|
||||
|
@ -197,6 +202,7 @@ public class BufferedResponseHandlerTest
|
|||
|
||||
public static class TestHandler extends Handler.Abstract
|
||||
{
|
||||
int _aggregationSize = -1;
|
||||
int _bufferSize = -1;
|
||||
String _mimeType;
|
||||
byte[] _content = new byte[128];
|
||||
|
@ -217,6 +223,8 @@ public class BufferedResponseHandlerTest
|
|||
|
||||
if (_bufferSize > 0)
|
||||
request.setAttribute(BufferedResponseHandler.BUFFER_SIZE_ATTRIBUTE_NAME, _bufferSize);
|
||||
if (_aggregationSize > 0)
|
||||
request.setAttribute(BufferedResponseHandler.MAX_AGGREGATION_SIZE_ATTRIBUTE_NAME, _aggregationSize);
|
||||
if (_mimeType != null)
|
||||
response.getHeaders().put(HttpHeader.CONTENT_TYPE, _mimeType);
|
||||
|
||||
|
@ -231,8 +239,8 @@ public class BufferedResponseHandlerTest
|
|||
outputStream.flush();
|
||||
}
|
||||
response.getHeaders().add("Written", "true");
|
||||
callback.succeeded();
|
||||
}
|
||||
callback.succeeded();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue