diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index 88bbf7ef084..dbb2e9d1261 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -41,7 +41,7 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame; public abstract class CompressExtension extends AbstractExtension { - protected static final byte[] TAIL_BYTES = new byte[] { 0x00, 0x00, (byte)0xFF, (byte)0xFF}; + protected static final byte[] TAIL_BYTES = new byte[] { 0x00, 0x00, (byte)0xFF, (byte)0xFF }; protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES); private static final Logger LOG = Log.getLogger(CompressExtension.class); @@ -63,10 +63,12 @@ public abstract class CompressExtension extends AbstractExtension /** Inflater / Decompressed Buffer Size */ protected static final int INFLATE_BUFFER_SIZE = 8 * 1024; + /** Deflater / Inflater: Maximum Input Buffer Size */ protected static final int INPUT_MAX_BUFFER_SIZE = 8 * 1024; + private final static boolean NOWRAP = true; - + private final Queue entries = new ConcurrentArrayQueue<>(); private final IteratingCallback flusher = new Flusher(); private final Deflater deflater; @@ -135,15 +137,14 @@ public abstract class CompressExtension extends AbstractExtension getBufferPool().release(buffer); } } - + protected ByteAccumulator newByteAccumulator() { int maxSize = Math.max(getPolicy().getMaxTextMessageSize(),getPolicy().getMaxBinaryMessageBufferSize()); return new ByteAccumulator(maxSize); } - - protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) - throws DataFormatException + + protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException { if ((buf == null) || (!buf.hasRemaining())) { @@ -151,7 +152,7 @@ public abstract class CompressExtension extends AbstractExtension } byte[] output = new byte[1024]; // TODO: make configurable size - if (inflater.needsInput() && !supplyInput(inflater, buf)) + if (inflater.needsInput() && !supplyInput(inflater,buf)) { LOG.debug("Needed input, but no buffer could supply input"); return; @@ -172,17 +173,17 @@ public abstract class CompressExtension extends AbstractExtension { LOG.debug("Decompressed {} bytes: {}",read,toDetail(inflater)); } - + accumulator.copyChunk(output,0,read); } } - + if (LOG.isDebugEnabled()) { LOG.debug("Decompress: exiting {}",toDetail(inflater)); } } - + @Override public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { @@ -245,7 +246,7 @@ public abstract class CompressExtension extends AbstractExtension byte input[]; int inputOffset = 0; int len; - + if (buf.hasArray()) { // no need to create a new byte buffer, just return this one. @@ -257,12 +258,12 @@ public abstract class CompressExtension extends AbstractExtension else { // Only create an return byte buffer that is reasonable in size - len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining()); + len = Math.min(INPUT_MAX_BUFFER_SIZE,buf.remaining()); input = new byte[len]; inputOffset = 0; buf.get(input,0,len); - } - + } + inflater.setInput(input,inputOffset,len); if (LOG.isDebugEnabled()) { @@ -270,7 +271,7 @@ public abstract class CompressExtension extends AbstractExtension } return true; } - + private static boolean supplyInput(Deflater deflater, ByteBuffer buf) { if (buf.remaining() <= 0) @@ -285,7 +286,7 @@ public abstract class CompressExtension extends AbstractExtension byte input[]; int inputOffset = 0; int len; - + if (buf.hasArray()) { // no need to create a new byte buffer, just return this one. @@ -297,12 +298,12 @@ public abstract class CompressExtension extends AbstractExtension else { // Only create an return byte buffer that is reasonable in size - len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining()); + len = Math.min(INPUT_MAX_BUFFER_SIZE,buf.remaining()); input = new byte[len]; inputOffset = 0; buf.get(input,0,len); - } - + } + deflater.setInput(input,inputOffset,len); if (LOG.isDebugEnabled()) { @@ -310,7 +311,7 @@ public abstract class CompressExtension extends AbstractExtension } return true; } - + private static String toDetail(Inflater inflater) { return String.format("Inflater[finished=%b,read=%d,written=%d,remaining=%d,in=%d,out=%d]",inflater.finished(),inflater.getBytesRead(), @@ -319,8 +320,8 @@ public abstract class CompressExtension extends AbstractExtension private static String toDetail(Deflater deflater) { - return String.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]",deflater.finished(),deflater.getBytesRead(), - deflater.getBytesWritten(),deflater.getTotalIn(),deflater.getTotalOut()); + return String.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]",deflater.finished(),deflater.getBytesRead(),deflater.getBytesWritten(), + deflater.getTotalIn(),deflater.getTotalOut()); } public static boolean endsWithTail(ByteBuffer buf) @@ -405,64 +406,52 @@ public abstract class CompressExtension extends AbstractExtension private void compress(FrameEntry entry, boolean first) { - final int flush = Deflater.SYNC_FLUSH; - // Get a chunk of the payload to avoid to blow // the heap if the payload is a huge mapped file. Frame frame = entry.frame; ByteBuffer data = frame.getPayload(); int remaining = data.remaining(); - int chunkSize = Math.min(remaining,INPUT_BUFSIZE); + int outputLength = Math.max(256,data.remaining()); if (LOG.isDebugEnabled()) - LOG.debug("Compressing {}: {} bytes in {} bytes chunk",entry,remaining,chunkSize); - + LOG.debug("Compressing {}: {} bytes in {} bytes chunk",entry,remaining,outputLength); + boolean needsCompress = true; - + if (deflater.needsInput() && !supplyInput(deflater,data)) { // no input supplied needsCompress = false; } - + ByteArrayOutputStream out = new ByteArrayOutputStream(); - byte[] output = new byte[chunkSize]; + byte[] output = new byte[outputLength]; + + boolean fin = frame.isFin(); // Compress the data while (needsCompress) { - int read = deflater.deflate(output,0,chunkSize,flush); - if (read == 0) + int compressed = deflater.deflate(output,0,outputLength,Deflater.SYNC_FLUSH); + + // Append the output for the eventual frame. + if (LOG.isDebugEnabled()) + LOG.debug("Wrote {} bytes to output buffer",compressed); + out.write(output,0,compressed); + + if (compressed < outputLength) { - if (deflater.finished()) - { - // done - break; - } - else if (deflater.needsInput()) - { - if (!supplyInput(deflater,data)) - { - // done - needsCompress = false; - } - } - } - else - { - // Append the output for the eventual frame. - out.write(output,0,read); + needsCompress = false; } } - boolean fin = frame.isFin(); - ByteBuffer payload = ByteBuffer.wrap(out.toByteArray()); - - if (payload.remaining()>0) + + if (payload.remaining() > 0) { // Handle tail bytes generated by SYNC_FLUSH. - LOG.debug("compressed bytes[] = {}",BufferUtil.toDetailString(payload)); + if (LOG.isDebugEnabled()) + LOG.debug("compressed bytes[] = {}",BufferUtil.toDetailString(payload)); if (tailDrop == TAIL_DROP_ALWAYS) { @@ -470,7 +459,8 @@ public abstract class CompressExtension extends AbstractExtension { payload.limit(payload.limit() - TAIL_BYTES.length); } - LOG.debug("payload (TAIL_DROP_ALWAYS) = {}",BufferUtil.toDetailString(payload)); + if (LOG.isDebugEnabled()) + LOG.debug("payload (TAIL_DROP_ALWAYS) = {}",BufferUtil.toDetailString(payload)); } else if (tailDrop == TAIL_DROP_FIN_ONLY) { @@ -478,7 +468,8 @@ public abstract class CompressExtension extends AbstractExtension { payload.limit(payload.limit() - TAIL_BYTES.length); } - LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}",BufferUtil.toDetailString(payload)); + if (LOG.isDebugEnabled()) + LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}",BufferUtil.toDetailString(payload)); } } else if (fin) @@ -486,10 +477,10 @@ public abstract class CompressExtension extends AbstractExtension // Special case: 8.2.3.6. Generating an Empty Fragment Manually payload = ByteBuffer.wrap(new byte[] { 0x00 }); } - + if (LOG.isDebugEnabled()) { - LOG.debug("Compressed {}: input:{} -> payload:{}",entry,chunkSize,payload.remaining()); + LOG.debug("Compressed {}: input:{} -> payload:{}",entry,outputLength,payload.remaining()); } boolean continuation = frame.getType().isContinuation() || !first; diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/PerMessageDeflateExtensionTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/PerMessageDeflateExtensionTest.java index 1f78e5554e2..492a5e187c8 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/PerMessageDeflateExtensionTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/PerMessageDeflateExtensionTest.java @@ -57,7 +57,7 @@ public class PerMessageDeflateExtensionTest @Test public void testPerMessageDeflateDefault() throws Exception { - Assume.assumeTrue("Server has x-webkit-deflate-frame registered", + Assume.assumeTrue("Server has permessage-deflate registered", server.getWebSocketServletFactory().getExtensionFactory().isAvailable("permessage-deflate")); BlockheadClient client = new BlockheadClient(server.getServerUri());