Updates for Extensions

+ Extensions now report to Generator their requirements for
  RSV1, RSV2, RSV3
+ DeflateFrameExtension now reports RSV1 use.
+ DeflateFrameExtension reads uncompressed data properly now.
+ Unit tests for small/medium/large payloads on DeflateFrameExtension
+ OutgoingFrames.output() method now has non-optional throws IOException
This commit is contained in:
Joakim Erdfelt 2012-07-25 11:04:38 -07:00
parent 78b16d38f5
commit 6fa2f67a96
11 changed files with 235 additions and 36 deletions

View File

@ -15,9 +15,13 @@
//======================================================================== //========================================================================
package org.eclipse.jetty.websocket.api; package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.io.IncomingFrames; import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.OutgoingFrames; import org.eclipse.jetty.websocket.io.OutgoingFrames;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig; import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
@ -25,6 +29,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public abstract class Extension implements OutgoingFrames, IncomingFrames public abstract class Extension implements OutgoingFrames, IncomingFrames
{ {
private Logger LOG = Log.getLogger(this.getClass());
private WebSocketPolicy policy; private WebSocketPolicy policy;
private ByteBufferPool bufferPool; private ByteBufferPool bufferPool;
private ExtensionConfig config; private ExtensionConfig config;
@ -70,14 +75,14 @@ public abstract class Extension implements OutgoingFrames, IncomingFrames
public void incoming(WebSocketException e) public void incoming(WebSocketException e)
{ {
// pass thru, un-modified // pass thru, un-modified
nextIncomingFrames.incoming(e); nextIncoming(e);
} }
@Override @Override
public void incoming(WebSocketFrame frame) public void incoming(WebSocketFrame frame)
{ {
// pass thru, un-modified // pass thru, un-modified
nextIncomingFrames.incoming(frame); nextIncoming(frame);
} }
/** /**
@ -88,6 +93,10 @@ public abstract class Extension implements OutgoingFrames, IncomingFrames
*/ */
public void nextIncoming(WebSocketException e) public void nextIncoming(WebSocketException e)
{ {
if (LOG.isDebugEnabled())
{
LOG.debug("nextIncoming({}) - {}",e,nextIncomingFrames);
}
nextIncomingFrames.incoming(e); nextIncomingFrames.incoming(e);
} }
@ -99,6 +108,10 @@ public abstract class Extension implements OutgoingFrames, IncomingFrames
*/ */
public void nextIncoming(WebSocketFrame frame) public void nextIncoming(WebSocketFrame frame)
{ {
if (LOG.isDebugEnabled())
{
LOG.debug("nextIncoming({}) - {}",frame,nextIncomingFrames);
}
nextIncomingFrames.incoming(frame); nextIncomingFrames.incoming(frame);
} }
@ -108,21 +121,29 @@ public abstract class Extension implements OutgoingFrames, IncomingFrames
* @param frame * @param frame
* the frame to send to the next output * the frame to send to the next output
*/ */
public <C> void nextOutput(C context, Callback<C> callback, WebSocketFrame frame) public <C> void nextOutput(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{ {
if (LOG.isDebugEnabled())
{
LOG.debug("nextOutput({}) - {}",frame,nextOutgoingFrames);
}
nextOutgoingFrames.output(context,callback,frame); nextOutgoingFrames.output(context,callback,frame);
} }
public <C> void nextOutputNoCallback(WebSocketFrame frame) public <C> void nextOutputNoCallback(WebSocketFrame frame) throws IOException
{ {
if (LOG.isDebugEnabled())
{
LOG.debug("nextOutput({}) - {}",frame,nextOutgoingFrames);
}
nextOutgoingFrames.output(null,new FutureCallback<Void>(),frame); nextOutgoingFrames.output(null,new FutureCallback<Void>(),frame);
} }
@Override @Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{ {
// pass thru, un-modified // pass thru, un-modified
nextOutgoingFrames.output(context,callback,frame); nextOutput(context,callback,frame);
} }
public void setBufferPool(ByteBufferPool bufferPool) public void setBufferPool(ByteBufferPool bufferPool)
@ -149,4 +170,19 @@ public abstract class Extension implements OutgoingFrames, IncomingFrames
{ {
this.policy = policy; this.policy = policy;
} }
public boolean useRsv1()
{
return false;
}
public boolean useRsv2()
{
return false;
}
public boolean useRsv3()
{
return false;
}
} }

View File

@ -15,6 +15,7 @@
//======================================================================== //========================================================================
package org.eclipse.jetty.websocket.extensions.deflate; package org.eclipse.jetty.websocket.extensions.deflate;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
import java.util.zip.Deflater; import java.util.zip.Deflater;
@ -89,6 +90,11 @@ public class DeflateFrameExtension extends Extension
buf.put((byte)(length & 0x7F)); buf.put((byte)(length & 0x7F));
} }
if (LOG.isDebugEnabled())
{
LOG.debug("Uncompressed length={} - {}",length,buf.position());
}
while (!deflater.finished()) while (!deflater.finished())
{ {
byte out[] = new byte[length]; byte out[] = new byte[length];
@ -175,7 +181,7 @@ public class DeflateFrameExtension extends Extension
} }
@Override @Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{ {
if (frame.getOpCode().isControlFrame()) if (frame.getOpCode().isControlFrame())
{ {
@ -219,13 +225,13 @@ public class DeflateFrameExtension extends Extension
{ {
int length = data.get(); int length = data.get();
int bytes = 0; int bytes = 0;
if (length == 0x7F) if (length == 127) // 0x7F
{ {
// length 8 bytes (extended payload length) // length 8 bytes (extended payload length)
length = 0; length = 0;
bytes = 8; bytes = 8;
} }
else if (length == 0x7F) else if (length == 126) // 0x7E
{ {
// length 2 bytes (extended payload length) // length 2 bytes (extended payload length)
length = 0; length = 0;
@ -234,6 +240,7 @@ public class DeflateFrameExtension extends Extension
while (bytes > 0) while (bytes > 0)
{ {
--bytes;
byte b = data.get(); byte b = data.get();
length |= (b & 0xFF) << (8 * bytes); length |= (b & 0xFF) << (8 * bytes);
} }
@ -254,4 +261,19 @@ public class DeflateFrameExtension extends Extension
deflater.setStrategy(Deflater.DEFAULT_STRATEGY); deflater.setStrategy(Deflater.DEFAULT_STRATEGY);
inflater = new Inflater(); inflater = new Inflater();
} }
@Override
public String toString()
{
return String.format("DeflateFrameExtension[minLength=%d]",minLength);
}
/**
* Indicates use of RSV1 flag for indicating deflation is in use.
*/
@Override
public boolean useRsv1()
{
return true;
}
} }

View File

@ -15,6 +15,7 @@
//======================================================================== //========================================================================
package org.eclipse.jetty.websocket.extensions.fragment; package org.eclipse.jetty.websocket.extensions.fragment;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
@ -28,7 +29,7 @@ public class FragmentExtension extends Extension
private int maxLength = -1; private int maxLength = -1;
@Override @Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{ {
if (frame.getOpCode().isControlFrame()) if (frame.getOpCode().isControlFrame())
{ {

View File

@ -1,5 +1,7 @@
package org.eclipse.jetty.websocket.io; package org.eclipse.jetty.websocket.io;
import java.io.IOException;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
@ -8,5 +10,5 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
*/ */
public interface OutgoingFrames public interface OutgoingFrames
{ {
<C> void output(C context, Callback<C> callback, WebSocketFrame frame); <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException;
} }

View File

@ -277,7 +277,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
break; break;
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled() && (filled > 0))
{ {
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer)); LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
} }

View File

@ -94,7 +94,7 @@ public class WebSocketSession implements WebSocketConnection, IncomingFrames, Ou
} }
@Override @Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {

View File

@ -61,6 +61,13 @@ public class Generator
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private boolean validating; private boolean validating;
/** Is there an extension using RSV1 */
private boolean rsv1InUse = false;
/** Is there an extension using RSV2 */
private boolean rsv2InUse = false;
/** Is there an extension using RSV3 */
private boolean rsv3InUse = false;
/** /**
* Construct Generator with provided policy and bufferPool * Construct Generator with provided policy and bufferPool
* *
@ -104,21 +111,18 @@ public class Generator
* MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the negotiated * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the negotiated
* extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_. * extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
*/ */
if (frame.isRsv1()) if (!rsv1InUse && frame.isRsv1())
{ {
// TODO: extensions can negotiate this (somehow)
throw new ProtocolException("RSV1 not allowed to be set"); throw new ProtocolException("RSV1 not allowed to be set");
} }
if (frame.isRsv2()) if (!rsv2InUse && frame.isRsv2())
{ {
// TODO: extensions can negotiate this (somehow)
throw new ProtocolException("RSV2 not allowed to be set"); throw new ProtocolException("RSV2 not allowed to be set");
} }
if (frame.isRsv3()) if (!rsv3InUse && frame.isRsv3())
{ {
// TODO: extensions can negotiate this (somehow)
throw new ProtocolException("RSV3 not allowed to be set"); throw new ProtocolException("RSV3 not allowed to be set");
} }
@ -168,10 +172,22 @@ public class Generator
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug(String.format( StringBuilder dbg = new StringBuilder();
"Generate.Frame[opcode=%s,fin=%b,cont=%b,rsv1=%b,rsv2=%b,rsv3=%b,mask=%b,plength=%d,payloadStart=%s,remaining=%d,position=%s]",frame dbg.append(policy.getBehavior());
.getOpCode().toString(),frame.isFin(),frame.isContinuation(),frame.isRsv1(),frame.isRsv2(),frame.isRsv3(),frame.isMasked(),frame dbg.append(" Generate.Frame[");
.getPayloadLength(),frame.getPayloadStart(),frame.remaining(),frame.position())); dbg.append("opcode=").append(frame.getOpCode());
dbg.append(",fin=").append(frame.isFin());
dbg.append(",cont=").append(frame.isContinuation());
dbg.append(",rsv1=").append(frame.isRsv1());
dbg.append(",rsv2=").append(frame.isRsv2());
dbg.append(",rsv3=").append(frame.isRsv3());
dbg.append(",mask=").append(frame.isMasked());
dbg.append(",payloadLength=").append(frame.getPayloadLength());
dbg.append(",payloadStart=").append(frame.getPayloadStart());
dbg.append(",remaining=").append(frame.remaining());
dbg.append(",position=").append(frame.position());
dbg.append(']');
LOG.debug(dbg.toString());
} }
/* /*
@ -326,10 +342,39 @@ public class Generator
public ByteBuffer generate(WebSocketFrame frame) public ByteBuffer generate(WebSocketFrame frame)
{ {
int bufferSize = frame.getPayloadLength() + OVERHEAD; int bufferSize = frame.getPayloadLength() + OVERHEAD;
return generate(bufferSize,frame); return generate(bufferSize,frame);
} }
public boolean isRsv1InUse()
{
return rsv1InUse;
}
public boolean isRsv2InUse()
{
return rsv2InUse;
}
public boolean isRsv3InUse()
{
return rsv3InUse;
}
public void setRsv1InUse(boolean rsv1InUse)
{
this.rsv1InUse = rsv1InUse;
}
public void setRsv2InUse(boolean rsv2InUse)
{
this.rsv2InUse = rsv2InUse;
}
public void setRsv3InUse(boolean rsv3InUse)
{
this.rsv3InUse = rsv3InUse;
}
@Override @Override
public String toString() public String toString()
{ {

View File

@ -142,7 +142,10 @@ public class Parser
protected void notifyFrame(final WebSocketFrame f) protected void notifyFrame(final WebSocketFrame f)
{ {
LOG.debug("Notify Frame: {}",f); if (LOG.isDebugEnabled())
{
LOG.debug("{} Notify Frame: {} to {}",policy.getBehavior(),f,incomingFramesHandler);
}
if (incomingFramesHandler == null) if (incomingFramesHandler == null)
{ {
return; return;
@ -183,7 +186,7 @@ public class Parser
// parse through all the frames in the buffer // parse through all the frames in the buffer
while (parseFrame(buffer)) while (parseFrame(buffer))
{ {
LOG.debug("Parsed Frame: " + frame); LOG.debug("{} Parsed Frame: {}",policy.getBehavior(),frame);
notifyFrame(frame); notifyFrame(frame);
} }
@ -221,7 +224,7 @@ public class Parser
return false; return false;
} }
LOG.debug("Parsing {} bytes",buffer.remaining()); LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining());
while (buffer.hasRemaining()) while (buffer.hasRemaining())
{ {
switch (state) switch (state)

View File

@ -17,6 +17,7 @@ package org.eclipse.jetty.websocket.extensions;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
@ -41,8 +42,88 @@ import org.junit.Test;
public class DeflateFrameExtensionTest public class DeflateFrameExtensionTest
{ {
/**
* Test a large payload (a payload length over 65535 bytes)
*/
@Test @Test
public void testFlate() public void testFlateLarge()
{
// Server sends a big message
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 5000; i++)
{
msg.append("0123456789ABCDEF ");
}
msg.append('X'); // so we can see the end in our debugging
// ensure that test remains sane
Assert.assertThat("Large Payload Length",msg.length(),greaterThan(0xFF_FF));
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
// allow large payload for this test
policy.setBufferSize(100000);
policy.setMaxPayloadSize(150000);
DeflateFrameExtension ext = new DeflateFrameExtension();
ext.setBufferPool(new StandardByteBufferPool());
ext.setPolicy(policy);
ExtensionConfig config = ExtensionConfig.parse("x-deflate-frame;minLength=8");
ext.setConfig(config);
String expected = msg.toString();
ByteBuffer orig = BufferUtil.toBuffer(expected,StringUtil.__UTF8_CHARSET);
// compress
ByteBuffer compressed = ext.deflate(orig);
// decompress
ByteBuffer decompressed = ext.inflate(compressed);
// validate
String actual = BufferUtil.toUTF8String(decompressed);
Assert.assertEquals(expected,actual);
}
/**
* Test a medium payload (a payload length between 128 - 65535 bytes)
*/
@Test
public void testFlateMedium()
{
// Server sends a big message
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 1000; i++)
{
msg.append("0123456789ABCDEF ");
}
msg.append('X'); // so we can see the end in our debugging
// ensure that test remains sane
Assert.assertThat("Medium Payload Length",msg.length(),allOf(greaterThanOrEqualTo(0x7E),lessThanOrEqualTo(0xFF_FF)));
DeflateFrameExtension ext = new DeflateFrameExtension();
ext.setBufferPool(new StandardByteBufferPool());
ext.setPolicy(WebSocketPolicy.newServerPolicy());
ExtensionConfig config = ExtensionConfig.parse("x-deflate-frame;minLength=8");
ext.setConfig(config);
String expected = msg.toString();
ByteBuffer orig = BufferUtil.toBuffer(expected,StringUtil.__UTF8_CHARSET);
// compress
ByteBuffer compressed = ext.deflate(orig);
// decompress
ByteBuffer decompressed = ext.inflate(compressed);
// validate
String actual = BufferUtil.toUTF8String(decompressed);
Assert.assertEquals(expected,actual);
}
@Test
public void testFlateSmall()
{ {
DeflateFrameExtension ext = new DeflateFrameExtension(); DeflateFrameExtension ext = new DeflateFrameExtension();
ext.setBufferPool(new StandardByteBufferPool()); ext.setBufferPool(new StandardByteBufferPool());
@ -56,6 +137,9 @@ public class DeflateFrameExtensionTest
quote.append("a single experiment can prove me wrong.\n"); quote.append("a single experiment can prove me wrong.\n");
quote.append("-- Albert Einstein"); quote.append("-- Albert Einstein");
// ensure that test remains sane
Assert.assertThat("Small Payload Length",quote.length(),lessThan(0x7E));
String expected = quote.toString(); String expected = quote.toString();
ByteBuffer orig = BufferUtil.toBuffer(expected,StringUtil.__UTF8_CHARSET); ByteBuffer orig = BufferUtil.toBuffer(expected,StringUtil.__UTF8_CHARSET);
@ -70,8 +154,11 @@ public class DeflateFrameExtensionTest
Assert.assertEquals(expected,actual); Assert.assertEquals(expected,actual);
} }
/**
* Test round-trips of many small frames (no frame larger than 126 bytes)
*/
@Test @Test
public void testFlateManySmall() public void testFlateSmall_Many()
{ {
DeflateFrameExtension ext = new DeflateFrameExtension(); DeflateFrameExtension ext = new DeflateFrameExtension();
ext.setBufferPool(new StandardByteBufferPool()); ext.setBufferPool(new StandardByteBufferPool());
@ -247,7 +334,7 @@ public class DeflateFrameExtensionTest
* Verify that outgoing text frames are compressed. * Verify that outgoing text frames are compressed.
*/ */
@Test @Test
public void testOutgoingFrames() public void testOutgoingFrames() throws IOException
{ {
OutgoingFramesCapture capture = new OutgoingFramesCapture(); OutgoingFramesCapture capture = new OutgoingFramesCapture();
@ -317,7 +404,7 @@ public class DeflateFrameExtensionTest
* Outgoing PING (Control Frame) should pass through extension unmodified * Outgoing PING (Control Frame) should pass through extension unmodified
*/ */
@Test @Test
public void testOutgoingPing() public void testOutgoingPing() throws IOException
{ {
OutgoingFramesCapture capture = new OutgoingFramesCapture(); OutgoingFramesCapture capture = new OutgoingFramesCapture();

View File

@ -17,6 +17,7 @@ package org.eclipse.jetty.websocket.extensions;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
@ -121,7 +122,8 @@ public class FragmentExtensionTest
* Incoming PING (Control Frame) should pass through extension unmodified * Incoming PING (Control Frame) should pass through extension unmodified
*/ */
@Test @Test
public void testIncomingPing() { public void testIncomingPing()
{
IncomingFramesCapture capture = new IncomingFramesCapture(); IncomingFramesCapture capture = new IncomingFramesCapture();
FragmentExtension ext = new FragmentExtension(); FragmentExtension ext = new FragmentExtension();
@ -155,7 +157,7 @@ public class FragmentExtensionTest
* Verify that outgoing text frames are fragmented by the maxLength configuration. * Verify that outgoing text frames are fragmented by the maxLength configuration.
*/ */
@Test @Test
public void testOutgoingFramesByMaxLength() public void testOutgoingFramesByMaxLength() throws IOException
{ {
OutgoingFramesCapture capture = new OutgoingFramesCapture(); OutgoingFramesCapture capture = new OutgoingFramesCapture();
@ -237,7 +239,7 @@ public class FragmentExtensionTest
* Verify that outgoing text frames are fragmented by default configuration * Verify that outgoing text frames are fragmented by default configuration
*/ */
@Test @Test
public void testOutgoingFramesDefaultConfig() public void testOutgoingFramesDefaultConfig() throws IOException
{ {
OutgoingFramesCapture capture = new OutgoingFramesCapture(); OutgoingFramesCapture capture = new OutgoingFramesCapture();
@ -314,7 +316,7 @@ public class FragmentExtensionTest
* Outgoing PING (Control Frame) should pass through extension unmodified * Outgoing PING (Control Frame) should pass through extension unmodified
*/ */
@Test @Test
public void testOutgoingPing() public void testOutgoingPing() throws IOException
{ {
OutgoingFramesCapture capture = new OutgoingFramesCapture(); OutgoingFramesCapture capture = new OutgoingFramesCapture();

View File

@ -17,6 +17,7 @@ package org.eclipse.jetty.websocket.extensions;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -68,7 +69,7 @@ public class IdentityExtensionTest
* Verify that outgoing frames are unmodified * Verify that outgoing frames are unmodified
*/ */
@Test @Test
public void testOutgoingFrames() public void testOutgoingFrames() throws IOException
{ {
OutgoingFramesCapture capture = new OutgoingFramesCapture(); OutgoingFramesCapture capture = new OutgoingFramesCapture();