Issue #7683 - Use direct buffers for gzip input/output, if configured.
Use direct buffers for gzip input/output, if configured. Code cleanups. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
c933bac11a
commit
a35719367b
|
@ -39,6 +39,7 @@ public class GZIPContentDecoder implements Destroyable
|
|||
private final List<ByteBuffer> _inflateds = new ArrayList<>();
|
||||
private final ByteBufferPool _pool;
|
||||
private final int _bufferSize;
|
||||
private final boolean _useDirectBuffers;
|
||||
private InflaterPool.Entry _inflaterEntry;
|
||||
private Inflater _inflater;
|
||||
private State _state;
|
||||
|
@ -62,12 +63,23 @@ public class GZIPContentDecoder implements Destroyable
|
|||
this(new InflaterPool(0, true), pool, bufferSize);
|
||||
}
|
||||
|
||||
public GZIPContentDecoder(ByteBufferPool pool, int bufferSize, boolean useDirectBuffers)
|
||||
{
|
||||
this(new InflaterPool(0, true), pool, bufferSize, useDirectBuffers);
|
||||
}
|
||||
|
||||
public GZIPContentDecoder(InflaterPool inflaterPool, ByteBufferPool pool, int bufferSize)
|
||||
{
|
||||
this(inflaterPool, pool, bufferSize, false);
|
||||
}
|
||||
|
||||
public GZIPContentDecoder(InflaterPool inflaterPool, ByteBufferPool pool, int bufferSize, boolean useDirectBuffers)
|
||||
{
|
||||
_inflaterEntry = inflaterPool.acquire();
|
||||
_inflater = _inflaterEntry.get();
|
||||
_bufferSize = bufferSize;
|
||||
_pool = pool;
|
||||
_useDirectBuffers = useDirectBuffers;
|
||||
reset();
|
||||
}
|
||||
|
||||
|
@ -209,6 +221,13 @@ public class GZIPContentDecoder implements Destroyable
|
|||
if (buffer == null)
|
||||
buffer = acquire(_bufferSize);
|
||||
|
||||
if (_inflater.needsInput())
|
||||
{
|
||||
if (!compressed.hasRemaining())
|
||||
return;
|
||||
_inflater.setInput(compressed);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
int pos = BufferUtil.flipToFill(buffer);
|
||||
|
@ -227,12 +246,6 @@ public class GZIPContentDecoder implements Destroyable
|
|||
if (decodedChunk(chunk))
|
||||
return;
|
||||
}
|
||||
else if (_inflater.needsInput())
|
||||
{
|
||||
if (!compressed.hasRemaining())
|
||||
return;
|
||||
_inflater.setInput(compressed);
|
||||
}
|
||||
else if (_inflater.finished())
|
||||
{
|
||||
_state = State.CRC;
|
||||
|
@ -256,7 +269,7 @@ public class GZIPContentDecoder implements Destroyable
|
|||
{
|
||||
case ID:
|
||||
{
|
||||
_value += (currByte & 0xFF) << 8 * _size;
|
||||
_value += (currByte & 0xFFL) << (8 * _size);
|
||||
++_size;
|
||||
if (_size == 2)
|
||||
{
|
||||
|
@ -303,7 +316,7 @@ public class GZIPContentDecoder implements Destroyable
|
|||
}
|
||||
case EXTRA_LENGTH:
|
||||
{
|
||||
_value += (currByte & 0xFF) << 8 * _size;
|
||||
_value += (currByte & 0xFFL) << (8 * _size);
|
||||
++_size;
|
||||
if (_size == 2)
|
||||
_state = State.EXTRA;
|
||||
|
@ -357,7 +370,7 @@ public class GZIPContentDecoder implements Destroyable
|
|||
}
|
||||
case CRC:
|
||||
{
|
||||
_value += (currByte & 0xFF) << 8 * _size;
|
||||
_value += (currByte & 0xFFL) << (8 * _size);
|
||||
++_size;
|
||||
if (_size == 4)
|
||||
{
|
||||
|
@ -432,7 +445,7 @@ public class GZIPContentDecoder implements Destroyable
|
|||
*/
|
||||
public ByteBuffer acquire(int capacity)
|
||||
{
|
||||
return _pool == null ? BufferUtil.allocate(capacity) : _pool.acquire(capacity, false);
|
||||
return _pool == null ? BufferUtil.allocate(capacity) : _pool.acquire(capacity, _useDirectBuffers);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
public class GZIPContentDecoderTest
|
||||
{
|
||||
private ArrayByteBufferPool pool;
|
||||
private AtomicInteger buffers = new AtomicInteger(0);
|
||||
private final AtomicInteger buffers = new AtomicInteger(0);
|
||||
|
||||
@BeforeEach
|
||||
public void before()
|
||||
|
@ -132,24 +132,26 @@ public class GZIPContentDecoderTest
|
|||
{
|
||||
baos.write(read);
|
||||
}
|
||||
assertEquals(data, new String(baos.toByteArray(), StandardCharsets.UTF_8));
|
||||
assertEquals(data, baos.toString(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoBlocks() throws Exception
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testNoBlocks(boolean useDirectBuffers) throws Exception
|
||||
{
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
GZIPOutputStream output = new GZIPOutputStream(baos);
|
||||
output.close();
|
||||
byte[] bytes = baos.toByteArray();
|
||||
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048, useDirectBuffers);
|
||||
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
|
||||
assertEquals(0, decoded.remaining());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallBlock() throws Exception
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testSmallBlock(boolean useDirectBuffers) throws Exception
|
||||
{
|
||||
String data = "0";
|
||||
|
||||
|
@ -159,14 +161,15 @@ public class GZIPContentDecoderTest
|
|||
output.close();
|
||||
byte[] bytes = baos.toByteArray();
|
||||
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048, useDirectBuffers);
|
||||
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes));
|
||||
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
|
||||
decoder.release(decoded);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallBlockWithGZIPChunkedAtBegin() throws Exception
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testSmallBlockWithGZIPChunkedAtBegin(boolean useDirectBuffers) throws Exception
|
||||
{
|
||||
String data = "0";
|
||||
|
||||
|
@ -182,7 +185,7 @@ public class GZIPContentDecoderTest
|
|||
byte[] bytes2 = new byte[bytes.length - bytes1.length];
|
||||
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
|
||||
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048, useDirectBuffers);
|
||||
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
|
||||
assertEquals(0, decoded.capacity());
|
||||
decoded = decoder.decode(ByteBuffer.wrap(bytes2));
|
||||
|
@ -190,8 +193,9 @@ public class GZIPContentDecoderTest
|
|||
decoder.release(decoded);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallBlockWithGZIPChunkedAtEnd() throws Exception
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testSmallBlockWithGZIPChunkedAtEnd(boolean useDirectBuffers) throws Exception
|
||||
{
|
||||
String data = "0";
|
||||
|
||||
|
@ -207,7 +211,7 @@ public class GZIPContentDecoderTest
|
|||
byte[] bytes2 = new byte[bytes.length - bytes1.length];
|
||||
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
|
||||
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048, useDirectBuffers);
|
||||
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
|
||||
assertEquals(data, StandardCharsets.UTF_8.decode(decoded).toString());
|
||||
assertFalse(decoder.isFinished());
|
||||
|
@ -218,8 +222,9 @@ public class GZIPContentDecoderTest
|
|||
decoder.release(decoded);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallBlockWithGZIPTrailerChunked() throws Exception
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testSmallBlockWithGZIPTrailerChunked(boolean useDirectBuffers) throws Exception
|
||||
{
|
||||
String data = "0";
|
||||
|
||||
|
@ -235,7 +240,7 @@ public class GZIPContentDecoderTest
|
|||
byte[] bytes2 = new byte[bytes.length - bytes1.length];
|
||||
System.arraycopy(bytes, bytes1.length, bytes2, 0, bytes2.length);
|
||||
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048, useDirectBuffers);
|
||||
ByteBuffer decoded = decoder.decode(ByteBuffer.wrap(bytes1));
|
||||
assertEquals(0, decoded.capacity());
|
||||
decoder.release(decoded);
|
||||
|
@ -244,8 +249,9 @@ public class GZIPContentDecoderTest
|
|||
decoder.release(decoded);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTwoSmallBlocks() throws Exception
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testTwoSmallBlocks(boolean useDirectBuffers) throws Exception
|
||||
{
|
||||
String data1 = "0";
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
|
@ -265,7 +271,7 @@ public class GZIPContentDecoderTest
|
|||
System.arraycopy(bytes1, 0, bytes, 0, bytes1.length);
|
||||
System.arraycopy(bytes2, 0, bytes, bytes1.length, bytes2.length);
|
||||
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048, useDirectBuffers);
|
||||
ByteBuffer buffer = ByteBuffer.wrap(bytes);
|
||||
ByteBuffer decoded = decoder.decode(buffer);
|
||||
assertEquals(data1, StandardCharsets.UTF_8.decode(decoded).toString());
|
||||
|
@ -279,8 +285,9 @@ public class GZIPContentDecoderTest
|
|||
decoder.release(decoded);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBigBlock() throws Exception
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testBigBlock(boolean useDirectBuffers) throws Exception
|
||||
{
|
||||
String data = "0123456789ABCDEF";
|
||||
for (int i = 0; i < 10; ++i)
|
||||
|
@ -294,7 +301,7 @@ public class GZIPContentDecoderTest
|
|||
byte[] bytes = baos.toByteArray();
|
||||
|
||||
String result = "";
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048);
|
||||
GZIPContentDecoder decoder = new GZIPContentDecoder(pool, 2048, useDirectBuffers);
|
||||
ByteBuffer buffer = ByteBuffer.wrap(bytes);
|
||||
while (buffer.hasRemaining())
|
||||
{
|
||||
|
@ -374,11 +381,8 @@ public class GZIPContentDecoderTest
|
|||
// Signed Integer Max
|
||||
static final long INT_MAX = Integer.MAX_VALUE;
|
||||
|
||||
// Unsigned Integer Max == 2^32
|
||||
static final long UINT_MAX = 0xFFFFFFFFL;
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(longs = {INT_MAX, INT_MAX + 1 /* TODO too slow , UINT_MAX, UINT_MAX + 1 */ })
|
||||
@ValueSource(longs = {INT_MAX, INT_MAX + 1})
|
||||
public void testLargeGzipStream(long origSize) throws IOException
|
||||
{
|
||||
// Size chosen for trade off between speed of I/O vs speed of Gzip
|
||||
|
|
|
@ -1154,6 +1154,11 @@ public abstract class HttpChannel implements Runnable, HttpOutput.Interceptor
|
|||
return _connector.getScheduler();
|
||||
}
|
||||
|
||||
public boolean isUseInputDirectByteBuffers()
|
||||
{
|
||||
return getHttpConfiguration().isUseInputDirectByteBuffers();
|
||||
}
|
||||
|
||||
public boolean isUseOutputDirectByteBuffers()
|
||||
{
|
||||
return getHttpConfiguration().isUseOutputDirectByteBuffers();
|
||||
|
|
|
@ -590,7 +590,10 @@ public class GzipHandler extends HandlerWrapper implements GzipFactory
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} inflate {}", this, request);
|
||||
baseRequest.getHttpInput().addInterceptor(new GzipHttpInputInterceptor(_inflaterPool, baseRequest.getHttpChannel().getByteBufferPool(), _inflateBufferSize));
|
||||
GzipHttpInputInterceptor gzipHttpInputInterceptor =
|
||||
new GzipHttpInputInterceptor(_inflaterPool, baseRequest.getHttpChannel().getByteBufferPool(),
|
||||
_inflateBufferSize, baseRequest.getHttpChannel().isUseInputDirectByteBuffers());
|
||||
baseRequest.getHttpInput().addInterceptor(gzipHttpInputInterceptor);
|
||||
}
|
||||
|
||||
// Are we already being gzipped?
|
||||
|
|
|
@ -32,7 +32,12 @@ public class GzipHttpInputInterceptor implements HttpInput.Interceptor, Destroya
|
|||
|
||||
public GzipHttpInputInterceptor(InflaterPool inflaterPool, ByteBufferPool pool, int bufferSize)
|
||||
{
|
||||
_decoder = new Decoder(inflaterPool, pool, bufferSize);
|
||||
this(inflaterPool, pool, bufferSize, false);
|
||||
}
|
||||
|
||||
public GzipHttpInputInterceptor(InflaterPool inflaterPool, ByteBufferPool pool, int bufferSize, boolean useDirectBuffers)
|
||||
{
|
||||
_decoder = new Decoder(inflaterPool, pool, bufferSize, useDirectBuffers);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,9 +76,9 @@ public class GzipHttpInputInterceptor implements HttpInput.Interceptor, Destroya
|
|||
|
||||
private class Decoder extends GZIPContentDecoder
|
||||
{
|
||||
private Decoder(InflaterPool inflaterPool, ByteBufferPool bufferPool, int bufferSize)
|
||||
private Decoder(InflaterPool inflaterPool, ByteBufferPool bufferPool, int bufferSize, boolean useDirectBuffers)
|
||||
{
|
||||
super(inflaterPool, bufferPool, bufferSize);
|
||||
super(inflaterPool, bufferPool, bufferSize, useDirectBuffers);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -257,15 +257,22 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
|
|||
|
||||
private class GzipBufferCB extends IteratingNestedCallback
|
||||
{
|
||||
private ByteBuffer _copy;
|
||||
private final ByteBuffer _content;
|
||||
private final boolean _last;
|
||||
|
||||
public GzipBufferCB(ByteBuffer content, boolean complete, Callback callback)
|
||||
{
|
||||
super(callback);
|
||||
|
||||
_content = content;
|
||||
_last = complete;
|
||||
|
||||
_crc.update(_content.slice());
|
||||
|
||||
Deflater deflater = _deflaterEntry.get();
|
||||
deflater.setInput(_content);
|
||||
if (_last)
|
||||
deflater.finish();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -293,11 +300,6 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
|
|||
_channel.getByteBufferPool().release(_buffer);
|
||||
_buffer = null;
|
||||
}
|
||||
if (_copy != null)
|
||||
{
|
||||
_channel.getByteBufferPool().release(_copy);
|
||||
_copy = null;
|
||||
}
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
|
||||
|
@ -305,7 +307,7 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
|
|||
if (_buffer == null)
|
||||
{
|
||||
// allocate a buffer and add the gzip header
|
||||
_buffer = _channel.getByteBufferPool().acquire(_bufferSize, false);
|
||||
_buffer = _channel.getByteBufferPool().acquire(_bufferSize, _channel.isUseOutputDirectByteBuffers());
|
||||
BufferUtil.fill(_buffer, GZIP_HEADER, 0, GZIP_HEADER.length);
|
||||
}
|
||||
else
|
||||
|
@ -318,54 +320,12 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
|
|||
Deflater deflater = _deflaterEntry.get();
|
||||
if (!deflater.finished())
|
||||
{
|
||||
if (deflater.needsInput())
|
||||
{
|
||||
// if there is no more content available to compress
|
||||
// then we are either finished all content or just the current write.
|
||||
if (BufferUtil.isEmpty(_content))
|
||||
{
|
||||
if (_last)
|
||||
deflater.finish();
|
||||
else
|
||||
return Action.SUCCEEDED;
|
||||
}
|
||||
else
|
||||
{
|
||||
// If there is more content available to compress, we have to make sure
|
||||
// it is available in an array for the current deflator API, maybe slicing
|
||||
// of content.
|
||||
ByteBuffer slice;
|
||||
if (_content.hasArray())
|
||||
slice = _content;
|
||||
else
|
||||
{
|
||||
if (_copy == null)
|
||||
_copy = _channel.getByteBufferPool().acquire(_bufferSize, false);
|
||||
else
|
||||
BufferUtil.clear(_copy);
|
||||
slice = _copy;
|
||||
BufferUtil.append(_copy, _content);
|
||||
}
|
||||
if (deflater.needsInput() && !_last)
|
||||
return Action.SUCCEEDED;
|
||||
|
||||
// transfer the data from the slice to the the deflator
|
||||
byte[] array = slice.array();
|
||||
int off = slice.arrayOffset() + slice.position();
|
||||
int len = slice.remaining();
|
||||
_crc.update(array, off, len);
|
||||
// Ideally we would want to use the ByteBuffer API for Deflaters. However due the the ByteBuffer implementation
|
||||
// of the CRC32.update() it is less efficient for us to use this rather than to convert to array ourselves.
|
||||
_deflaterEntry.get().setInput(array, off, len);
|
||||
slice.position(slice.position() + len);
|
||||
if (_last && BufferUtil.isEmpty(_content))
|
||||
deflater.finish();
|
||||
}
|
||||
}
|
||||
|
||||
// deflate the content into the available space in the buffer
|
||||
int off = _buffer.arrayOffset() + _buffer.limit();
|
||||
int len = BufferUtil.space(_buffer);
|
||||
int produced = deflater.deflate(_buffer.array(), off, len, _syncFlush ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
|
||||
_buffer.limit(_buffer.limit() + produced);
|
||||
int pos = BufferUtil.flipToFill(_buffer);
|
||||
deflater.deflate(_buffer, _syncFlush ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH);
|
||||
BufferUtil.flipToFlush(_buffer, pos);
|
||||
}
|
||||
|
||||
// If we have finished deflation and there is room for the trailer.
|
||||
|
@ -386,11 +346,10 @@ public class GzipHttpOutputInterceptor implements HttpOutput.Interceptor
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[content=%s last=%b copy=%s buffer=%s deflate=%s %s]",
|
||||
return String.format("%s[content=%s last=%b buffer=%s deflate=%s %s]",
|
||||
super.toString(),
|
||||
BufferUtil.toDetailString(_content),
|
||||
_last,
|
||||
BufferUtil.toDetailString(_copy),
|
||||
BufferUtil.toDetailString(_buffer),
|
||||
_deflaterEntry,
|
||||
_deflaterEntry != null && _deflaterEntry.get().finished() ? "(finished)" : "");
|
||||
|
|
Loading…
Reference in New Issue