471055 - Restore legacy/experimental WebSocket extensions (deflate-frame)
This commit is contained in:
parent
a71c543a7e
commit
55862e229e
|
@ -49,7 +49,7 @@ public class ByteAccumulator
|
|||
chunks.add(copy);
|
||||
this.length += length;
|
||||
}
|
||||
|
||||
|
||||
public int getLength()
|
||||
{
|
||||
return length;
|
||||
|
|
|
@ -67,6 +67,9 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
/** Deflater / Inflater: Maximum Input Buffer Size */
|
||||
protected static final int INPUT_MAX_BUFFER_SIZE = 8 * 1024;
|
||||
|
||||
/** Inflater : Output Buffer Size */
|
||||
private static final int DECOMPRESS_BUF_SIZE = 8 * 1024;
|
||||
|
||||
private final static boolean NOWRAP = true;
|
||||
|
||||
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
|
||||
|
@ -150,7 +153,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
{
|
||||
return;
|
||||
}
|
||||
byte[] output = new byte[1024]; // TODO: make configurable size
|
||||
byte[] output = new byte[DECOMPRESS_BUF_SIZE];
|
||||
|
||||
if (inflater.needsInput() && !supplyInput(inflater,buf))
|
||||
{
|
||||
|
@ -369,9 +372,15 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
|
||||
private class Flusher extends IteratingCallback implements WriteCallback
|
||||
{
|
||||
private static final int INPUT_BUFSIZE = 32 * 1024;
|
||||
private FrameEntry current;
|
||||
private boolean finished = true;
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
LOG.warn(x);
|
||||
super.failed(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Action process() throws Exception
|
||||
|
|
|
@ -18,10 +18,10 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.extensions.compress;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.zip.DataFormatException;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.BadPayloadException;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
|
||||
/**
|
||||
* Implementation of the
|
||||
|
@ -55,18 +55,22 @@ public class DeflateFrameExtension extends CompressExtension
|
|||
// they are read and parsed with a single thread, and
|
||||
// therefore there is no need for synchronization.
|
||||
|
||||
if (OpCode.isControlFrame(frame.getOpCode()) || !frame.isRsv1() || !frame.hasPayload())
|
||||
if ( frame.getType().isControl() || !frame.isRsv1() || !frame.hasPayload() )
|
||||
{
|
||||
nextIncomingFrame(frame);
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuffer payload = frame.getPayload();
|
||||
int remaining = payload.remaining();
|
||||
byte[] input = new byte[remaining + TAIL_BYTES.length];
|
||||
payload.get(input, 0, remaining);
|
||||
System.arraycopy(TAIL_BYTES, 0, input, remaining, TAIL_BYTES.length);
|
||||
|
||||
forwardIncoming(frame, decompress(input));
|
||||
try
|
||||
{
|
||||
ByteAccumulator accumulator = newByteAccumulator();
|
||||
decompress(accumulator, frame.getPayload());
|
||||
decompress(accumulator, TAIL_BYTES_BUF.slice());
|
||||
forwardIncoming(frame, accumulator);
|
||||
}
|
||||
catch (DataFormatException e)
|
||||
{
|
||||
throw new BadPayloadException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
org.eclipse.jetty.websocket.common.extensions.identity.IdentityExtension
|
||||
org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension
|
||||
org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension
|
||||
org.eclipse.jetty.websocket.common.extensions.compress.DeflateFrameExtension
|
||||
org.eclipse.jetty.websocket.common.extensions.compress.XWebkitDeflateFrameExtension
|
||||
org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension
|
||||
org.eclipse.jetty.websocket.common.extensions.fragment.FragmentExtension
|
|
@ -36,6 +36,8 @@ import org.eclipse.jetty.io.RuntimeIOException;
|
|||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.util.TypeUtil;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
|
@ -62,6 +64,8 @@ import org.junit.Test;
|
|||
|
||||
public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(DeflateFrameExtensionTest.class);
|
||||
|
||||
@Rule
|
||||
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test");
|
||||
|
||||
|
@ -385,15 +389,21 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
|||
byte[] input = new byte[1024 * 1024];
|
||||
// Make them not compressible.
|
||||
new Random().nextBytes(input);
|
||||
|
||||
|
||||
int maxMessageSize = (1024 * 1024) + 8192;
|
||||
|
||||
DeflateFrameExtension clientExtension = new DeflateFrameExtension();
|
||||
clientExtension.setBufferPool(bufferPool);
|
||||
clientExtension.setPolicy(WebSocketPolicy.newClientPolicy());
|
||||
clientExtension.getPolicy().setMaxBinaryMessageSize(maxMessageSize);
|
||||
clientExtension.getPolicy().setMaxBinaryMessageBufferSize(maxMessageSize);
|
||||
clientExtension.setConfig(ExtensionConfig.parse("deflate-frame"));
|
||||
|
||||
final DeflateFrameExtension serverExtension = new DeflateFrameExtension();
|
||||
serverExtension.setBufferPool(bufferPool);
|
||||
serverExtension.setPolicy(WebSocketPolicy.newServerPolicy());
|
||||
serverExtension.getPolicy().setMaxBinaryMessageSize(maxMessageSize);
|
||||
serverExtension.getPolicy().setMaxBinaryMessageBufferSize(maxMessageSize);
|
||||
serverExtension.setConfig(ExtensionConfig.parse("deflate-frame"));
|
||||
|
||||
// Chain the next element to decompress.
|
||||
|
@ -402,6 +412,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
|||
@Override
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
|
||||
{
|
||||
LOG.debug("outgoingFrame({})", frame);
|
||||
serverExtension.incomingFrame(frame);
|
||||
callback.writeSuccess();
|
||||
}
|
||||
|
@ -413,6 +424,7 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
|
|||
@Override
|
||||
public void incomingFrame(Frame frame)
|
||||
{
|
||||
LOG.debug("incomingFrame({})", frame);
|
||||
try
|
||||
{
|
||||
result.write(BufferUtil.toArray(frame.getPayload()));
|
||||
|
|
Loading…
Reference in New Issue