Work on permessage-deflate continues

This commit is contained in:
Joakim Erdfelt 2015-05-14 07:42:34 -07:00
parent eb638777d0
commit f4f5157ab6
3 changed files with 289 additions and 220 deletions

View File

@ -18,8 +18,10 @@
package org.eclipse.jetty.websocket.common.extensions.compress; package org.eclipse.jetty.websocket.common.extensions.compress;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import java.util.zip.Inflater; import java.util.zip.Inflater;
@ -30,7 +32,6 @@ import org.eclipse.jetty.util.ConcurrentArrayQueue;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -40,7 +41,8 @@ import org.eclipse.jetty.websocket.common.frames.DataFrame;
public abstract class CompressExtension extends AbstractExtension public abstract class CompressExtension extends AbstractExtension
{ {
protected static final byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte)0xFF, (byte)0xFF}; protected static final byte[] TAIL_BYTES = new byte[] { 0x00, 0x00, (byte)0xFF, (byte)0xFF};
protected static final ByteBuffer TAIL_BYTES_BUF = ByteBuffer.wrap(TAIL_BYTES);
private static final Logger LOG = Log.getLogger(CompressExtension.class); private static final Logger LOG = Log.getLogger(CompressExtension.class);
/** Never drop tail bytes 0000FFFF, from any frame type */ /** Never drop tail bytes 0000FFFF, from any frame type */
@ -55,34 +57,40 @@ public abstract class CompressExtension extends AbstractExtension
/** /**
* Only set RSV flag on first frame in multi-frame messages. * Only set RSV flag on first frame in multi-frame messages.
* <p> * <p>
* Note: this automatically means no-continuation frames have * Note: this automatically means no-continuation frames have the RSV bit set
* the RSV bit set
*/ */
protected static final int RSV_USE_ONLY_FIRST = 1; protected static final int RSV_USE_ONLY_FIRST = 1;
/** Inflater / Decompressed Buffer Size */
protected static final int INFLATE_BUFFER_SIZE = 8 * 1024;
/** Deflater / Inflater: Maximum Input Buffer Size */
protected static final int INPUT_MAX_BUFFER_SIZE = 8 * 1024;
private final static boolean NOWRAP = true;
private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>(); private final Queue<FrameEntry> entries = new ConcurrentArrayQueue<>();
private final IteratingCallback flusher = new Flusher(); private final IteratingCallback flusher = new Flusher();
private final Deflater compressor; private final Deflater deflater;
private final Inflater decompressor; private final Inflater inflater;
protected AtomicInteger decompressCount = new AtomicInteger(0);
private int tailDrop = TAIL_DROP_NEVER; private int tailDrop = TAIL_DROP_NEVER;
private int rsvUse = RSV_USE_ALWAYS; private int rsvUse = RSV_USE_ALWAYS;
protected CompressExtension() protected CompressExtension()
{ {
compressor = new Deflater(Deflater.BEST_COMPRESSION, true); deflater = new Deflater(Deflater.DEFAULT_COMPRESSION,NOWRAP);
decompressor = new Inflater(true); inflater = new Inflater(NOWRAP);
tailDrop = getTailDropMode(); tailDrop = getTailDropMode();
rsvUse = getRsvUseMode(); rsvUse = getRsvUseMode();
} }
public Deflater getDeflater() public Deflater getDeflater()
{ {
return compressor; return deflater;
} }
public Inflater getInflater() public Inflater getInflater()
{ {
return decompressor; return inflater;
} }
/** /**
@ -114,7 +122,7 @@ public abstract class CompressExtension extends AbstractExtension
// Unset RSV1 since it's not compressed anymore. // Unset RSV1 since it's not compressed anymore.
newFrame.setRsv1(false); newFrame.setRsv1(false);
ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(), false); ByteBuffer buffer = getBufferPool().acquire(accumulator.getLength(),false);
try try
{ {
BufferUtil.flipToFill(buffer); BufferUtil.flipToFill(buffer);
@ -128,51 +136,64 @@ public abstract class CompressExtension extends AbstractExtension
} }
} }
protected ByteAccumulator decompress(byte[] input) protected ByteAccumulator newByteAccumulator()
{ {
// Since we don't track text vs binary vs continuation state, just grab whatever is the greater value. int maxSize = Math.max(getPolicy().getMaxTextMessageSize(),getPolicy().getMaxBinaryMessageBufferSize());
int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageBufferSize()); return new ByteAccumulator(maxSize);
ByteAccumulator accumulator = new ByteAccumulator(maxSize); }
decompressor.setInput(input, 0, input.length); protected void decompress(ByteAccumulator accumulator, ByteBuffer buf)
throws DataFormatException
if (LOG.isDebugEnabled()) {
LOG.debug("Decompressing {} bytes", input.length); if ((buf == null) || (!buf.hasRemaining()))
try
{ {
// It is allowed to send DEFLATE blocks with BFINAL=1. return;
// For such blocks, getRemaining() will be > 0 but finished() }
// will be true, so we need to check for both. byte[] output = new byte[1024];
// When BFINAL=0, finished() will always be false and we only
// check the remaining bytes. if (inflater.needsInput() && !supplyInput(inflater, buf))
while (decompressor.getRemaining() > 0 && !decompressor.finished()) {
LOG.debug("Needed input, but no buffer could supply input");
return;
}
int read = 0;
while ((read = inflater.inflate(output)) >= 0)
{
if (read == 0)
{ {
byte[] output = new byte[Math.min(input.length * 2, 32 * 1024)]; LOG.debug("Decompress: read 0 {}",toDetail(inflater));
int decompressed = decompressor.inflate(output); if (inflater.finished() || inflater.needsDictionary())
if (decompressed == 0)
{ {
if (decompressor.needsInput()) if (LOG.isDebugEnabled())
{ {
throw new BadPayloadException("Unable to inflate frame, not enough input on frame"); LOG.debug("Decompress: finished? {}",toDetail(inflater));
}
if (decompressor.needsDictionary())
{
throw new BadPayloadException("Unable to inflate frame, frame erroneously says it needs a dictionary");
} }
// We are finished ?
break;
} }
else else if (inflater.needsInput())
{ {
accumulator.addChunk(output, 0, decompressed); if (!supplyInput(inflater, buf))
{
break;
}
} }
} }
if (LOG.isDebugEnabled()) else
LOG.debug("Decompressed {}->{} bytes", input.length, accumulator.getLength()); {
return accumulator; // do something with output
if (LOG.isDebugEnabled())
{
LOG.debug("Decompressed {} bytes: {}",read,toDetail(inflater));
}
accumulator.addChunk(output,0,read);
}
} }
catch (DataFormatException x)
if (LOG.isDebugEnabled())
{ {
throw new BadPayloadException(x); LOG.debug("Decompress: exiting {}",toDetail(inflater));
} }
} }
@ -185,13 +206,13 @@ public abstract class CompressExtension extends AbstractExtension
if (flusher.isFailed()) if (flusher.isFailed())
{ {
notifyCallbackFailure(callback, new ZipException()); notifyCallbackFailure(callback,new ZipException());
return; return;
} }
FrameEntry entry = new FrameEntry(frame, callback, batchMode); FrameEntry entry = new FrameEntry(frame,callback,batchMode);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Queuing {}", entry); LOG.debug("Queuing {}",entry);
entries.offer(entry); entries.offer(entry);
flusher.iterate(); flusher.iterate();
} }
@ -206,7 +227,7 @@ public abstract class CompressExtension extends AbstractExtension
catch (Throwable x) catch (Throwable x)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying success of callback " + callback, x); LOG.debug("Exception while notifying success of callback " + callback,x);
} }
} }
@ -220,10 +241,119 @@ public abstract class CompressExtension extends AbstractExtension
catch (Throwable x) catch (Throwable x)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying failure of callback " + callback, x); LOG.debug("Exception while notifying failure of callback " + callback,x);
} }
} }
private static boolean supplyInput(Inflater inflater, ByteBuffer buf)
{
if (buf.remaining() <= 0)
{
if (LOG.isDebugEnabled())
{
LOG.debug("No data left left to supply to Inflater");
}
return false;
}
byte input[];
int inputOffset = 0;
int len;
if (buf.hasArray())
{
// no need to create a new byte buffer, just return this one.
len = buf.remaining();
input = buf.array();
inputOffset = buf.position() + buf.arrayOffset();
buf.position(buf.position() + len);
}
else
{
// Only create an return byte buffer that is reasonable in size
len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining());
input = new byte[len];
inputOffset = 0;
buf.get(input,0,len);
}
inflater.setInput(input,inputOffset,len);
if (LOG.isDebugEnabled())
{
LOG.debug("Supplied {} input bytes: {}",input.length,toDetail(inflater));
}
return true;
}
private static boolean supplyInput(Deflater deflater, ByteBuffer buf)
{
if (buf.remaining() <= 0)
{
if (LOG.isDebugEnabled())
{
LOG.debug("No data left left to supply to Deflater");
}
return false;
}
byte input[];
int inputOffset = 0;
int len;
if (buf.hasArray())
{
// no need to create a new byte buffer, just return this one.
len = buf.remaining();
input = buf.array();
inputOffset = buf.position() + buf.arrayOffset();
buf.position(buf.position() + len);
}
else
{
// Only create an return byte buffer that is reasonable in size
len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining());
input = new byte[len];
inputOffset = 0;
buf.get(input,0,len);
}
deflater.setInput(input,inputOffset,len);
if (LOG.isDebugEnabled())
{
LOG.debug("Supplied {} input bytes: {}",input.length,toDetail(deflater));
}
return true;
}
private static String toDetail(Inflater inflater)
{
return String.format("Inflater[finished=%b,read=%d,written=%d,remaining=%d,in=%d,out=%d]",inflater.finished(),inflater.getBytesRead(),
inflater.getBytesWritten(),inflater.getRemaining(),inflater.getTotalIn(),inflater.getTotalOut());
}
private static String toDetail(Deflater deflater)
{
return String.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]",deflater.finished(),deflater.getBytesRead(),
deflater.getBytesWritten(),deflater.getTotalIn(),deflater.getTotalOut());
}
public static boolean endsWithTail(ByteBuffer buf)
{
if ((buf == null) || (buf.remaining() < TAIL_BYTES.length))
{
return false;
}
int limit = buf.limit();
for (int i = TAIL_BYTES.length; i > 0; i--)
{
if (buf.get(limit - i) != TAIL_BYTES[TAIL_BYTES.length - i])
{
return false;
}
}
return true;
}
@Override @Override
public String toString() public String toString()
{ {
@ -254,7 +384,6 @@ public abstract class CompressExtension extends AbstractExtension
{ {
private static final int INPUT_BUFSIZE = 32 * 1024; private static final int INPUT_BUFSIZE = 32 * 1024;
private FrameEntry current; private FrameEntry current;
private ByteBuffer payload;
private boolean finished = true; private boolean finished = true;
@Override @Override
@ -263,14 +392,14 @@ public abstract class CompressExtension extends AbstractExtension
if (finished) if (finished)
{ {
current = entries.poll(); current = entries.poll();
LOG.debug("Processing {}", current); LOG.debug("Processing {}",current);
if (current == null) if (current == null)
return Action.IDLE; return Action.IDLE;
deflate(current); deflate(current);
} }
else else
{ {
compress(current, false); compress(current,false);
} }
return Action.SCHEDULED; return Action.SCHEDULED;
} }
@ -281,97 +410,117 @@ public abstract class CompressExtension extends AbstractExtension
BatchMode batchMode = entry.batchMode; BatchMode batchMode = entry.batchMode;
if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload()) if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload())
{ {
nextOutgoingFrame(frame, this, batchMode); nextOutgoingFrame(frame,this,batchMode);
return; return;
} }
compress(entry, true); compress(entry,true);
} }
private void compress(FrameEntry entry, boolean first) private void compress(FrameEntry entry, boolean first)
{ {
final int flush = Deflater.SYNC_FLUSH;
// Get a chunk of the payload to avoid to blow // Get a chunk of the payload to avoid to blow
// the heap if the payload is a huge mapped file. // the heap if the payload is a huge mapped file.
Frame frame = entry.frame; Frame frame = entry.frame;
ByteBuffer data = frame.getPayload(); ByteBuffer data = frame.getPayload();
int remaining = data.remaining(); int remaining = data.remaining();
int inputLength = Math.min(remaining, INPUT_BUFSIZE); int chunkSize = Math.min(remaining,INPUT_BUFSIZE);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Compressing {}: {} bytes in {} bytes chunk", entry, remaining, inputLength); LOG.debug("Compressing {}: {} bytes in {} bytes chunk",entry,remaining,chunkSize);
// Avoid to copy the bytes if the ByteBuffer boolean needsCompress = true;
// is backed by an array.
int inputOffset; if (deflater.needsInput() && !supplyInput(deflater,data))
byte[] input;
if (data.hasArray())
{ {
input = data.array(); // no input supplied
int position = data.position(); needsCompress = false;
inputOffset = position + data.arrayOffset();
data.position(position + inputLength);
} }
else
{
input = new byte[inputLength];
inputOffset = 0;
data.get(input, 0, inputLength);
}
finished = inputLength == remaining;
compressor.setInput(input, inputOffset, inputLength); ByteArrayOutputStream out = new ByteArrayOutputStream();
// Use an additional space in case the content is not compressible. byte[] output = new byte[chunkSize];
byte[] output = new byte[inputLength + 64];
int outputOffset = 0; // Compress the data
int outputLength = 0; while (needsCompress)
while (true)
{ {
int space = output.length - outputOffset; int read = deflater.deflate(output,0,chunkSize,flush);
int compressed = compressor.deflate(output, outputOffset, space, Deflater.SYNC_FLUSH); if (read == 0)
outputLength += compressed;
if (compressed < space)
{ {
// Everything was compressed. if (deflater.finished())
break; {
// done
break;
}
else if (deflater.needsInput())
{
if (!supplyInput(deflater,data))
{
// done
needsCompress = false;
}
}
} }
else else
{ {
// The compressed output is bigger than the uncompressed input. // Append the output for the eventual frame.
byte[] newOutput = new byte[output.length * 2]; out.write(output,0,read);
System.arraycopy(output, 0, newOutput, 0, output.length);
outputOffset += output.length;
output = newOutput;
} }
} }
boolean fin = frame.isFin() && finished; boolean fin = frame.isFin();
// Handle tail bytes generated by SYNC_FLUSH. ByteBuffer payload = ByteBuffer.wrap(out.toByteArray());
if(tailDrop == TAIL_DROP_ALWAYS) {
payload = ByteBuffer.wrap(output, 0, outputLength - TAIL_BYTES.length); if (payload.remaining()>0)
} else if(tailDrop == TAIL_DROP_FIN_ONLY) { {
payload = ByteBuffer.wrap(output, 0, outputLength - (fin?TAIL_BYTES.length:0)); // Handle tail bytes generated by SYNC_FLUSH.
} else { LOG.debug("compressed bytes[] = {}",BufferUtil.toDetailString(payload));
// always include
payload = ByteBuffer.wrap(output, 0, outputLength); if (tailDrop == TAIL_DROP_ALWAYS)
{
if (endsWithTail(payload))
{
payload.limit(payload.limit() - TAIL_BYTES.length);
}
LOG.debug("payload (TAIL_DROP_ALWAYS) = {}",BufferUtil.toDetailString(payload));
}
else if (tailDrop == TAIL_DROP_FIN_ONLY)
{
if (frame.isFin() && endsWithTail(payload))
{
payload.limit(payload.limit() - TAIL_BYTES.length);
}
LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}",BufferUtil.toDetailString(payload));
}
} }
else if (fin)
{
// Special case: 8.2.3.6. Generating an Empty Fragment Manually
payload = ByteBuffer.wrap(new byte[] { 0x00 });
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug("Compressed {}: {}->{} chunk bytes",entry,inputLength,outputLength); LOG.debug("Compressed {}: input:{} -> payload:{}",entry,chunkSize,payload.remaining());
} }
boolean continuation = frame.getType().isContinuation() || !first; boolean continuation = frame.getType().isContinuation() || !first;
DataFrame chunk = new DataFrame(frame, continuation); DataFrame chunk = new DataFrame(frame,continuation);
if(rsvUse == RSV_USE_ONLY_FIRST) { if (rsvUse == RSV_USE_ONLY_FIRST)
{
chunk.setRsv1(!continuation); chunk.setRsv1(!continuation);
} else { }
else
{
// always set // always set
chunk.setRsv1(true); chunk.setRsv1(true);
} }
chunk.setPayload(payload); chunk.setPayload(payload);
chunk.setFin(fin); chunk.setFin(fin);
nextOutgoingFrame(chunk, this, entry.batchMode); nextOutgoingFrame(chunk,this,entry.batchMode);
} }
@Override @Override
@ -386,7 +535,7 @@ public abstract class CompressExtension extends AbstractExtension
// Fail all the frames in the queue. // Fail all the frames in the queue.
FrameEntry entry; FrameEntry entry;
while ((entry = entries.poll()) != null) while ((entry = entries.poll()) != null)
notifyCallbackFailure(entry.callback, x); notifyCallbackFailure(entry.callback,x);
} }
@Override @Override
@ -400,7 +549,7 @@ public abstract class CompressExtension extends AbstractExtension
@Override @Override
public void writeFailed(Throwable x) public void writeFailed(Throwable x)
{ {
notifyCallbackFailure(current.callback, x); notifyCallbackFailure(current.callback,x);
// If something went wrong, very likely the compression context // If something went wrong, very likely the compression context
// will be invalid, so we need to fail this IteratingCallback. // will be invalid, so we need to fail this IteratingCallback.
failed(x); failed(x);

View File

@ -19,9 +19,11 @@
package org.eclipse.jetty.websocket.common.extensions.compress; package org.eclipse.jetty.websocket.common.extensions.compress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -59,23 +61,33 @@ public class PerMessageDeflateExtension extends CompressExtension
// This extension requires the RSV1 bit set only in the first frame. // This extension requires the RSV1 bit set only in the first frame.
// Subsequent continuation frames don't have RSV1 set, but are compressed. // Subsequent continuation frames don't have RSV1 set, but are compressed.
if (frame.getType().isData()) if (frame.getType().isData())
{
incomingCompressed = frame.isRsv1(); incomingCompressed = frame.isRsv1();
}
if (OpCode.isControlFrame(frame.getOpCode()) || !frame.hasPayload() || !incomingCompressed) if (OpCode.isControlFrame(frame.getOpCode()) || !incomingCompressed)
{ {
nextIncomingFrame(frame); nextIncomingFrame(frame);
return; return;
} }
boolean appendTail = frame.isFin(); ByteAccumulator accumulator = newByteAccumulator();
ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
byte[] input = new byte[remaining + (appendTail ? TAIL_BYTES.length : 0)];
payload.get(input, 0, remaining);
if (appendTail)
System.arraycopy(TAIL_BYTES, 0, input, remaining, TAIL_BYTES.length);
forwardIncoming(frame, decompress(input)); try
{
ByteBuffer payload = frame.getPayload();
decompress(accumulator, payload);
if (frame.isFin())
{
decompress(accumulator, TAIL_BYTES_BUF.slice());
}
forwardIncoming(frame, accumulator);
}
catch (DataFormatException e)
{
throw new BadPayloadException(e);
}
if (frame.isFin()) if (frame.isFin())
incomingCompressed = false; incomingCompressed = false;
@ -87,6 +99,7 @@ public class PerMessageDeflateExtension extends CompressExtension
if (frame.isFin() && !incomingContextTakeover) if (frame.isFin() && !incomingContextTakeover)
{ {
LOG.debug("Incoming Context Reset"); LOG.debug("Incoming Context Reset");
decompressCount.set(0);
getInflater().reset(); getInflater().reset();
} }
super.nextIncomingFrame(frame); super.nextIncomingFrame(frame);
@ -168,13 +181,15 @@ public class PerMessageDeflateExtension extends CompressExtension
} }
} }
LOG.debug("config: outgoingContextTakover={}, incomingContextTakeover={} : {}", outgoingContextTakeover, incomingContextTakeover, this);
super.setConfig(configNegotiated); super.setConfig(configNegotiated);
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s[requested=%s,negotiated=%s]", return String.format("%s[requested=\"%s\", negotiated=\"%s\"]",
getClass().getSimpleName(), getClass().getSimpleName(),
configRequested.getParameterizedName(), configRequested.getParameterizedName(),
configNegotiated.getParameterizedName()); configNegotiated.getParameterizedName());

View File

@ -1,95 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.test.BlockheadClient;
import org.eclipse.jetty.websocket.common.test.HttpResponse;
import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class FrameCompressionExtensionTest
{
private static SimpleServletServer server;
@BeforeClass
public static void startServer() throws Exception
{
server = new SimpleServletServer(new EchoServlet());
server.start();
}
@AfterClass
public static void stopServer()
{
server.stop();
}
@Test
public void testDeflateFrameExtension() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());
client.clearExtensions();
client.addExtensions("x-webkit-deflate-frame");
client.setProtocols("echo");
try
{
// Make sure the read times out if there are problems with the implementation
client.setTimeout(1,TimeUnit.SECONDS);
client.connect();
client.sendStandardRequest();
HttpResponse resp = client.expectUpgradeResponse();
Assert.assertThat("Response",resp.getExtensionsHeader(),containsString("x-webkit-deflate-frame"));
String msg = "Hello";
// Client sends first message
client.write(new TextFrame().setPayload(msg));
EventQueue<WebSocketFrame> frames = client.readFrames(1,1000,TimeUnit.MILLISECONDS);
WebSocketFrame frame = frames.poll();
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
// Client sends second message
client.clearCaptured();
msg = "There";
client.write(new TextFrame().setPayload(msg));
frames = client.readFrames(1,1,TimeUnit.SECONDS);
frame = frames.poll();
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
}
finally
{
client.close();
}
}
}