Work on permessage-deflate continues
Conflicts: jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/FrameCompressionExtensionTest.java
This commit is contained in:
parent
39f9f3ad44
commit
aac9568a30
|
@ -18,8 +18,10 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.extensions.compress;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.zip.DataFormatException;
|
||||
import java.util.zip.Deflater;
|
||||
import java.util.zip.Inflater;
|
||||
|
@ -30,7 +32,6 @@ import org.eclipse.jetty.util.ConcurrentArrayQueue;
|
|||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BadPayloadException;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
|
@ -40,9 +41,10 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame;
|
|||
|
||||
public abstract class CompressExtension extends AbstractExtension
|
||||
{
|
||||
protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF};
|
||||
protected static final byte[] TAIL_BYTES = new byte[] { 0x00, 0x00, (byte)0xFF, (byte)0xFF};
|
||||
protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES);
|
||||
private static final Logger LOG = Log.getLogger(CompressExtension.class);
|
||||
|
||||
|
||||
/** Never drop tail bytes 0000FFFF, from any frame type */
|
||||
protected static final int TAIL_DROP_NEVER = 0;
|
||||
/** Always drop tail bytes 0000FFFF, from all frame types */
|
||||
|
@ -52,37 +54,43 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
|
||||
/** Always set RSV flag, on all frame types */
|
||||
protected static final int RSV_USE_ALWAYS = 0;
|
||||
/**
|
||||
/**
|
||||
* Only set RSV flag on first frame in multi-frame messages.
|
||||
* <p>
|
||||
* Note: this automatically means no-continuation frames have
|
||||
* the RSV bit set
|
||||
* Note: this automatically means no-continuation frames have the RSV bit set
|
||||
*/
|
||||
protected static final int RSV_USE_ONLY_FIRST = 1;
|
||||
|
||||
/** Inflater / Decompressed Buffer Size */
|
||||
protected static final int INFLATE_BUFFER_SIZE = 8 * 1024;
|
||||
/** Deflater / Inflater: Maximum Input Buffer Size */
|
||||
protected static final int INPUT_MAX_BUFFER_SIZE = 8 * 1024;
|
||||
private final static boolean NOWRAP = true;
|
||||
|
||||
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
|
||||
private final IteratingCallback flusher = new Flusher();
|
||||
private final Deflater compressor;
|
||||
private final Inflater decompressor;
|
||||
private final Deflater deflater;
|
||||
private final Inflater inflater;
|
||||
protected AtomicInteger decompressCount = new AtomicInteger(0);
|
||||
private int tailDrop = TAIL_DROP_NEVER;
|
||||
private int rsvUse = RSV_USE_ALWAYS;
|
||||
|
||||
protected CompressExtension()
|
||||
{
|
||||
compressor = new Deflater(Deflater.BEST_COMPRESSION, true);
|
||||
decompressor = new Inflater(true);
|
||||
deflater = new Deflater(Deflater.DEFAULT_COMPRESSION,NOWRAP);
|
||||
inflater = new Inflater(NOWRAP);
|
||||
tailDrop = getTailDropMode();
|
||||
rsvUse = getRsvUseMode();
|
||||
}
|
||||
|
||||
|
||||
public Deflater getDeflater()
|
||||
{
|
||||
return compressor;
|
||||
return deflater;
|
||||
}
|
||||
|
||||
public Inflater getInflater()
|
||||
{
|
||||
return decompressor;
|
||||
return inflater;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -93,7 +101,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the mode of operation for dropping (or keeping) tail bytes in frames generated by compress (outgoing)
|
||||
*
|
||||
|
@ -114,7 +122,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
// Unset RSV1 since it's not compressed anymore.
|
||||
newFrame.setRsv1(false);
|
||||
|
||||
ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(), false);
|
||||
ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(),false);
|
||||
try
|
||||
{
|
||||
BufferUtil.flipToFill(buffer);
|
||||
|
@ -127,55 +135,68 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
getBufferPool().release(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
protected ByteAccumulator decompress(byte[] input)
|
||||
|
||||
protected ByteAccumulator newByteAccumulator()
|
||||
{
|
||||
// Since we don't track text vs binary vs continuation state, just grab whatever is the greater value.
|
||||
int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageBufferSize());
|
||||
ByteAccumulator accumulator = new ByteAccumulator(maxSize);
|
||||
|
||||
decompressor.setInput(input, 0, input.length);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Decompressing {} bytes", input.length);
|
||||
|
||||
try
|
||||
int maxSize = Math.max(getPolicy().getMaxTextMessageSize(),getPolicy().getMaxBinaryMessageBufferSize());
|
||||
return new ByteAccumulator(maxSize);
|
||||
}
|
||||
|
||||
protected void decompress(ByteAccumulator accumulator, ByteBuffer buf)
|
||||
throws DataFormatException
|
||||
{
|
||||
if ((buf == null) || (!buf.hasRemaining()))
|
||||
{
|
||||
// It is allowed to send DEFLATE blocks with BFINAL=1.
|
||||
// For such blocks, getRemaining() will be > 0 but finished()
|
||||
// will be true, so we need to check for both.
|
||||
// When BFINAL=0, finished() will always be false and we only
|
||||
// check the remaining bytes.
|
||||
while (decompressor.getRemaining() > 0 && !decompressor.finished())
|
||||
return;
|
||||
}
|
||||
byte[] output = new byte[1024];
|
||||
|
||||
if (inflater.needsInput() && !supplyInput(inflater, buf))
|
||||
{
|
||||
LOG.debug("Needed input, but no buffer could supply input");
|
||||
return;
|
||||
}
|
||||
|
||||
int read = 0;
|
||||
while ((read = inflater.inflate(output)) >= 0)
|
||||
{
|
||||
if (read == 0)
|
||||
{
|
||||
byte[] output = new byte[Math.min(input.length * 2, 32 * 1024)];
|
||||
int decompressed = decompressor.inflate(output);
|
||||
if (decompressed == 0)
|
||||
LOG.debug("Decompress: read 0 {}",toDetail(inflater));
|
||||
if (inflater.finished() || inflater.needsDictionary())
|
||||
{
|
||||
if (decompressor.needsInput())
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
throw new BadPayloadException("Unable to inflate frame, not enough input on frame");
|
||||
}
|
||||
if (decompressor.needsDictionary())
|
||||
{
|
||||
throw new BadPayloadException("Unable to inflate frame, frame erroneously says it needs a dictionary");
|
||||
LOG.debug("Decompress: finished? {}",toDetail(inflater));
|
||||
}
|
||||
// We are finished ?
|
||||
break;
|
||||
}
|
||||
else
|
||||
else if (inflater.needsInput())
|
||||
{
|
||||
accumulator.addChunk(output, 0, decompressed);
|
||||
if (!supplyInput(inflater, buf))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Decompressed {}->{} bytes", input.length, accumulator.getLength());
|
||||
return accumulator;
|
||||
else
|
||||
{
|
||||
// do something with output
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Decompressed {} bytes: {}",read,toDetail(inflater));
|
||||
}
|
||||
accumulator.addChunk(output,0,read);
|
||||
}
|
||||
}
|
||||
catch (DataFormatException x)
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
throw new BadPayloadException(x);
|
||||
LOG.debug("Decompress: exiting {}",toDetail(inflater));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void outgoingFrame(Frame frame, WriteCallback callback, BatchMode batchMode)
|
||||
{
|
||||
|
@ -185,13 +206,13 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
|
||||
if (flusher.isFailed())
|
||||
{
|
||||
notifyCallbackFailure(callback, new ZipException());
|
||||
notifyCallbackFailure(callback,new ZipException());
|
||||
return;
|
||||
}
|
||||
|
||||
FrameEntry entry = new FrameEntry(frame, callback, batchMode);
|
||||
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Queuing {}", entry);
|
||||
LOG.debug("Queuing {}",entry);
|
||||
entries.offer(entry);
|
||||
flusher.iterate();
|
||||
}
|
||||
|
@ -206,7 +227,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Exception while notifying success of callback " + callback, x);
|
||||
LOG.debug("Exception while notifying success of callback " + callback,x);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -220,10 +241,119 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Exception while notifying failure of callback " + callback, x);
|
||||
LOG.debug("Exception while notifying failure of callback " + callback,x);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean supplyInput(Inflater inflater, ByteBuffer buf)
|
||||
{
|
||||
if (buf.remaining() <= 0)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("No data left left to supply to Inflater");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
byte input[];
|
||||
int inputOffset = 0;
|
||||
int len;
|
||||
|
||||
if (buf.hasArray())
|
||||
{
|
||||
// no need to create a new byte buffer, just return this one.
|
||||
len = buf.remaining();
|
||||
input = buf.array();
|
||||
inputOffset = buf.position() + buf.arrayOffset();
|
||||
buf.position(buf.position() + len);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Only create an return byte buffer that is reasonable in size
|
||||
len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining());
|
||||
input = new byte[len];
|
||||
inputOffset = 0;
|
||||
buf.get(input,0,len);
|
||||
}
|
||||
|
||||
inflater.setInput(input,inputOffset,len);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Supplied {} input bytes: {}",input.length,toDetail(inflater));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static boolean supplyInput(Deflater deflater, ByteBuffer buf)
|
||||
{
|
||||
if (buf.remaining() <= 0)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("No data left left to supply to Deflater");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
byte input[];
|
||||
int inputOffset = 0;
|
||||
int len;
|
||||
|
||||
if (buf.hasArray())
|
||||
{
|
||||
// no need to create a new byte buffer, just return this one.
|
||||
len = buf.remaining();
|
||||
input = buf.array();
|
||||
inputOffset = buf.position() + buf.arrayOffset();
|
||||
buf.position(buf.position() + len);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Only create an return byte buffer that is reasonable in size
|
||||
len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining());
|
||||
input = new byte[len];
|
||||
inputOffset = 0;
|
||||
buf.get(input,0,len);
|
||||
}
|
||||
|
||||
deflater.setInput(input,inputOffset,len);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Supplied {} input bytes: {}",input.length,toDetail(deflater));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static String toDetail(Inflater inflater)
|
||||
{
|
||||
return String.format("Inflater[finished=%b,read=%d,written=%d,remaining=%d,in=%d,out=%d]",inflater.finished(),inflater.getBytesRead(),
|
||||
inflater.getBytesWritten(),inflater.getRemaining(),inflater.getTotalIn(),inflater.getTotalOut());
|
||||
}
|
||||
|
||||
private static String toDetail(Deflater deflater)
|
||||
{
|
||||
return String.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]",deflater.finished(),deflater.getBytesRead(),
|
||||
deflater.getBytesWritten(),deflater.getTotalIn(),deflater.getTotalOut());
|
||||
}
|
||||
|
||||
public static boolean endsWithTail(ByteBuffer buf)
|
||||
{
|
||||
if ((buf == null) || (buf.remaining() < TAIL_BYTES.length))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
int limit = buf.limit();
|
||||
for (int i = TAIL_BYTES.length; i > 0; i--)
|
||||
{
|
||||
if (buf.get(limit - i) != TAIL_BYTES[TAIL_BYTES.length - i])
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -254,7 +384,6 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
{
|
||||
private static final int INPUT_BUFSIZE = 32 * 1024;
|
||||
private FrameEntry current;
|
||||
private ByteBuffer payload;
|
||||
private boolean finished = true;
|
||||
|
||||
@Override
|
||||
|
@ -263,14 +392,14 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
if (finished)
|
||||
{
|
||||
current = entries.poll();
|
||||
LOG.debug("Processing {}", current);
|
||||
LOG.debug("Processing {}",current);
|
||||
if (current == null)
|
||||
return Action.IDLE;
|
||||
deflate(current);
|
||||
}
|
||||
else
|
||||
{
|
||||
compress(current, false);
|
||||
compress(current,false);
|
||||
}
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
@ -281,97 +410,117 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
BatchMode batchMode = entry.batchMode;
|
||||
if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload())
|
||||
{
|
||||
nextOutgoingFrame(frame, this, batchMode);
|
||||
nextOutgoingFrame(frame,this,batchMode);
|
||||
return;
|
||||
}
|
||||
|
||||
compress(entry, true);
|
||||
compress(entry,true);
|
||||
}
|
||||
|
||||
private void compress(FrameEntry entry, boolean first)
|
||||
{
|
||||
final int flush = Deflater.SYNC_FLUSH;
|
||||
|
||||
// Get a chunk of the payload to avoid to blow
|
||||
// the heap if the payload is a huge mapped file.
|
||||
Frame frame = entry.frame;
|
||||
ByteBuffer data = frame.getPayload();
|
||||
int remaining = data.remaining();
|
||||
int inputLength = Math.min(remaining, INPUT_BUFSIZE);
|
||||
int chunkSize = Math.min(remaining,INPUT_BUFSIZE);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, inputLength);
|
||||
|
||||
// Avoid to copy the bytes if the ByteBuffer
|
||||
// is backed by an array.
|
||||
int inputOffset;
|
||||
byte[] input;
|
||||
if (data.hasArray())
|
||||
LOG.debug("Compressing {}: {} bytes in {} bytes chunk",entry,remaining,chunkSize);
|
||||
|
||||
boolean needsCompress = true;
|
||||
|
||||
if (deflater.needsInput() && !supplyInput(deflater,data))
|
||||
{
|
||||
input = data.array();
|
||||
int position = data.position();
|
||||
inputOffset = position + data.arrayOffset();
|
||||
data.position(position + inputLength);
|
||||
// no input supplied
|
||||
needsCompress = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
input = new byte[inputLength];
|
||||
inputOffset = 0;
|
||||
data.get(input, 0, inputLength);
|
||||
}
|
||||
finished = inputLength == remaining;
|
||||
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
|
||||
compressor.setInput(input, inputOffset, inputLength);
|
||||
byte[] output = new byte[chunkSize];
|
||||
|
||||
// Use an additional space in case the content is not compressible.
|
||||
byte[] output = new byte[inputLength + 64];
|
||||
int outputOffset = 0;
|
||||
int outputLength = 0;
|
||||
while (true)
|
||||
// Compress the data
|
||||
while (needsCompress)
|
||||
{
|
||||
int space = output.length - outputOffset;
|
||||
int compressed = compressor.deflate(output, outputOffset, space, Deflater.SYNC_FLUSH);
|
||||
outputLength += compressed;
|
||||
if (compressed < space)
|
||||
int read = deflater.deflate(output,0,chunkSize,flush);
|
||||
if (read == 0)
|
||||
{
|
||||
// Everything was compressed.
|
||||
break;
|
||||
if (deflater.finished())
|
||||
{
|
||||
// done
|
||||
break;
|
||||
}
|
||||
else if (deflater.needsInput())
|
||||
{
|
||||
if (!supplyInput(deflater,data))
|
||||
{
|
||||
// done
|
||||
needsCompress = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// The compressed output is bigger than the uncompressed input.
|
||||
byte[] newOutput = new byte[output.length * 2];
|
||||
System.arraycopy(output, 0, newOutput, 0, output.length);
|
||||
outputOffset += output.length;
|
||||
output = newOutput;
|
||||
// Append the output for the eventual frame.
|
||||
out.write(output,0,read);
|
||||
}
|
||||
}
|
||||
|
||||
boolean fin = frame.isFin() && finished;
|
||||
boolean fin = frame.isFin();
|
||||
|
||||
ByteBuffer payload = ByteBuffer.wrap(out.toByteArray());
|
||||
|
||||
if (payload.remaining()>0)
|
||||
{
|
||||
// Handle tail bytes generated by SYNC_FLUSH.
|
||||
LOG.debug("compressed bytes[] = {}",BufferUtil.toDetailString(payload));
|
||||
|
||||
// Handle tail bytes generated by SYNC_FLUSH.
|
||||
if(tailDrop == TAIL_DROP_ALWAYS) {
|
||||
payload = ByteBuffer.wrap(output, 0, outputLength - TAIL_BYTES.length);
|
||||
} else if(tailDrop == TAIL_DROP_FIN_ONLY) {
|
||||
payload = ByteBuffer.wrap(output, 0, outputLength - (fin?TAIL_BYTES.length:0));
|
||||
} else {
|
||||
// always include
|
||||
payload = ByteBuffer.wrap(output, 0, outputLength);
|
||||
if (tailDrop == TAIL_DROP_ALWAYS)
|
||||
{
|
||||
if (endsWithTail(payload))
|
||||
{
|
||||
payload.limit(payload.limit() - TAIL_BYTES.length);
|
||||
}
|
||||
LOG.debug("payload (TAIL_DROP_ALWAYS) = {}",BufferUtil.toDetailString(payload));
|
||||
}
|
||||
else if (tailDrop == TAIL_DROP_FIN_ONLY)
|
||||
{
|
||||
if (frame.isFin() && endsWithTail(payload))
|
||||
{
|
||||
payload.limit(payload.limit() - TAIL_BYTES.length);
|
||||
}
|
||||
LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}",BufferUtil.toDetailString(payload));
|
||||
}
|
||||
}
|
||||
else if (fin)
|
||||
{
|
||||
// Special case: 8.2.3.6. Generating an Empty Fragment Manually
|
||||
payload = ByteBuffer.wrap(new byte[] { 0x00 });
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Compressed {}: {}->{} chunk bytes",entry,inputLength,outputLength);
|
||||
LOG.debug("Compressed {}: input:{} -> payload:{}",entry,chunkSize,payload.remaining());
|
||||
}
|
||||
|
||||
boolean continuation = frame.getType().isContinuation() || !first;
|
||||
DataFrame chunk = new DataFrame(frame, continuation);
|
||||
if(rsvUse == RSV_USE_ONLY_FIRST) {
|
||||
DataFrame chunk = new DataFrame(frame,continuation);
|
||||
if (rsvUse == RSV_USE_ONLY_FIRST)
|
||||
{
|
||||
chunk.setRsv1(!continuation);
|
||||
} else {
|
||||
}
|
||||
else
|
||||
{
|
||||
// always set
|
||||
chunk.setRsv1(true);
|
||||
}
|
||||
chunk.setPayload(payload);
|
||||
chunk.setFin(fin);
|
||||
|
||||
nextOutgoingFrame(chunk, this, entry.batchMode);
|
||||
nextOutgoingFrame(chunk,this,entry.batchMode);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -379,14 +528,14 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
{
|
||||
// This IteratingCallback never completes.
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void onCompleteFailure(Throwable x)
|
||||
{
|
||||
// Fail all the frames in the queue.
|
||||
FrameEntry entry;
|
||||
while ((entry = entries.poll()) != null)
|
||||
notifyCallbackFailure(entry.callback, x);
|
||||
notifyCallbackFailure(entry.callback,x);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -400,7 +549,7 @@ public abstract class CompressExtension extends AbstractExtension
|
|||
@Override
|
||||
public void writeFailed(Throwable x)
|
||||
{
|
||||
notifyCallbackFailure(current.callback, x);
|
||||
notifyCallbackFailure(current.callback,x);
|
||||
// If something went wrong, very likely the compression context
|
||||
// will be invalid, so we need to fail this IteratingCallback.
|
||||
failed(x);
|
||||
|
|
|
@ -19,9 +19,11 @@
|
|||
package org.eclipse.jetty.websocket.common.extensions.compress;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.zip.DataFormatException;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BadPayloadException;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
|
@ -59,23 +61,33 @@ public class PerMessageDeflateExtension extends CompressExtension
|
|||
// This extension requires the RSV1 bit set only in the first frame.
|
||||
// Subsequent continuation frames don't have RSV1 set, but are compressed.
|
||||
if (frame.getType().isData())
|
||||
{
|
||||
incomingCompressed = frame.isRsv1();
|
||||
}
|
||||
|
||||
if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload() || !incomingCompressed)
|
||||
if (OpCode.isControlFrame(frame.getOpCode()) || !incomingCompressed)
|
||||
{
|
||||
nextIncomingFrame(frame);
|
||||
return;
|
||||
}
|
||||
|
||||
boolean appendTail = frame.isFin();
|
||||
ByteBuffer payload = frame.getPayload();
|
||||
int remaining = payload.remaining();
|
||||
byte[] input = new byte[remaining + (appendTail ? TAIL_BYTES.length : 0)];
|
||||
payload.get(input, 0, remaining);
|
||||
if (appendTail)
|
||||
System.arraycopy(TAIL_BYTES, 0, input, remaining, TAIL_BYTES.length);
|
||||
|
||||
forwardIncoming(frame, decompress(input));
|
||||
|
||||
ByteAccumulator accumulator = newByteAccumulator();
|
||||
|
||||
try
|
||||
{
|
||||
ByteBuffer payload = frame.getPayload();
|
||||
decompress(accumulator, payload);
|
||||
if (frame.isFin())
|
||||
{
|
||||
decompress(accumulator, TAIL_BYTES_BUF.slice());
|
||||
}
|
||||
|
||||
forwardIncoming(frame, accumulator);
|
||||
}
|
||||
catch (DataFormatException e)
|
||||
{
|
||||
throw new BadPayloadException(e);
|
||||
}
|
||||
|
||||
if (frame.isFin())
|
||||
incomingCompressed = false;
|
||||
|
@ -87,6 +99,7 @@ public class PerMessageDeflateExtension extends CompressExtension
|
|||
if (frame.isFin() && !incomingContextTakeover)
|
||||
{
|
||||
LOG.debug("Incoming Context Reset");
|
||||
decompressCount.set(0);
|
||||
getInflater().reset();
|
||||
}
|
||||
super.nextIncomingFrame(frame);
|
||||
|
@ -167,6 +180,8 @@ public class PerMessageDeflateExtension extends CompressExtension
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("config: outgoingContextTakover={}, incomingContextTakeover={} : {}", outgoingContextTakeover, incomingContextTakeover, this);
|
||||
|
||||
super.setConfig(configNegotiated);
|
||||
}
|
||||
|
@ -174,7 +189,7 @@ public class PerMessageDeflateExtension extends CompressExtension
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[requested=%s,negotiated=%s]",
|
||||
return String.format("%s[requested=\"%s\", negotiated=\"%s\"]",
|
||||
getClass().getSimpleName(),
|
||||
configRequested.getParameterizedName(),
|
||||
configNegotiated.getParameterizedName());
|
||||
|
|
|
@ -1,94 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.server;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.toolchain.test.EventQueue;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.frames.TextFrame;
|
||||
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
|
||||
import org.eclipse.jetty.websocket.common.test.HttpResponse;
|
||||
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class FrameCompressionExtensionTest
|
||||
{
|
||||
private static SimpleServletServer server;
|
||||
|
||||
@BeforeClass
|
||||
public static void startServer() throws Exception
|
||||
{
|
||||
server = new SimpleServletServer(new EchoServlet());
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopServer()
|
||||
{
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeflateFrameExtension() throws Exception
|
||||
{
|
||||
BlockheadClient client = new BlockheadClient(server.getServerUri());
|
||||
client.clearExtensions();
|
||||
client.addExtensions("x-webkit-deflate-frame");
|
||||
client.setProtocols("echo");
|
||||
|
||||
try
|
||||
{
|
||||
// Make sure the read times out if there are problems with the implementation
|
||||
client.setTimeout(1,TimeUnit.SECONDS);
|
||||
client.connect();
|
||||
client.sendStandardRequest();
|
||||
HttpResponse resp = client.expectUpgradeResponse();
|
||||
|
||||
Assert.assertThat("Response",resp.getExtensionsHeader(),containsString("x-webkit-deflate-frame"));
|
||||
|
||||
String msg = "Hello";
|
||||
|
||||
// Client sends first message
|
||||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
EventQueue<WebSocketFrame> frames = client.readFrames(1,1000,TimeUnit.MILLISECONDS);
|
||||
WebSocketFrame frame = frames.poll();
|
||||
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
|
||||
|
||||
// Client sends second message
|
||||
client.clearCaptured();
|
||||
msg = "There";
|
||||
client.write(new TextFrame().setPayload(msg));
|
||||
|
||||
frames = client.readFrames(1,1,TimeUnit.SECONDS);
|
||||
frame = frames.poll();
|
||||
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
|
||||
}
|
||||
finally
|
||||
{
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue