474453 - Tiny buffers (under 7 bytes) fail to compress in permessage-deflate
+ Ensure compress() is sanely using Deflater.deflate() + Ensure output buffer in .deflate() is always a minimum of 256 bytes
This commit is contained in:
parent
ffcedde60a
commit
c424b58153
|
@ -41,7 +41,7 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame;
|
|||
|
||||
public abstract class CompressExtension extends AbstractExtension
|
||||
{
|
||||
protected static final byte[] TAIL_BYTES = new byte[] { 0x00, 0x00, (byte)0xFF, (byte)0xFF};
|
||||
protected static final byte[] TAIL_BYTES = new byte[] { 0x00, 0x00, (byte)0xFF, (byte)0xFF };
|
||||
protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES);
|
||||
private static final Logger LOG = Log.getLogger(CompressExtension.class);
|
||||
|
||||
|
@ -63,10 +63,12 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
|
||||
/** Inflater / Decompressed Buffer Size */
|
||||
protected static final int INFLATE_BUFFER_SIZE = 8 * 1024;
|
||||
|
||||
/** Deflater / Inflater: Maximum Input Buffer Size */
|
||||
protected static final int INPUT_MAX_BUFFER_SIZE = 8 * 1024;
|
||||
|
||||
private final static boolean NOWRAP = true;
|
||||
|
||||
|
||||
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
|
||||
private final IteratingCallback flusher = new Flusher();
|
||||
private final Deflater deflater;
|
||||
|
@ -135,15 +137,14 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
getBufferPool().release(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected ByteAccumulator newByteAccumulator()
|
||||
{
|
||||
int maxSize = Math.max(getPolicy().getMaxTextMessageSize(),getPolicy().getMaxBinaryMessageBufferSize());
|
||||
return new ByteAccumulator(maxSize);
|
||||
}
|
||||
|
||||
protected void decompress(ByteAccumulator accumulator, ByteBuffer buf)
|
||||
throws DataFormatException
|
||||
|
||||
protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException
|
||||
{
|
||||
if ((buf == null) || (!buf.hasRemaining()))
|
||||
{
|
||||
|
@ -151,7 +152,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
}
|
||||
byte[] output = new byte[1024]; // TODO: make configurable size
|
||||
|
||||
if (inflater.needsInput() && !supplyInput(inflater, buf))
|
||||
if (inflater.needsInput() && !supplyInput(inflater,buf))
|
||||
{
|
||||
LOG.debug("Needed input, but no buffer could supply input");
|
||||
return;
|
||||
|
@ -172,17 +173,17 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
{
|
||||
LOG.debug("Decompressed {} bytes: {}",read,toDetail(inflater));
|
||||
}
|
||||
|
||||
|
||||
accumulator.copyChunk(output,0,read);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Decompress: exiting {}",toDetail(inflater));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
|
||||
{
|
||||
|
@ -245,7 +246,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
byte input[];
|
||||
int inputOffset = 0;
|
||||
int len;
|
||||
|
||||
|
||||
if (buf.hasArray())
|
||||
{
|
||||
// no need to create a new byte buffer, just return this one.
|
||||
|
@ -257,12 +258,12 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
else
|
||||
{
|
||||
// Only create an return byte buffer that is reasonable in size
|
||||
len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining());
|
||||
len = Math.min(INPUT_MAX_BUFFER_SIZE,buf.remaining());
|
||||
input = new byte[len];
|
||||
inputOffset = 0;
|
||||
buf.get(input,0,len);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
inflater.setInput(input,inputOffset,len);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
|
@ -270,7 +271,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
private static boolean supplyInput(Deflater deflater, ByteBuffer buf)
|
||||
{
|
||||
if (buf.remaining() <= 0)
|
||||
|
@ -285,7 +286,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
byte input[];
|
||||
int inputOffset = 0;
|
||||
int len;
|
||||
|
||||
|
||||
if (buf.hasArray())
|
||||
{
|
||||
// no need to create a new byte buffer, just return this one.
|
||||
|
@ -297,12 +298,12 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
else
|
||||
{
|
||||
// Only create an return byte buffer that is reasonable in size
|
||||
len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining());
|
||||
len = Math.min(INPUT_MAX_BUFFER_SIZE,buf.remaining());
|
||||
input = new byte[len];
|
||||
inputOffset = 0;
|
||||
buf.get(input,0,len);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
deflater.setInput(input,inputOffset,len);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
|
@ -310,7 +311,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
private static String toDetail(Inflater inflater)
|
||||
{
|
||||
return String.format("Inflater[finished=%b,read=%d,written=%d,remaining=%d,in=%d,out=%d]",inflater.finished(),inflater.getBytesRead(),
|
||||
|
@ -319,8 +320,8 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
|
||||
private static String toDetail(Deflater deflater)
|
||||
{
|
||||
return String.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]",deflater.finished(),deflater.getBytesRead(),
|
||||
deflater.getBytesWritten(),deflater.getTotalIn(),deflater.getTotalOut());
|
||||
return String.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]",deflater.finished(),deflater.getBytesRead(),deflater.getBytesWritten(),
|
||||
deflater.getTotalIn(),deflater.getTotalOut());
|
||||
}
|
||||
|
||||
public static boolean endsWithTail(ByteBuffer buf)
|
||||
|
@ -405,64 +406,52 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
|
||||
private void compress(FrameEntry entry, boolean first)
|
||||
{
|
||||
final int flush = Deflater.SYNC_FLUSH;
|
||||
|
||||
// Get a chunk of the payload to avoid to blow
|
||||
// the heap if the payload is a huge mapped file.
|
||||
Frame frame = entry.frame;
|
||||
ByteBuffer data = frame.getPayload();
|
||||
int remaining = data.remaining();
|
||||
int chunkSize = Math.min(remaining,INPUT_BUFSIZE);
|
||||
int outputLength = Math.max(256,data.remaining());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Compressing {}: {} bytes in {} bytes chunk",entry,remaining,chunkSize);
|
||||
|
||||
LOG.debug("Compressing {}: {} bytes in {} bytes chunk",entry,remaining,outputLength);
|
||||
|
||||
boolean needsCompress = true;
|
||||
|
||||
|
||||
if (deflater.needsInput() && !supplyInput(deflater,data))
|
||||
{
|
||||
// no input supplied
|
||||
needsCompress = false;
|
||||
}
|
||||
|
||||
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
|
||||
byte[] output = new byte[chunkSize];
|
||||
byte[] output = new byte[outputLength];
|
||||
|
||||
boolean fin = frame.isFin();
|
||||
|
||||
// Compress the data
|
||||
while (needsCompress)
|
||||
{
|
||||
int read = deflater.deflate(output,0,chunkSize,flush);
|
||||
if (read == 0)
|
||||
int compressed = deflater.deflate(output,0,outputLength,Deflater.SYNC_FLUSH);
|
||||
|
||||
// Append the output for the eventual frame.
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Wrote {} bytes to output buffer",compressed);
|
||||
out.write(output,0,compressed);
|
||||
|
||||
if (compressed < outputLength)
|
||||
{
|
||||
if (deflater.finished())
|
||||
{
|
||||
// done
|
||||
break;
|
||||
}
|
||||
else if (deflater.needsInput())
|
||||
{
|
||||
if (!supplyInput(deflater,data))
|
||||
{
|
||||
// done
|
||||
needsCompress = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Append the output for the eventual frame.
|
||||
out.write(output,0,read);
|
||||
needsCompress = false;
|
||||
}
|
||||
}
|
||||
|
||||
boolean fin = frame.isFin();
|
||||
|
||||
ByteBuffer payload = ByteBuffer.wrap(out.toByteArray());
|
||||
|
||||
if (payload.remaining()>0)
|
||||
|
||||
if (payload.remaining() > 0)
|
||||
{
|
||||
// Handle tail bytes generated by SYNC_FLUSH.
|
||||
LOG.debug("compressed bytes[] = {}",BufferUtil.toDetailString(payload));
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("compressed bytes[] = {}",BufferUtil.toDetailString(payload));
|
||||
|
||||
if (tailDrop == TAIL_DROP_ALWAYS)
|
||||
{
|
||||
|
@ -470,7 +459,8 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
{
|
||||
payload.limit(payload.limit() - TAIL_BYTES.length);
|
||||
}
|
||||
LOG.debug("payload (TAIL_DROP_ALWAYS) = {}",BufferUtil.toDetailString(payload));
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("payload (TAIL_DROP_ALWAYS) = {}",BufferUtil.toDetailString(payload));
|
||||
}
|
||||
else if (tailDrop == TAIL_DROP_FIN_ONLY)
|
||||
{
|
||||
|
@ -478,7 +468,8 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
{
|
||||
payload.limit(payload.limit() - TAIL_BYTES.length);
|
||||
}
|
||||
LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}",BufferUtil.toDetailString(payload));
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}",BufferUtil.toDetailString(payload));
|
||||
}
|
||||
}
|
||||
else if (fin)
|
||||
|
@ -486,10 +477,10 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
// Special case: 8.2.3.6. Generating an Empty Fragment Manually
|
||||
payload = ByteBuffer.wrap(new byte[] { 0x00 });
|
||||
}
|
||||
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Compressed {}: input:{} -> payload:{}",entry,chunkSize,payload.remaining());
|
||||
LOG.debug("Compressed {}: input:{} -> payload:{}",entry,outputLength,payload.remaining());
|
||||
}
|
||||
|
||||
boolean continuation = frame.getType().isContinuation() || !first;
|
||||
|
|
|
@ -57,7 +57,7 @@ public class PerMessageDeflateExtensionTest
|
|||
@Test
|
||||
public void testPerMessageDeflateDefault() throws Exception
|
||||
{
|
||||
Assume.assumeTrue("Server has x-webkit-deflate-frame registered",
|
||||
Assume.assumeTrue("Server has permessage-deflate registered",
|
||||
server.getWebSocketServletFactory().getExtensionFactory().isAvailable("permessage-deflate"));
|
||||
|
||||
BlockheadClient client = new BlockheadClient(server.getServerUri());
|
||||
|
|
Loading…
Reference in New Issue