Issue #5499 - add tests for ByteBufferAccumulator

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-11-17 13:59:49 +11:00
parent 5788fe609d
commit 7c46d96fce
3 changed files with 338 additions and 78 deletions

View File

@ -161,7 +161,9 @@ public class ByteBufferAccumulator implements AutoCloseable
return new byte[0];
byte[] bytes = new byte[length];
writeTo(BufferUtil.toBuffer(bytes));
ByteBuffer buffer = BufferUtil.toBuffer(bytes);
BufferUtil.clear(buffer);
writeTo(buffer);
return bytes;
}
@ -170,7 +172,7 @@ public class ByteBufferAccumulator implements AutoCloseable
int pos = BufferUtil.flipToFill(buffer);
for (ByteBuffer bb : _buffers)
{
buffer.put(bb);
buffer.put(bb.slice());
}
BufferUtil.flipToFlush(buffer, pos);
}
@ -179,7 +181,7 @@ public class ByteBufferAccumulator implements AutoCloseable
{
for (ByteBuffer bb : _buffers)
{
BufferUtil.writeTo(bb, out);
BufferUtil.writeTo(bb.slice(), out);
}
}

View File

@ -0,0 +1,333 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ByteBufferAccumulatorTest
{
private CountingBufferPool byteBufferPool;
private ByteBufferAccumulator accumulator;
@BeforeEach
public void before()
{
byteBufferPool = new CountingBufferPool();
accumulator = new ByteBufferAccumulator(byteBufferPool);
}
@Test
public void testToBuffer()
{
int size = 1024 * 1024;
int allocationSize = 1024;
ByteBuffer content = randomBytes(size);
ByteBuffer slice = content.slice();
// We completely fill up the internal buffer with the first write.
ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize);
assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(allocationSize));
writeInFlushMode(slice, internalBuffer);
assertThat(BufferUtil.space(internalBuffer), is(0));
// If we ask for min size of 0 we get the same buffer which is full.
internalBuffer = accumulator.ensureBuffer(0, allocationSize);
assertThat(BufferUtil.space(internalBuffer), is(0));
// If we need at least 1 minSpace we must allocate a new buffer.
internalBuffer = accumulator.ensureBuffer(1, allocationSize);
assertThat(BufferUtil.space(internalBuffer), greaterThan(0));
assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(allocationSize));
// Write 13 bytes from the end of the internal buffer.
int bytesToWrite = BufferUtil.space(internalBuffer) - 13;
ByteBuffer buffer = BufferUtil.toBuffer(new byte[bytesToWrite]);
BufferUtil.clear(buffer);
assertThat(writeInFlushMode(slice, buffer), is(bytesToWrite));
assertThat(writeInFlushMode(buffer, internalBuffer), is(bytesToWrite));
assertThat(BufferUtil.space(internalBuffer), is(13));
// If we request anything under the amount remaining we get back the same buffer.
for (int i = 0; i <= 13; i++)
{
internalBuffer = accumulator.ensureBuffer(i, allocationSize);
assertThat(BufferUtil.space(internalBuffer), is(13));
}
// If we request over 13 then we get a new buffer.
internalBuffer = accumulator.ensureBuffer(14, allocationSize);
assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1024));
// Copy the rest of the content.
while (slice.hasRemaining())
{
internalBuffer = accumulator.ensureBuffer(1, allocationSize);
assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1));
writeInFlushMode(slice, internalBuffer);
}
// Check we have the same content as the original buffer.
assertThat(accumulator.getLength(), is(size));
assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L));
ByteBuffer combinedBuffer = accumulator.toByteBuffer();
assertThat(byteBufferPool.getLeasedBuffers(), is(1L));
assertThat(accumulator.getLength(), is(size));
assertThat(combinedBuffer, is(content));
// Close the accumulator and make sure all is returned to bufferPool.
accumulator.close();
byteBufferPool.verifyClosed();
}
@Test
public void testTakeBuffer()
{
int size = 1024 * 1024;
int allocationSize = 1024;
ByteBuffer content = randomBytes(size);
ByteBuffer slice = content.slice();
// Copy the content.
while (slice.hasRemaining())
{
ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize);
assertThat(BufferUtil.space(internalBuffer), greaterThanOrEqualTo(1));
writeInFlushMode(slice, internalBuffer);
}
// Check we have the same content as the original buffer.
assertThat(accumulator.getLength(), is(size));
assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L));
ByteBuffer combinedBuffer = accumulator.takeByteBuffer();
assertThat(byteBufferPool.getLeasedBuffers(), is(1L));
assertThat(accumulator.getLength(), is(0));
accumulator.close();
assertThat(byteBufferPool.getLeasedBuffers(), is(1L));
assertThat(combinedBuffer, is(content));
// Return the buffer and make sure all is returned to bufferPool.
byteBufferPool.release(combinedBuffer);
byteBufferPool.verifyClosed();
}
@Test
public void testToByteArray()
{
int size = 1024 * 1024;
int allocationSize = 1024;
ByteBuffer content = randomBytes(size);
ByteBuffer slice = content.slice();
// Copy the content.
while (slice.hasRemaining())
{
ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize);
writeInFlushMode(slice, internalBuffer);
}
// Check we have the same content as the original buffer.
assertThat(accumulator.getLength(), is(size));
assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L));
byte[] combinedBuffer = accumulator.toByteArray();
assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L));
assertThat(accumulator.getLength(), is(size));
assertThat(BufferUtil.toBuffer(combinedBuffer), is(content));
// Close the accumulator and make sure all is returned to bufferPool.
accumulator.close();
byteBufferPool.verifyClosed();
}
@Test
public void testEmptyToBuffer()
{
ByteBuffer combinedBuffer = accumulator.toByteBuffer();
assertThat(combinedBuffer.remaining(), is(0));
assertThat(byteBufferPool.getLeasedBuffers(), is(1L));
accumulator.close();
byteBufferPool.verifyClosed();
}
@Test
public void testEmptyTakeBuffer()
{
ByteBuffer combinedBuffer = accumulator.takeByteBuffer();
assertThat(combinedBuffer.remaining(), is(0));
accumulator.close();
assertThat(byteBufferPool.getLeasedBuffers(), is(1L));
byteBufferPool.release(combinedBuffer);
byteBufferPool.verifyClosed();
}
@Test
public void testWriteTo()
{
int size = 1024 * 1024;
int allocationSize = 1024;
ByteBuffer content = randomBytes(size);
ByteBuffer slice = content.slice();
// Copy the content.
while (slice.hasRemaining())
{
ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize);
writeInFlushMode(slice, internalBuffer);
}
// Check we have the same content as the original buffer.
assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L));
ByteBuffer combinedBuffer = byteBufferPool.acquire(accumulator.getLength(), false);
accumulator.writeTo(combinedBuffer);
assertThat(accumulator.getLength(), is(size));
assertThat(combinedBuffer, is(content));
byteBufferPool.release(combinedBuffer);
// Close the accumulator and make sure all is returned to bufferPool.
accumulator.close();
byteBufferPool.verifyClosed();
}
@Test
public void testWriteToBufferTooSmall()
{
int size = 1024 * 1024;
int allocationSize = 1024;
ByteBuffer content = randomBytes(size);
ByteBuffer slice = content.slice();
// Copy the content.
while (slice.hasRemaining())
{
ByteBuffer internalBuffer = accumulator.ensureBuffer(1, allocationSize);
writeInFlushMode(slice, internalBuffer);
}
// Writing to a buffer too small gives buffer overflow.
assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L));
ByteBuffer combinedBuffer = BufferUtil.toBuffer(new byte[accumulator.getLength() - 1]);
BufferUtil.clear(combinedBuffer);
assertThrows(BufferOverflowException.class, () -> accumulator.writeTo(combinedBuffer));
// Close the accumulator and make sure all is returned to bufferPool.
accumulator.close();
byteBufferPool.verifyClosed();
}
@Test
public void testCopy()
{
int size = 1024 * 1024;
ByteBuffer content = randomBytes(size);
ByteBuffer slice = content.slice();
// Copy the content.
int tmpBufferSize = 1024;
ByteBuffer tmpBuffer = BufferUtil.toBuffer(new byte[tmpBufferSize]);
BufferUtil.clear(tmpBuffer);
while (slice.hasRemaining())
{
writeInFlushMode(slice, tmpBuffer);
accumulator.copyBuffer(tmpBuffer);
}
// Check we have the same content as the original buffer.
assertThat(byteBufferPool.getLeasedBuffers(), greaterThan(1L));
ByteBuffer combinedBuffer = byteBufferPool.acquire(accumulator.getLength(), false);
accumulator.writeTo(combinedBuffer);
assertThat(accumulator.getLength(), is(size));
assertThat(combinedBuffer, is(content));
byteBufferPool.release(combinedBuffer);
// Close the accumulator and make sure all is returned to bufferPool.
accumulator.close();
byteBufferPool.verifyClosed();
}
private ByteBuffer randomBytes(int size)
{
byte[] data = new byte[size];
new Random().nextBytes(data);
return BufferUtil.toBuffer(data);
}
private int writeInFlushMode(ByteBuffer from, ByteBuffer to)
{
int pos = BufferUtil.flipToFill(to);
int written = BufferUtil.put(from, to);
BufferUtil.flipToFlush(to, pos);
return written;
}
public static class CountingBufferPool extends LeakTrackingByteBufferPool
{
private final AtomicLong _leasedBuffers = new AtomicLong(0);
public CountingBufferPool()
{
this(new MappedByteBufferPool());
}
public CountingBufferPool(ByteBufferPool delegate)
{
super(delegate);
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
_leasedBuffers.incrementAndGet();
return super.acquire(size, direct);
}
@Override
public void release(ByteBuffer buffer)
{
if (buffer != null)
_leasedBuffers.decrementAndGet();
super.release(buffer);
}
public long getLeasedBuffers()
{
return _leasedBuffers.get();
}
public void verifyClosed()
{
assertThat(_leasedBuffers.get(), is(0L));
assertThat(getLeakedAcquires(), is(0L));
assertThat(getLeakedReleases(), is(0L));
assertThat(getLeakedResources(), is(0L));
assertThat(getLeakedRemoves(), is(0L));
}
}
}

View File

@ -91,79 +91,4 @@ public class ByteAccumulatorTest
MessageTooLargeException e = assertThrows(MessageTooLargeException.class, () -> accumulator.copyChunk(world, 0, world.length));
assertThat(e.getMessage(), containsString("too large for configured max"));
}
/*
@Test
public void testRecycle()
{
ByteAccumulator accumulator = new ByteAccumulator(10_000, new ArrayByteBufferPool());
ByteBuffer out0 = ByteBuffer.allocate(200);
ByteBuffer out1 = ByteBuffer.allocate(200);
{
// 1
ByteBuffer buf = accumulator.newByteBuffer(10);
byte[] hello = "Hello".getBytes(UTF_8);
buf.put(hello).flip();
accumulator.copyChunk(buf);
// 2
buf = accumulator.newByteBuffer(10);
byte[] space = " ".getBytes(UTF_8);
buf.put(space).flip();
accumulator.copyChunk(buf);
// 3
buf = accumulator.newByteBuffer(10);
byte[] world = "World".getBytes(UTF_8);
buf.put(world).flip();
accumulator.copyChunk(buf);
assertThat("Length", accumulator.getLength(), is(hello.length + space.length + world.length));
accumulator.transferTo(out0);
// reuse that byte[]
accumulator.recycle();
}
{
// 1
ByteBuffer buf = accumulator.newByteBuffer(10);
byte[] olleh = "olleH".getBytes(UTF_8);
buf.put(olleh).flip();
accumulator.copyChunk(buf);
// 2
buf = accumulator.newByteBuffer(10);
byte[] space = " ".getBytes(UTF_8);
buf.put(space).flip();
accumulator.copyChunk(buf);
// 3
buf = accumulator.newByteBuffer(10);
byte[] dlrow = "dlroW".getBytes(UTF_8);
buf.put(dlrow).flip();
accumulator.copyChunk(buf);
// 4
buf = accumulator.newByteBuffer(10);
byte[] done = " enoD".getBytes(UTF_8);
buf.put(done).flip();
accumulator.copyChunk(buf);
assertThat("Length", accumulator.getLength(), is(olleh.length + space.length + dlrow.length + done.length));
accumulator.transferTo(out1);
// reuse that byte[]
accumulator.recycle();
}
String result0 = BufferUtil.toUTF8String(out0);
assertThat("result", result0, is("Hello World"));
String result1 = BufferUtil.toUTF8String(out1);
assertThat("result", result1, is("olleH dlroW enoD"));
}
*/
}