More websocket threading issues

* Using new IOState object in AbstractWebSocketConnection to better
  manage close handshake state and replies.
* Cleaning up many bad BufferPool.release() calls against ByteBuffers
  that didn't arrive from BufferPool.acquire()
* Removing many ByteBuffer.wrap() calls.
* Fixing FrameCompression / MessageCompression extension handling
  of ByteBuffers
This commit is contained in:
Joakim Erdfelt 2012-12-12 11:04:04 -07:00
parent be83cb100d
commit 9bbfcd7e62
40 changed files with 370 additions and 490 deletions

View File

@ -15,6 +15,10 @@
</properties>
<dependencies>
<!-- NOTE: It is important that this websocket-api module NOT depend on any other jetty modules,
for WebAppClassloader isolation reasons
- Joakim
-->
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.client.internal.ConnectionManager;
import org.eclipse.jetty.websocket.client.internal.DefaultWebSocketClient;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.RandomMasker;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
@ -106,6 +107,7 @@ public class WebSocketClientFactory extends ContainerLifeCycle
addBean(this.connectionManager);
this.eventDriverFactory = new EventDriverFactory(policy);
this.masker = new RandomMasker();
}
public WebSocketClientFactory(SslContextFactory sslContextFactory)
@ -158,16 +160,6 @@ public class WebSocketClientFactory extends ContainerLifeCycle
return extensionRegistry;
}
public WebSocketPolicy getPolicy()
{
return policy;
}
public Scheduler getScheduler()
{
return scheduler;
}
/**
*
* @return the masker or null if none is set
@ -177,11 +169,16 @@ public class WebSocketClientFactory extends ContainerLifeCycle
return masker;
}
public void setMasker(Masker masker)
public WebSocketPolicy getPolicy()
{
this.masker = masker;
return policy;
}
public Scheduler getScheduler()
{
return scheduler;
}
public List<Extension> initExtensions(List<ExtensionConfig> requested)
{
List<Extension> extensions = new ArrayList<Extension>();
@ -236,4 +233,9 @@ public class WebSocketClientFactory extends ContainerLifeCycle
{
this.bindAddress = bindAddress;
}
public void setMasker(Masker masker)
{
this.masker = masker;
}
}

View File

@ -50,6 +50,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
this.factory = client.getFactory();
this.connected = false;
this.masker = client.getMasker();
assert (this.masker != null);
}
public DefaultWebSocketClient getClient()

View File

@ -98,7 +98,7 @@ public class BlockheadServer
this.incomingFrames = new IncomingFramesCapture();
this.policy = WebSocketPolicy.newServerPolicy();
this.bufferPool = new MappedByteBufferPool(BUFFER_SIZE);
this.parser = new Parser(policy);
this.parser = new Parser(policy,bufferPool);
this.parseCount = new AtomicInteger(0);
this.generator = new Generator(policy,bufferPool,false);
this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.Utf8StringBuilder;
@ -129,7 +128,7 @@ public class CloseInfo
this.reason = reason;
}
public ByteBuffer asByteBuffer()
private byte[] asByteBuffer()
{
if ((statusCode == StatusCode.NO_CLOSE) || (statusCode == StatusCode.NO_CODE) || (statusCode == (-1)))
{
@ -137,14 +136,23 @@ public class CloseInfo
return null;
}
ByteBuffer buf = ByteBuffer.allocate(WebSocketFrame.MAX_CONTROL_PAYLOAD);
buf.putChar((char)statusCode);
int len = 2; // status code
byte utf[] = null;
if (StringUtil.isNotBlank(reason))
{
byte utf[] = StringUtil.getUtf8Bytes(reason);
buf.put(utf,0,utf.length);
utf = StringUtil.getUtf8Bytes(reason);
len += utf.length;
}
BufferUtil.flipToFlush(buf,0);
byte buf[] = new byte[len];
buf[0] = (byte)((statusCode >>> 8) & 0xFF);
buf[1] = (byte)((statusCode >>> 0) & 0xFF);
if (utf != null)
{
System.arraycopy(utf,0,buf,2,utf.length);
}
return buf;
}

View File

@ -232,6 +232,7 @@ public class Generator
// limit the buffer to the window size.
int newlimit = Math.min(buffer.position() + windowSize,buffer.limit());
buffer.limit(newlimit);
LOG.debug("Buffer limited: {}",buffer);
if (frame.remaining() == frame.getPayloadLength())
{

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.websocket.common;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -26,25 +25,10 @@ import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.io.IOState;
public interface LogicalConnection extends OutgoingFrames, SuspendToken
{
/**
* Perform a quick check that the connection input is open.
*
* @throws IOException
* if the connection input is closed
*/
void assertInputOpen() throws IOException;
/**
* Perform a quick check that the connection output is open.
*
* @throws IOException
* if the connection output is closed
*/
void assertOutputOpen() throws IOException;
/**
* Send a websocket Close frame, without a status code or reason.
* <p>
@ -73,6 +57,13 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/
void disconnect();
/**
* Get the IOState of the connection.
*
* @return the IOState of the connection.
*/
IOState getIOState();
/**
* Get the local {@link InetSocketAddress} in use for this connection.
* <p>
@ -105,32 +96,11 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
WebSocketSession getSession();
/**
* Get the WebSocket connection State.
* Test if logical connection is still open
*
* @return the connection state.
* @return true if connection is open
*/
ConnectionState getState();
/**
* Test if input is closed (as a result of receiving a close frame)
*
* @return true if input is closed.
*/
boolean isInputClosed();
/**
* Simple test to see if connection is open (and not closed)
*
* @return true if connection still open
*/
boolean isOpen();
/**
* Test if output is closed (as a result of sending a close frame)
*
* @return true if output is closed.
*/
boolean isOutputClosed();
public boolean isOpen();
/**
* Tests if the connection is actively reading.
@ -139,16 +109,6 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/
boolean isReading();
/**
* A close handshake frame has been detected
*
* @param incoming
* true if part of an incoming frame, false if part of an outgoing frame.
* @param close
* the close details
*/
void onCloseHandshake(boolean incoming, CloseInfo close);
/**
* Set where the connection should send the incoming frames to.
* <p>

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import java.util.List;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -53,7 +54,9 @@ public class Parser
PAYLOAD
}
private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames");
private static final Logger LOG = Log.getLogger(Parser.class);
private final WebSocketPolicy policy;
private final ByteBufferPool bufferPool;
// State specific
private State state = State.START;
@ -77,12 +80,11 @@ public class Parser
/** Is there an extension that processes invalid UTF8 text messages (such as compressed content) */
private boolean isTextFrameValidated = true;
private static final Logger LOG = Log.getLogger(Parser.class);
private IncomingFrames incomingFramesHandler;
private WebSocketPolicy policy;
public Parser(WebSocketPolicy wspolicy)
public Parser(WebSocketPolicy wspolicy, ByteBufferPool bufferPool)
{
this.bufferPool = bufferPool;
this.policy = wspolicy;
}
@ -173,10 +175,6 @@ public class Parser
protected void notifyFrame(final Frame f)
{
if (LOG_FRAMES.isDebugEnabled())
{
LOG_FRAMES.debug("{} Read Frame: {}",policy.getBehavior(),f);
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} Notify {}",policy.getBehavior(),incomingFramesHandler);
@ -544,7 +542,7 @@ public class Parser
{
getPolicy().assertValidPayloadLength(payloadLength);
frame.assertValid();
payload = ByteBuffer.allocate(payloadLength);
payload = bufferPool.acquire(payloadLength,false);
BufferUtil.clearToFill(payload);
}

View File

@ -66,7 +66,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
{
try
{
connection.assertOutputOpen();
connection.getIOState().assertOutputOpen();
return outgoing.outgoingFrame(frame);
}
catch (IOException e)
@ -78,7 +78,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
@Override
public void sendBytes(ByteBuffer data) throws IOException
{
connection.assertOutputOpen();
connection.getIOState().assertOutputOpen();
if (LOG.isDebugEnabled())
{
LOG.debug("sendBytes({})",BufferUtil.toDetailString(data));

View File

@ -251,7 +251,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
@Override
public void incomingError(WebSocketException e)
{
if (connection.isInputClosed())
if (connection.getIOState().isInputClosed())
{
return; // input is closed
}
@ -265,7 +265,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
@Override
public void incomingFrame(Frame frame)
{
if (connection.isInputClosed())
if (connection.getIOState().isInputClosed())
{
return; // input is closed
}

View File

@ -107,21 +107,18 @@ public abstract class EventDriver implements IncomingFrames
session.close(close.getStatusCode(),close.getReason());
// process handshake
session.getConnection().onCloseHandshake(true,close);
session.getConnection().getIOState().onCloseHandshake(true,close);
return;
}
case OpCode.PING:
{
ByteBuffer pongBuf = ByteBuffer.allocate(frame.getPayloadLength());
if (frame.getPayloadLength() > 0)
byte pongBuf[] = new byte[0];
if (frame.hasPayload())
{
// Copy payload
BufferUtil.clearToFill(pongBuf);
BufferUtil.put(frame.getPayload(),pongBuf);
BufferUtil.flipToFlush(pongBuf,0);
pongBuf = BufferUtil.toArray(frame.getPayload());
}
session.getRemote().sendPong(pongBuf);
session.getRemote().sendPong(ByteBuffer.wrap(pongBuf));
break;
}
case OpCode.BINARY:

View File

@ -48,29 +48,21 @@ public class FrameCompressionExtension extends AbstractExtension
}
ByteBuffer data = frame.getPayload();
try
method.decompress().input(data);
while (!method.decompress().isDone())
{
method.decompress().input(data);
while (!method.decompress().isDone())
ByteBuffer uncompressed = method.decompress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(uncompressed);
if (!method.decompress().isDone())
{
ByteBuffer uncompressed = method.decompress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(uncompressed);
if (!method.decompress().isDone())
{
out.setFin(false);
}
out.setRsv1(false); // Unset RSV1 on decompressed frame
nextIncomingFrame(out);
out.setFin(false);
}
out.setRsv1(false); // Unset RSV1 on decompressed frame
nextIncomingFrame(out);
}
// reset on every frame.
// method.decompress().end();
}
finally
{
// release original buffer (no longer needed)
getBufferPool().release(data);
}
// reset on every frame.
// method.decompress().end();
}
/**
@ -105,32 +97,25 @@ public class FrameCompressionExtension extends AbstractExtension
Future<WriteResult> future = null;
ByteBuffer data = frame.getPayload();
try
{
// deflate data
method.compress().input(data);
while (!method.compress().isDone())
{
ByteBuffer buf = method.compress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(buf);
out.setRsv1(true);
if (!method.compress().isDone())
{
out.setFin(false);
}
future = nextOutgoingFrame(out);
// deflate data
method.compress().input(data);
while (!method.compress().isDone())
{
ByteBuffer buf = method.compress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(buf);
out.setRsv1(true);
if (!method.compress().isDone())
{
out.setFin(false);
}
// reset on every frame.
method.compress().end();
}
finally
{
// free original data buffer
getBufferPool().release(data);
future = nextOutgoingFrame(out);
}
// reset on every frame.
method.compress().end();
return future;
}

View File

@ -53,35 +53,27 @@ public class MessageCompressionExtension extends AbstractExtension
}
ByteBuffer data = frame.getPayload();
try
method.decompress().input(data);
while (!method.decompress().isDone())
{
method.decompress().input(data);
while (!method.decompress().isDone())
ByteBuffer uncompressed = method.decompress().process();
if (uncompressed == null)
{
ByteBuffer uncompressed = method.decompress().process();
if (uncompressed == null)
{
continue;
}
WebSocketFrame out = new WebSocketFrame(frame).setPayload(uncompressed);
if (!method.decompress().isDone())
{
out.setFin(false);
}
out.setRsv1(false); // Unset RSV1 on decompressed frame
nextIncomingFrame(out);
continue;
}
// reset only at the end of a message.
if (frame.isFin())
WebSocketFrame out = new WebSocketFrame(frame).setPayload(uncompressed);
if (!method.decompress().isDone())
{
method.decompress().end();
out.setFin(false);
}
out.setRsv1(false); // Unset RSV1 on decompressed frame
nextIncomingFrame(out);
}
finally
// reset only at the end of a message.
if (frame.isFin())
{
// release original buffer (no longer needed)
getBufferPool().release(data);
method.decompress().end();
}
}
@ -113,36 +105,28 @@ public class MessageCompressionExtension extends AbstractExtension
Future<WriteResult> future = null;
ByteBuffer data = frame.getPayload();
try
// deflate data
method.compress().input(data);
while (!method.compress().isDone())
{
// deflate data
method.compress().input(data);
while (!method.compress().isDone())
ByteBuffer buf = method.compress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(buf);
out.setRsv1(true);
if (!method.compress().isDone())
{
ByteBuffer buf = method.compress().process();
WebSocketFrame out = new WebSocketFrame(frame).setPayload(buf);
out.setRsv1(true);
if (!method.compress().isDone())
{
out.setFin(false);
future = nextOutgoingFrame(out);
}
else
{
future = nextOutgoingFrame(out);
}
out.setFin(false);
future = nextOutgoingFrame(out);
}
// reset only at end of message
if (frame.isFin())
else
{
method.compress().end();
future = nextOutgoingFrame(out);
}
}
finally
// reset only at end of message
if (frame.isFin())
{
// free original data buffer
getBufferPool().release(data);
method.compress().end();
}
return future;

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState;
/**
* MuxChannel, acts as WebSocketConnection for specific sub-channel.
@ -53,7 +54,7 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
private final AtomicBoolean inputClosed;
private final AtomicBoolean outputClosed;
private final AtomicBoolean suspendToken;
private ConnectionState connectionState;
private IOState ioState;
private WebSocketPolicy policy;
private WebSocketSession session;
private IncomingFrames incoming;
@ -66,26 +67,13 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
this.policy = muxer.getPolicy().clonePolicy();
this.suspendToken = new AtomicBoolean(false);
this.connectionState = ConnectionState.CONNECTING;
this.ioState = new IOState();
ioState.setState(ConnectionState.CONNECTING);
this.inputClosed = new AtomicBoolean(false);
this.outputClosed = new AtomicBoolean(false);
}
@Override
public void assertInputOpen() throws IOException
{
// TODO Auto-generated method stub
}
@Override
public void assertOutputOpen() throws IOException
{
// TODO Auto-generated method stub
}
@Override
public void close()
{
@ -110,7 +98,7 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
@Override
public void disconnect()
{
this.connectionState = ConnectionState.CLOSED;
this.ioState.setState(ConnectionState.CLOSED);
// TODO: disconnect the virtual end-point?
}
@ -119,6 +107,13 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
return channelId;
}
@Override
public IOState getIOState()
{
// TODO Auto-generated method stub
return null;
}
@Override
public InetSocketAddress getLocalAddress()
{
@ -151,12 +146,6 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
return session;
}
@Override
public ConnectionState getState()
{
return this.connectionState;
}
@Override
public String getSubProtocol()
{
@ -183,13 +172,7 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
public boolean isActive()
{
return (getState() != ConnectionState.CLOSED);
}
@Override
public boolean isInputClosed()
{
return inputClosed.get();
return (ioState.isOpen());
}
@Override
@ -198,12 +181,6 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
return isActive() && muxer.isOpen();
}
@Override
public boolean isOutputClosed()
{
return outputClosed.get();
}
@Override
public boolean isReading()
{
@ -212,43 +189,12 @@ public class MuxChannel implements WebSocketConnection, LogicalConnection, Incom
public void onClose()
{
this.connectionState = ConnectionState.CLOSED;
}
@Override
public void onCloseHandshake(boolean incoming, CloseInfo close)
{
boolean in = inputClosed.get();
boolean out = outputClosed.get();
if (incoming)
{
in = true;
this.inputClosed.set(true);
}
else
{
out = true;
this.outputClosed.set(true);
}
LOG.debug("onCloseHandshake({},{}), input={}, output={}",incoming,close,in,out);
if (in && out)
{
LOG.debug("Close Handshake satisfied, disconnecting");
this.disconnect();
}
if (close.isHarsh())
{
LOG.debug("Close status code was harsh, disconnecting");
this.disconnect();
}
this.ioState.setState(ConnectionState.CLOSED);
}
public void onOpen()
{
this.connectionState = ConnectionState.OPEN;
this.ioState.setState(ConnectionState.OPEN);
}
/**

View File

@ -169,7 +169,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if (frame.getType().getOpCode() == OpCode.CLOSE)
{
CloseInfo close = new CloseInfo(origPayload,false);
onCloseHandshake(false,close);
if (ioState.onCloseHandshake(false,close))
{
disconnect();
}
}
getBufferPool().release(origPayload);
@ -228,13 +231,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
// We need to keep the correct ordering of frames, to avoid that another
// Data frame for the same stream is written before this one is finished.
queue.prepend(this);
flush();
}
else
{
LOG.debug("Send complete");
super.complete();
}
flush();
}
@Override
@ -313,9 +316,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private List<ExtensionConfig> extensions;
private boolean flushing;
private boolean isFilling;
private ConnectionState connectionState;
private final AtomicBoolean inputClosed;
private final AtomicBoolean outputClosed;
private IOState ioState;
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
@ -323,32 +324,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool);
this.parser = new Parser(policy);
this.parser = new Parser(policy,bufferPool);
this.scheduler = scheduler;
this.extensions = new ArrayList<>();
this.suspendToken = new AtomicBoolean(false);
this.connectionState = ConnectionState.CONNECTING;
this.inputClosed = new AtomicBoolean(false);
this.outputClosed = new AtomicBoolean(false);
}
@Override
public void assertInputOpen() throws IOException
{
if (isInputClosed())
{
throw new IOException("Connection input is closed");
}
}
@Override
public void assertOutputOpen() throws IOException
{
if (isOutputClosed())
{
throw new IOException("Connection output is closed");
}
this.ioState = new IOState();
this.ioState.setState(ConnectionState.CONNECTING);
}
@Override
@ -365,7 +346,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void complete(final Callback callback)
{
if (connectionState != ConnectionState.CLOSED)
if (ioState.isOpen())
{
invoker.invoke(callback);
}
@ -379,7 +360,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void disconnect(boolean onlyOutput)
{
connectionState = ConnectionState.CLOSED;
ioState.setState(ConnectionState.CLOSED);
EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
@ -502,6 +483,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return generator;
}
@Override
public IOState getIOState()
{
return ioState;
}
public Parser getParser()
{
return parser;
@ -530,28 +517,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return session;
}
@Override
public ConnectionState getState()
{
return connectionState;
}
@Override
public boolean isInputClosed()
{
return inputClosed.get();
}
@Override
public boolean isOpen()
{
return (getState() != ConnectionState.CLOSED) && getEndPoint().isOpen();
}
@Override
public boolean isOutputClosed()
{
return outputClosed.get();
return getIOState().isOpen() && getEndPoint().isOpen();
}
@Override
@ -564,38 +533,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onClose()
{
super.onClose();
this.connectionState = ConnectionState.CLOSED;
}
@Override
public void onCloseHandshake(boolean incoming, CloseInfo close)
{
boolean in = inputClosed.get();
boolean out = outputClosed.get();
if (incoming)
{
in = true;
this.inputClosed.set(true);
}
else
{
out = true;
this.outputClosed.set(true);
}
LOG.debug("onCloseHandshake({},{}), input={}, output={}",incoming,close,in,out);
if (in && out)
{
LOG.debug("Close Handshake satisfied, disconnecting");
this.disconnect(false);
}
if (close.isHarsh())
{
LOG.debug("Close status code was harsh, disconnecting");
this.disconnect(false);
}
this.getIOState().setState(ConnectionState.CLOSED);
}
@Override
@ -629,7 +567,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onOpen()
{
super.onOpen();
this.connectionState = ConnectionState.OPEN;
this.ioState.setState(ConnectionState.OPEN);
LOG.debug("fillInterested");
fillInterested();
}

View File

@ -94,6 +94,15 @@ public class IOState
return outputClosed.get();
}
/**
* Test for if connection should disconnect or response on a close handshake.
*
* @param incoming
* true if incoming close
* @param close
* the close details.
* @return true if connection should be disconnected now, or false if response to close should be issued.
*/
public boolean onCloseHandshake(boolean incoming, CloseInfo close)
{
boolean in = inputClosed.get();

View File

@ -18,20 +18,17 @@
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class ClosePayloadParserTest
{
@Test
@ -53,7 +50,7 @@ public class ClosePayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);

View File

@ -27,10 +27,6 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
@ -42,7 +38,7 @@ public class GeneratorParserRoundtripTest
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
ByteBufferPool bufferPool = new MappedByteBufferPool();
Generator gen = new Generator(policy,bufferPool);
Parser parser = new Parser(policy);
Parser parser = new Parser(policy,bufferPool);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
@ -78,7 +74,7 @@ public class GeneratorParserRoundtripTest
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
ByteBufferPool bufferPool = new MappedByteBufferPool();
Generator gen = new Generator(policy,bufferPool);
Parser parser = new Parser(policy);
Parser parser = new Parser(policy,bufferPool);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -26,17 +28,9 @@ import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class GeneratorTest
{
/**
@ -173,7 +167,7 @@ public class GeneratorTest
// Parse complete buffer.
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);

View File

@ -240,7 +240,7 @@ public class ParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);

View File

@ -18,19 +18,16 @@
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
public class PingPayloadParserTest
{
@Test
@ -43,7 +40,7 @@ public class PingPayloadParserTest
BufferUtil.flipToFlush(buf,0);
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);

View File

@ -18,20 +18,17 @@
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
/**
* Collection of Example packets as found in <a href="https://tools.ietf.org/html/rfc6455#section-5.7">RFC 6455 Examples section</a>
*/
@ -41,7 +38,7 @@ public class RFC6455ExamplesParserTest
public void testFragmentedUnmaskedTextMessage()
{
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
@ -86,7 +83,7 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -109,7 +106,7 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -139,7 +136,7 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -179,7 +176,7 @@ public class RFC6455ExamplesParserTest
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setBufferSize(80000);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -210,7 +207,7 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -233,7 +230,7 @@ public class RFC6455ExamplesParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);

View File

@ -91,7 +91,7 @@ public class TextPayloadParserTest
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
policy.setBufferSize(100000);
policy.setMaxPayloadSize(100000);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -126,7 +126,7 @@ public class TextPayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -163,7 +163,7 @@ public class TextPayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -190,7 +190,7 @@ public class TextPayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);
@ -216,7 +216,7 @@ public class TextPayloadParserTest
buf.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(buf);

View File

@ -20,18 +20,25 @@ package org.eclipse.jetty.websocket.common;
import java.nio.ByteBuffer;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
public class UnitParser extends Parser
{
public UnitParser()
{
super(WebSocketPolicy.newServerPolicy());
this(WebSocketPolicy.newServerPolicy());
}
public UnitParser(ByteBufferPool bufferPool, WebSocketPolicy policy)
{
super(policy, bufferPool);
}
public UnitParser(WebSocketPolicy policy)
{
super(policy);
this(new MappedByteBufferPool(), policy);
}
private void parsePartial(ByteBuffer buf, int numBytes)

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.common.ab;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
@ -29,12 +31,11 @@ import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.UnitParser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
/**
* Text Message Spec testing the {@link Generator} and {@link Parser}
*/
@ -313,7 +314,7 @@ public class TestABCase1_1
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -347,7 +348,7 @@ public class TestABCase1_1
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -381,7 +382,7 @@ public class TestABCase1_1
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -415,7 +416,7 @@ public class TestABCase1_1
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -431,9 +432,6 @@ public class TestABCase1_1
@Test
public void testParse65535ByteTextCase1_1_6()
{
// Debug.enableDebugLogging(Parser.class);
// Debug.enableDebugLogging(TextPayloadParser.class);
int length = 65535;
ByteBuffer expected = ByteBuffer.allocate(length + 5);
@ -454,7 +452,7 @@ public class TestABCase1_1
expected.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -491,7 +489,7 @@ public class TestABCase1_1
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -515,7 +513,7 @@ public class TestABCase1_1
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);

View File

@ -18,6 +18,8 @@
package org.eclipse.jetty.websocket.common.ab;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
@ -30,12 +32,11 @@ import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitGenerator;
import org.eclipse.jetty.websocket.common.UnitParser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
/**
* Binary Message Spec testing the {@link Generator} and {@link Parser}
*/
@ -329,7 +330,7 @@ public class TestABCase1_2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -363,7 +364,7 @@ public class TestABCase1_2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -397,7 +398,7 @@ public class TestABCase1_2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -431,7 +432,7 @@ public class TestABCase1_2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -466,7 +467,7 @@ public class TestABCase1_2
expected.flip();
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -503,7 +504,7 @@ public class TestABCase1_2
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setMaxTextMessageSize(length);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -527,7 +528,7 @@ public class TestABCase1_2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);

View File

@ -195,7 +195,7 @@ public class TestABCase2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -225,7 +225,7 @@ public class TestABCase2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -248,7 +248,7 @@ public class TestABCase2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -279,7 +279,7 @@ public class TestABCase2
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.LogShush;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitParser;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -57,7 +58,7 @@ public class TestABCase4
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -79,7 +80,7 @@ public class TestABCase4
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -102,7 +103,7 @@ public class TestABCase4
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -124,7 +125,7 @@ public class TestABCase4
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);

View File

@ -73,7 +73,7 @@ public class TestABCase7_3
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -147,7 +147,7 @@ public class TestABCase7_3
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -205,7 +205,7 @@ public class TestABCase7_3
expected.put(messageBytes);
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);
@ -275,7 +275,7 @@ public class TestABCase7_3
expected.put(messageBytes);
expected.flip();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
parser.parse(expected);

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.OutgoingNetworkBytesCapture;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitParser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.junit.Assert;
import org.junit.Test;
@ -63,7 +64,7 @@ public class FrameCompressionExtensionTest
// Wire up stack
ext.setNextIncomingFrames(capture);
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
parser.configureFromExtensions(Collections.singletonList(ext));
parser.setIncomingFramesHandler(ext);
@ -114,6 +115,16 @@ public class FrameCompressionExtensionTest
capture.assertBytes(0,expectedHex);
}
@Test
public void testBlockheadClient_HelloThere()
{
// Captured from Blockhead Client - "Hello" then "There" via unit test
String hello = "c1 87 00 00 00 00 f2 48 cd c9 c9 07 00".replaceAll("\\s*","");
String there = "c1 87 00 00 00 00 0a c9 48 2d 4a 05 00".replaceAll("\\s*","");
byte rawbuf[] = TypeUtil.fromHexString(hello + there);
assertIncoming(rawbuf,"Hello","There");
}
@Test
public void testChrome20_Hello()
{
@ -122,6 +133,16 @@ public class FrameCompressionExtensionTest
assertIncoming(rawbuf,"Hello");
}
@Test
public void testChrome20_HelloThere()
{
// Captured from Chrome 20.x - "Hello" then "There" (sent from browser/client)
String hello = "c1 87 7b 19 71 db 89 51 bc 12 b2 1e 71".replaceAll("\\s*","");
String there = "c1 87 59 ed c8 f4 53 24 80 d9 13 e8 c8".replaceAll("\\s*","");
byte rawbuf[] = TypeUtil.fromHexString(hello + there);
assertIncoming(rawbuf,"Hello","There");
}
@Test
public void testChrome20_Info()
{
@ -167,7 +188,6 @@ public class FrameCompressionExtensionTest
int len = compressor.deflate(out,0,out.length,Deflater.SYNC_FLUSH);
if (len > 0)
{
System.err.printf("Compressed %,d bytes%n",len);
outbuf.put(out,0,len);
}
}
@ -185,10 +205,36 @@ public class FrameCompressionExtensionTest
String expected = "CaCc4bCbB70200"; // what pywebsocket produces
// String expected = "CbCc4bCbB70200"; // what java produces
System.out.printf("Compressed data: %s%n",actual);
Assert.assertThat("Compressed data",actual,is(expected));
}
@Test
public void testGeneratedTwoFrames() throws IOException
{
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
FrameCompressionExtension ext = new FrameCompressionExtension();
ext.setBufferPool(new MappedByteBufferPool());
ext.setPolicy(policy);
ExtensionConfig config = ExtensionConfig.parse("x-webkit-deflate-frame");
ext.setConfig(config);
ByteBufferPool bufferPool = new MappedByteBufferPool();
boolean validating = true;
Generator generator = new Generator(policy,bufferPool,validating);
generator.configureFromExtensions(Collections.singletonList(ext));
OutgoingNetworkBytesCapture capture = new OutgoingNetworkBytesCapture(generator);
ext.setNextOutgoingFrames(capture);
ext.outgoingFrame(WebSocketFrame.text("Hello"));
ext.outgoingFrame(WebSocketFrame.text("There"));
capture.assertBytes(0,"c107f248cdc9c90700");
capture.assertBytes(1,"c1070ac9482d4a0500");
}
@Test
public void testInflateBasics() throws Exception
{
@ -214,7 +260,7 @@ public class FrameCompressionExtensionTest
{
// Captured from PyWebSocket - "Hello" (echo from server)
byte rawbuf[] = TypeUtil.fromHexString("c107f248cdc9c90700");
assertIncoming(rawbuf, "Hello");
assertIncoming(rawbuf,"Hello");
}
@Test
@ -230,17 +276,25 @@ public class FrameCompressionExtensionTest
public void testPyWebSocketServer_Medium()
{
// Captured from PyWebSocket - "stackoverflow" (echo from server)
byte rawbuf[]=TypeUtil.fromHexString("c10f2a2e494ccece2f4b2d4acbc92f0700");
assertIncoming(rawbuf, "stackoverflow");
byte rawbuf[] = TypeUtil.fromHexString("c10f2a2e494ccece2f4b2d4acbc92f0700");
assertIncoming(rawbuf,"stackoverflow");
}
/**
* Make sure that the server generated compressed form for "Hello" is
* consistent with what PyWebSocket creates.
* Make sure that the server generated compressed form for "Hello" is consistent with what PyWebSocket creates.
*/
@Test
public void testServerGeneratedHello() throws IOException
{
assertOutgoing("Hello", "c107f248cdc9c90700");
assertOutgoing("Hello","c107f248cdc9c90700");
}
/**
* Make sure that the server generated compressed form for "There" is consistent with what PyWebSocket creates.
*/
@Test
public void testServerGeneratedThere() throws IOException
{
assertOutgoing("There","c1070ac9482d4a0500");
}
}

View File

@ -38,6 +38,7 @@ import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.OutgoingFramesCapture;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.compress.CompressionMethod.Process;
import org.junit.Assert;
import org.junit.Test;
@ -272,6 +273,17 @@ public class MessageCompressionExtensionTest
quote.add("a single experiment can prove me wrong.");
quote.add("-- Albert Einstein");
// Expected compressed parts
List<ByteBuffer> expectedBuffers = new ArrayList<>();
CompressionMethod method = new DeflateCompressionMethod();
for(String part: quote) {
Process process = method.compress();
process.begin();
process.input(BufferUtil.toBuffer(part,StringUtil.__UTF8_CHARSET));
expectedBuffers.add(process.process());
process.end();
}
// Write quote as separate frames
for (String section : quote)
{
@ -298,13 +310,12 @@ public class MessageCompressionExtensionTest
Assert.assertThat(prefix + ".rsv3",actual.isRsv3(),is(false));
// Validate Payload
ByteBuffer expected = BufferUtil.toBuffer(quote.get(i),StringUtil.__UTF8_CHARSET);
ByteBuffer expected = expectedBuffers.get(i);
// Decompress payload
ByteBuffer compressed = actual.getPayload().slice();
// ByteBuffer uncompressed = ext.inflate(compressed);
// Assert.assertThat(prefix + ".payloadLength",uncompressed.remaining(),is(expected.remaining()));
// ByteBufferAssert.assertEquals(prefix + ".payload",expected,uncompressed);
Assert.assertThat(prefix + ".payloadLength",compressed.remaining(),is(expected.remaining()));
ByteBufferAssert.assertEquals(prefix + ".payload",expected,compressed);
}
}

View File

@ -36,12 +36,9 @@ import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.IncomingFramesCapture;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.UnitParser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.AbstractExtension;
import org.eclipse.jetty.websocket.common.extensions.mux.AbstractMuxExtension;
import org.eclipse.jetty.websocket.common.extensions.mux.MuxParser;
import org.eclipse.jetty.websocket.common.extensions.mux.MuxedFrame;
import org.eclipse.jetty.websocket.common.extensions.mux.Muxer;
import org.junit.Assert;
import org.junit.Test;
@ -60,7 +57,7 @@ public class MuxParserRFCTest
{
IncomingFramesCapture capture = new IncomingFramesCapture();
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
Parser parser = new Parser(policy);
Parser parser = new UnitParser(policy);
parser.setIncomingFramesHandler(capture);
List<? extends AbstractExtension> muxList = Collections.singletonList(new DummyMuxExtension());
parser.configureFromExtensions(muxList);

View File

@ -31,8 +31,6 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteResult;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.junit.rules.TestName;
@ -59,16 +57,6 @@ public class LocalWebSocketConnection implements WebSocketConnection, LogicalCon
this.id = testname.getMethodName();
}
@Override
public void assertInputOpen() throws IOException
{
}
@Override
public void assertOutputOpen() throws IOException
{
}
@Override
public void close()
{
@ -92,6 +80,13 @@ public class LocalWebSocketConnection implements WebSocketConnection, LogicalCon
return incoming;
}
@Override
public IOState getIOState()
{
// TODO Auto-generated method stub
return null;
}
@Override
public InetSocketAddress getLocalAddress()
{
@ -122,12 +117,6 @@ public class LocalWebSocketConnection implements WebSocketConnection, LogicalCon
return null;
}
@Override
public ConnectionState getState()
{
return null;
}
@Override
public String getSubProtocol()
{
@ -146,35 +135,18 @@ public class LocalWebSocketConnection implements WebSocketConnection, LogicalCon
incoming.incomingFrame(frame);
}
@Override
public boolean isInputClosed()
{
return false;
}
@Override
public boolean isOpen()
{
return open;
}
@Override
public boolean isOutputClosed()
{
return false;
}
@Override
public boolean isReading()
{
return false;
}
@Override
public void onCloseHandshake(boolean incoming, CloseInfo close)
{
}
public void onOpen() {
open = true;
}

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
public class FrameCompressionExtensionTest
@ -50,7 +49,6 @@ public class FrameCompressionExtensionTest
}
@Test
@Ignore("Broken as of ForkInvoker change")
public void testDeflateFrameExtension() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());

View File

@ -33,7 +33,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
@ -91,8 +93,9 @@ public class WebSocketLoadRFC6455Test
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
// _endp=new SocketEndPoint(socket);
_generator = new UnitGenerator();
_parser = new Parser(policy);
ByteBufferPool bufferPool = new MappedByteBufferPool();
_generator = new Generator(policy,bufferPool);
_parser = new Parser(policy,bufferPool);
}

View File

@ -226,11 +226,13 @@ public class Fuzzer
for (WebSocketFrame f : send)
{
setClientMask(f);
ByteBuffer rawbytes = generator.generate(f);
if (LOG.isDebugEnabled())
{
LOG.debug("payload: {}",BufferUtil.toDetailString(f.getPayload()));
LOG.debug("frame: {}",f);
LOG.debug("bytes: {}",BufferUtil.toDetailString(rawbytes));
}
BufferUtil.put(generator.generate(f),buf);
BufferUtil.put(rawbytes,buf);
}
BufferUtil.flipToFlush(buf,0);

View File

@ -135,7 +135,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
this.bufferPool = new MappedByteBufferPool(8192);
this.generator = new Generator(policy,bufferPool);
this.parser = new Parser(policy);
this.parser = new Parser(policy,bufferPool);
this.parseCount = new AtomicInteger(0);
this.incomingFrames = new IncomingFramesCapture();
@ -253,8 +253,8 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
extensionStack.negotiate(configs);
// Start with default routing
extensionStack.setNextIncoming(this);
extensionStack.setNextOutgoing(this);
extensionStack.setNextIncoming(this); // the websocket layer
extensionStack.setNextOutgoing(outgoing); // the network layer
// Configure Parser / Generator
extensionStack.configure(parser);
@ -274,6 +274,9 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
parser.setIncomingFramesHandler(extensionStack);
ioState.setState(ConnectionState.OPEN);
LOG.debug("outgoing = {}",outgoing);
LOG.debug("incoming = {}",extensionStack);
return respHeader;
}
@ -439,6 +442,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
}
if (ioState.isInputClosed())
{
LOG.debug("Input is closed");
return 0;
}
int len = 0;
@ -449,6 +453,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
if (b == (-1))
{
eof = true;
break;
}
buf.put((byte)b);
len++;
@ -459,7 +464,6 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
{
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
int startCount = incomingFrames.size();
ByteBuffer buf = bufferPool.acquire(BUFFER_SIZE,false);
BufferUtil.clearToFill(buf);
@ -470,17 +474,34 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
long expireOn = now + msDur;
LOG.debug("Now: {} - expireOn: {} ({} ms)",now,expireOn,msDur);
long iter = 0;
int len = 0;
while (incomingFrames.size() < (startCount + expectedCount))
while (incomingFrames.size() < expectedCount)
{
BufferUtil.clearToFill(buf);
len = read(buf);
if (len > 0)
{
LOG.debug("Read {} bytes",len);
BufferUtil.flipToFlush(buf,0);
if (LOG.isDebugEnabled())
{
LOG.debug("Read {} bytes: {}",len,BufferUtil.toDetailString(buf));
}
parser.parse(buf);
}
else
{
if (LOG.isDebugEnabled())
{
iter++;
if ((iter % 10000000) == 0)
{
LOG.debug("10,000,000 reads of zero length");
iter = 0;
}
}
}
if (!debug && (System.currentTimeMillis() > expireOn))
{
@ -604,9 +625,16 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
public void write(WebSocketFrame frame) throws IOException
{
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
frame.setMask(clientmask);
// DEBUG frame.setMask(new byte[] { 0x00, 0x00, 0x00, 0x00 });
outgoing.outgoingFrame(frame);
if (LOG.isDebugEnabled())
{
frame.setMask(new byte[]
{ 0x00, 0x00, 0x00, 0x00 });
}
else
{
frame.setMask(clientmask);
}
extensionStack.outgoingFrame(frame);
}
public void writeRaw(ByteBuffer buf) throws IOException

View File

@ -13,6 +13,7 @@
<input id="info" class="button" type="submit" name="info" value="info" disabled="disabled"/>
<input id="time" class="button" type="submit" name="time" value="time" disabled="disabled"/>
<input id="hello" class="button" type="submit" name="hello" value="hello" disabled="disabled"/>
<input id="there" class="button" type="submit" name="there" value="there" disabled="disabled"/>
</div>
<script type="text/javascript">
$("connect").onclick = function(event) { wstool.connect(); return false; }
@ -20,6 +21,7 @@
$("info").onclick = function(event) {wstool.write("info:"); return false; }
$("time").onclick = function(event) {wstool.write("time:"); return false; }
$("hello").onclick = function(event) {wstool.write("Hello"); return false; }
$("there").onclick = function(event) {wstool.write("There"); return false; }
</script>
</body>
</html>

View File

@ -70,6 +70,7 @@ var wstool = {
$('info').disabled = !enabled;
$('time').disabled = !enabled;
$('hello').disabled = !enabled;
$('there').disabled = !enabled;
},
_onopen : function() {

View File

@ -1,25 +1,16 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.WebSocketAsyncConnection.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.ControlFrameBytes.LEVEL=DEBUG
# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG
# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO
# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.payload.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.Fuzzer.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.blockhead.LEVEL=DEBUG
### See the read/write traffic
# org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG
### Show state changes on BrowserDebugTool
# org.eclipse.jetty.websocket.server.browser.LEVEL=DEBUG
### Disabling intentional error out of RFCSocket
org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF
### Disable stacks on FrameBytes.failed()
org.eclipse.jetty.websocket.core.io.FrameBytes.STACKS=false