Making WebSocket use new Upgrade mechanism

+ Also fixes websocket server prefill bytes issue
+ Adjusting client side to also use/benefit from new Upgrade mechanism
This commit is contained in:
Joakim Erdfelt 2015-02-13 15:15:40 -07:00
parent 5a40ed5a0d
commit c52f100ec3
4 changed files with 143 additions and 30 deletions

View File

@ -86,6 +86,7 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
connectPromise.succeeded(session); connectPromise.succeeded(session);
ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer(); ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer();
setInitialBuffer(extraBuf);
if (extraBuf.hasRemaining()) if (extraBuf.hasRemaining())
{ {
LOG.debug("Parsing extra remaining buffer from UpgradeConnection"); LOG.debug("Parsing extra remaining buffer from UpgradeConnection");

View File

@ -215,7 +215,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private WebSocketSession session; private WebSocketSession session;
private List<ExtensionConfig> extensions; private List<ExtensionConfig> extensions;
private boolean isFilling; private boolean isFilling;
private ByteBuffer buffer; private ByteBuffer prefillBuffer;
private ReadMode readMode = ReadMode.PARSE; private ReadMode readMode = ReadMode.PARSE;
private IOState ioState; private IOState ioState;
private Stats stats = new Stats(); private Stats stats = new Stats();
@ -424,14 +424,22 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
switch (state) switch (state)
{ {
case OPEN: case OPEN:
if (BufferUtil.isEmpty(buffer)) if (BufferUtil.hasContent(prefillBuffer))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("fillInterested"); {
fillInterested(); LOG.debug("OPEN: has prefill - onFillable called");
}
onFillable();
} }
else else
onFillable(); {
if (LOG.isDebugEnabled())
{
LOG.debug("OPEN: normal fillInterested");
}
fillInterested();
}
break; break;
case CLOSED: case CLOSED:
if (ioState.wasAbnormalClose()) if (ioState.wasAbnormalClose())
@ -465,8 +473,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} onFillable()",policy.getBehavior()); LOG.debug("{} onFillable()",policy.getBehavior());
stats.countOnFillableEvents.incrementAndGet(); stats.countOnFillableEvents.incrementAndGet();
if (buffer==null)
buffer = bufferPool.acquire(getInputBufferSize(),true); ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
try try
{ {
isFilling = true; isFilling = true;
@ -483,7 +492,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
finally finally
{ {
bufferPool.release(buffer); bufferPool.release(buffer);
buffer=null;
} }
if ((readMode != ReadMode.EOF) && (suspendToken.get() == false)) if ((readMode != ReadMode.EOF) && (suspendToken.get() == false))
@ -506,9 +514,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
super.onFillInterestedFailed(cause); super.onFillInterestedFailed(cause);
} }
protected void prefill(ByteBuffer prefilled) /**
* Extra bytes from the initial HTTP upgrade that need to
* be processed by the websocket parser before starting
* to read bytes from the connection
*/
protected void setInitialBuffer(ByteBuffer prefilled)
{ {
buffer=prefilled; if (LOG.isDebugEnabled())
{
LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
}
prefillBuffer = prefilled;
} }
@Override @Override
@ -606,27 +623,52 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
EndPoint endPoint = getEndPoint(); EndPoint endPoint = getEndPoint();
try try
{ {
while (true) // TODO: should this honor the LogicalConnection.suspend() ? // Process any prefill first
while (BufferUtil.hasContent(prefillBuffer))
{
if (BufferUtil.hasContent(prefillBuffer))
{
int pos = BufferUtil.flipToFill(buffer);
int size = BufferUtil.put(prefillBuffer,buffer);
BufferUtil.flipToFlush(buffer,pos);
if (LOG.isDebugEnabled())
{
LOG.debug("Filled {} bytes of Upgrade prefill buffer for parse ({} remaining)",size,prefillBuffer.remaining());
}
if (!prefillBuffer.hasRemaining())
{
prefillBuffer = null;
}
}
if (buffer.hasRemaining())
{
parser.parse(buffer);
}
}
// Process the content from the Endpoint next
while(true) // TODO: should this honor the LogicalConnection.suspend() ?
{ {
int filled = endPoint.fill(buffer); int filled = endPoint.fill(buffer);
if (filled == 0) if (filled < 0)
{
return ReadMode.PARSE;
}
else if (filled < 0)
{ {
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress()); LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
ioState.onReadFailure(new EOFException("Remote Read EOF")); ioState.onReadFailure(new EOFException("Remote Read EOF"));
return ReadMode.EOF; return ReadMode.EOF;
} }
else else if (filled == 0)
{ {
if (LOG.isDebugEnabled()) // Done reading, wait for next onFillable
{ return ReadMode.PARSE;
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
parser.parse(buffer);
} }
if (LOG.isDebugEnabled())
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
parser.parse(buffer);
} }
} }
catch (IOException e) catch (IOException e)
@ -649,7 +691,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return ReadMode.DISCARD; return ReadMode.DISCARD;
} }
} }
@Override @Override
public void resume() public void resume()
{ {

View File

@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -57,10 +56,15 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection imple
return getEndPoint().getRemoteAddress(); return getEndPoint().getRemoteAddress();
} }
/**
* Extra bytes from the initial HTTP upgrade that need to
* be processed by the websocket parser before starting
* to read bytes from the connection
*/
@Override @Override
public void onUpgradeTo(ByteBuffer prefilled) public void onUpgradeTo(ByteBuffer prefilled)
{ {
prefill(prefilled); setInitialBuffer(prefilled);
} }
@Override @Override

View File

@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool;
@ -37,7 +38,6 @@ import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
/** /**
@ -46,7 +46,6 @@ import org.junit.Test;
* There is a class of client that will send the GET+Upgrade Request along with a few websocket frames in a single * There is a class of client that will send the GET+Upgrade Request along with a few websocket frames in a single
* network packet. This test attempts to perform this behavior as close as possible. * network packet. This test attempts to perform this behavior as close as possible.
*/ */
@Ignore
public class TooFastClientTest public class TooFastClientTest
{ {
private static SimpleServletServer server; private static SimpleServletServer server;
@ -65,7 +64,7 @@ public class TooFastClientTest
} }
@Test @Test
public void testUpgradeWithWebkitDeflateExtension() throws Exception public void testUpgradeWithSmallFrames() throws Exception
{ {
BlockheadClient client = new BlockheadClient(server.getServerUri()); BlockheadClient client = new BlockheadClient(server.getServerUri());
try try
@ -84,11 +83,20 @@ public class TooFastClientTest
// Add text frames // Add text frames
Generator generator = new Generator(WebSocketPolicy.newClientPolicy(), Generator generator = new Generator(WebSocketPolicy.newClientPolicy(),
new LeakTrackingBufferPool("Generator",new MappedByteBufferPool())); new LeakTrackingBufferPool("Generator",new MappedByteBufferPool()));
String msg1 = "Echo 1"; String msg1 = "Echo 1";
String msg2 = "This is also an echooooo!"; String msg2 = "This is also an echooooo!";
generator.generateWholeFrame(new TextFrame().setPayload(msg1),initialPacket); TextFrame frame1 = new TextFrame().setPayload(msg1);
generator.generateWholeFrame(new TextFrame().setPayload(msg2),initialPacket); TextFrame frame2 = new TextFrame().setPayload(msg2);
// Need to set frame mask (as these are client frames)
byte mask[] = new byte[] { 0x11, 0x22, 0x33, 0x44 };
frame1.setMask(mask);
frame2.setMask(mask);
generator.generateWholeFrame(frame1,initialPacket);
generator.generateWholeFrame(frame2,initialPacket);
// Write packet to network // Write packet to network
BufferUtil.flipToFlush(initialPacket,0); BufferUtil.flipToFlush(initialPacket,0);
@ -109,4 +117,62 @@ public class TooFastClientTest
client.close(); client.close();
} }
} }
/**
* Test where were a client sends a HTTP Upgrade to websocket AND enough websocket frame(s)
* to completely overfill the {@link org.eclipse.jetty.io.AbstractConnection#getInputBufferSize()}
* to test a situation where the WebSocket connection opens with prefill that exceeds
* the normal input buffer sizes.
*/
@Test
public void testUpgradeWithLargeFrame() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());
try
{
client.connect();
// Create ByteBuffer representing the initial opening network packet from the client
ByteBuffer initialPacket = ByteBuffer.allocate(100 * 1024);
BufferUtil.clearToFill(initialPacket);
// Add upgrade request to packet
StringBuilder upgradeRequest = client.generateUpgradeRequest();
ByteBuffer upgradeBuffer = BufferUtil.toBuffer(upgradeRequest.toString(),StandardCharsets.UTF_8);
initialPacket.put(upgradeBuffer);
// Add text frames
Generator generator = new Generator(WebSocketPolicy.newClientPolicy(),
new LeakTrackingBufferPool("Generator",new MappedByteBufferPool()));
byte bigMsgBytes[] = new byte[64*1024];
Arrays.fill(bigMsgBytes,(byte)'x');
String bigMsg = new String(bigMsgBytes, StandardCharsets.UTF_8);
// Need to set frame mask (as these are client frames)
byte mask[] = new byte[] { 0x11, 0x22, 0x33, 0x44 };
TextFrame frame = new TextFrame().setPayload(bigMsg);
frame.setMask(mask);
generator.generateWholeFrame(frame,initialPacket);
// Write packet to network
BufferUtil.flipToFlush(initialPacket,0);
client.writeRaw(initialPacket);
// Expect upgrade
client.expectUpgradeResponse();
// Read frames (hopefully text frames)
EventQueue<WebSocketFrame> frames = client.readFrames(1,1,TimeUnit.SECONDS);
WebSocketFrame tf = frames.poll();
Assert.assertThat("Text Frame/msg1",tf.getPayloadAsUTF8(),is(bigMsg));
}
finally
{
client.close();
}
}
} }