427700 - Outgoing extensions that create multiple frames should flush them in order and atomically.

Optimized CompressExtension to reduce data copying to a minimum on the
write path.
This commit is contained in:
Simone Bordet 2014-02-15 17:53:37 +01:00
parent a8e4caca90
commit d2f1954fa0
1 changed file with 33 additions and 22 deletions

View File

@@ -110,7 +110,7 @@ public abstract class CompressExtension extends AbstractExtension
// check the remaining bytes.
while (decompressor.getRemaining() > 0 && !decompressor.finished())
{
byte[] output = new byte[Math.min(input.length * 2, 64 * 1024)];
byte[] output = new byte[Math.min(input.length * 2, 32 * 1024)];
int decompressed = decompressor.inflate(output);
if (decompressed == 0)
{
@@ -183,7 +183,6 @@ public abstract class CompressExtension extends AbstractExtension
private class Flusher extends IteratingCallback implements WriteCallback
{
private FrameEntry current;
private int inputLength = 64 * 1024;
private ByteBuffer payload;
private boolean finished = true;
@@ -231,23 +230,39 @@ public abstract class CompressExtension extends AbstractExtension
// the heap if the payload is a huge mapped file.
ByteBuffer data = frame.getPayload();
int remaining = data.remaining();
byte[] input = new byte[Math.min(remaining, inputLength)];
int length = Math.min(remaining, input.length);
LOG.debug("Compressing {}: {} bytes in {} bytes chunk", frame, remaining, length);
finished = length == remaining;
data.get(input, 0, length);
int inputLength = Math.min(remaining, 32 * 1024);
LOG.debug("Compressing {}: {} bytes in {} bytes chunk", frame, remaining, inputLength);
compressor.setInput(input, 0, length);
// Avoid to copy the bytes if the ByteBuffer
// is backed by an array.
int inputOffset;
byte[] input;
if (data.hasArray())
{
input = data.array();
int position = data.position();
inputOffset = position + data.arrayOffset();
data.position(position + inputLength);
}
else
{
input = new byte[inputLength];
inputOffset = 0;
data.get(input, 0, inputLength);
}
finished = inputLength == remaining;
compressor.setInput(input, inputOffset, inputLength);
// Use an additional space in case the content is not compressible.
byte[] output = new byte[length + 64];
int offset = 0;
int total = 0;
byte[] output = new byte[inputLength + 64];
int outputOffset = 0;
int outputLength = 0;
while (true)
{
int space = output.length - offset;
int compressed = compressor.deflate(output, offset, space, Deflater.SYNC_FLUSH);
total += compressed;
int space = output.length - outputOffset;
int compressed = compressor.deflate(output, outputOffset, space, Deflater.SYNC_FLUSH);
outputLength += compressed;
if (compressed < space)
{
// Everything was compressed.
@@ -258,16 +273,14 @@ public abstract class CompressExtension extends AbstractExtension
// The compressed output is bigger than the uncompressed input.
byte[] newOutput = new byte[output.length * 2];
System.arraycopy(output, 0, newOutput, 0, output.length);
offset += output.length;
outputOffset += output.length;
output = newOutput;
}
}
payload = getBufferPool().acquire(total, true);
BufferUtil.flipToFill(payload);
// Skip the last tail bytes bytes generated by SYNC_FLUSH
payload.put(output, 0, total - TAIL_BYTES.length).flip();
LOG.debug("Compressed {}: {}->{} chunk bytes", frame, length, total);
// Skip the last tail bytes generated by SYNC_FLUSH.
payload = ByteBuffer.wrap(output, 0, outputLength - TAIL_BYTES.length);
LOG.debug("Compressed {}: {}->{} chunk bytes", frame, inputLength, outputLength);
boolean continuation = frame.getType().isContinuation() || !first;
DataFrame chunk = new DataFrame(frame, continuation);
@@ -288,7 +301,6 @@ public abstract class CompressExtension extends AbstractExtension
@Override
public void writeSuccess()
{
getBufferPool().release(payload);
if (finished)
notifyCallbackSuccess(current.callback);
succeeded();
@@ -297,7 +309,6 @@ public abstract class CompressExtension extends AbstractExtension
@Override
public void writeFailed(Throwable x)
{
getBufferPool().release(payload);
notifyCallbackFailure(current.callback, x);
// If something went wrong, very likely the compression context
// will be invalid, so we need to fail this IteratingCallback.