diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java index 279cdbfc90e..57a6fa823ec 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/ByteAccumulator.java @@ -49,7 +49,7 @@ public class ByteAccumulator chunks.add(copy); this.length += length; } - + public int getLength() { return length; diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java index dbb2e9d1261..5c27104c594 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/CompressExtension.java @@ -67,6 +67,9 @@ public abstract class CompressExtension extends AbstractExtension /** Deflater / Inflater: Maximum Input Buffer Size */ protected static final int INPUT_MAX_BUFFER_SIZE = 8 * 1024; + /** Inflater : Output Buffer Size */ + private static final int DECOMPRESS_BUF_SIZE = 8 * 1024; + private final static boolean NOWRAP = true; private final Queue entries = new ConcurrentArrayQueue<>(); @@ -150,7 +153,7 @@ public abstract class CompressExtension extends AbstractExtension { return; } - byte[] output = new byte[1024]; // TODO: make configurable size + byte[] output = new byte[DECOMPRESS_BUF_SIZE]; if (inflater.needsInput() && !supplyInput(inflater,buf)) { @@ -369,9 +372,15 @@ public abstract class CompressExtension extends AbstractExtension private class Flusher extends IteratingCallback implements WriteCallback { - private static final int INPUT_BUFSIZE = 32 * 1024; private FrameEntry current; private boolean finished = true; + + @Override + public void failed(Throwable x) + { + LOG.warn(x); + super.failed(x); + } @Override protected Action process() throws Exception diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java index ba61a0bbdf3..8114a269d68 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtension.java @@ -18,10 +18,10 @@ package org.eclipse.jetty.websocket.common.extensions.compress; -import java.nio.ByteBuffer; +import java.util.zip.DataFormatException; +import org.eclipse.jetty.websocket.api.BadPayloadException; import org.eclipse.jetty.websocket.api.extensions.Frame; -import org.eclipse.jetty.websocket.common.OpCode; /** * Implementation of the @@ -55,18 +55,22 @@ public class DeflateFrameExtension extends CompressExtension // they are read and parsed with a single thread, and // therefore there is no need for synchronization. - if (OpCode.isControlFrame(frame.getOpCode()) || !frame.isRsv1() || !frame.hasPayload()) + if ( frame.getType().isControl() || !frame.isRsv1() || !frame.hasPayload() ) { nextIncomingFrame(frame); return; } - ByteBuffer payload = frame.getPayload(); - int remaining = payload.remaining(); - byte[] input = new byte[remaining + TAIL_BYTES.length]; - payload.get(input, 0, remaining); - System.arraycopy(TAIL_BYTES, 0, input, remaining, TAIL_BYTES.length); - - forwardIncoming(frame, decompress(input)); + try + { + ByteAccumulator accumulator = newByteAccumulator(); + decompress(accumulator, frame.getPayload()); + decompress(accumulator, TAIL_BYTES_BUF.slice()); + forwardIncoming(frame, accumulator); + } + catch (DataFormatException e) + { + throw new BadPayloadException(e); + } } } diff --git a/jetty-websocket/websocket-common/src/main/resources/META-INF/services/org.eclipse.jetty.websocket.api.extensions.Extension b/jetty-websocket/websocket-common/src/main/resources/META-INF/services/org.eclipse.jetty.websocket.api.extensions.Extension index 11b4f359908..a1ad7d5a0f8 100644 --- a/jetty-websocket/websocket-common/src/main/resources/META-INF/services/org.eclipse.jetty.websocket.api.extensions.Extension +++ b/jetty-websocket/websocket-common/src/main/resources/META-INF/services/org.eclipse.jetty.websocket.api.extensions.Extension @@ -1,5 +1,5 @@ org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension +org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension +org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension org.eclipse.jetty.websocket.common.extensions.compress.DeflateFrameExtension org.eclipse.jetty.websocket.common.extensions.compress.XWebkitDeflateFrameExtension -org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension -org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension \ No newline at end of file diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java index 731548c33cb..df1a5287d77 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/extensions/compress/DeflateFrameExtensionTest.java @@ -36,6 +36,8 @@ import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; @@ -62,6 +64,8 @@ import org.junit.Test; public class DeflateFrameExtensionTest extends AbstractExtensionTest { + private static final Logger LOG = Log.getLogger(DeflateFrameExtensionTest.class); + @Rule public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test"); @@ -385,15 +389,21 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest byte[] input = new byte[1024 * 1024]; // Make them not compressible. new Random().nextBytes(input); - + + int maxMessageSize = (1024 * 1024) + 8192; + DeflateFrameExtension clientExtension = new DeflateFrameExtension(); clientExtension.setBufferPool(bufferPool); clientExtension.setPolicy(WebSocketPolicy.newClientPolicy()); + clientExtension.getPolicy().setMaxBinaryMessageSize(maxMessageSize); + clientExtension.getPolicy().setMaxBinaryMessageBufferSize(maxMessageSize); clientExtension.setConfig(ExtensionConfig.parse("deflate-frame")); final DeflateFrameExtension serverExtension = new DeflateFrameExtension(); serverExtension.setBufferPool(bufferPool); serverExtension.setPolicy(WebSocketPolicy.newServerPolicy()); + serverExtension.getPolicy().setMaxBinaryMessageSize(maxMessageSize); + serverExtension.getPolicy().setMaxBinaryMessageBufferSize(maxMessageSize); serverExtension.setConfig(ExtensionConfig.parse("deflate-frame")); // Chain the next element to decompress. @@ -402,6 +412,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest @Override public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode) { + LOG.debug("outgoingFrame({})", frame); serverExtension.incomingFrame(frame); callback.writeSuccess(); } @@ -413,6 +424,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest @Override public void incomingFrame(Frame frame) { + LOG.debug("incomingFrame({})", frame); try { result.write(BufferUtil.toArray(frame.getPayload()));