Issue #207 - simplifying OnUpgradeTo prefill + parse, with test

This commit is contained in:
Joakim Erdfelt 2017-04-28 17:11:56 -07:00
parent 92f8f594d1
commit c065f7d125
7 changed files with 262 additions and 255 deletions

View File

@ -352,7 +352,7 @@ public class Generator
*/
public void generateWholeFrame(Frame frame, ByteBuffer buf)
{
buf.put(generateHeaderBytes(frame));
generateHeaderBytes(frame, buf);
if (frame.hasPayload())
{
if (readOnly)

View File

@ -24,6 +24,7 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
@ -88,12 +89,17 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final List<LogicalConnection.Listener> listeners = new CopyOnWriteArrayList<>();
private List<ExtensionConfig> extensions;
private ByteBuffer networkBuffer;
private ByteBuffer prefillBuffer;
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack)
{
super(endp,executor);
Objects.requireNonNull(endp, "EndPoint");
Objects.requireNonNull(executor, "Executor");
Objects.requireNonNull(scheduler, "Scheduler");
Objects.requireNonNull(policy, "WebSocketPolicy");
Objects.requireNonNull(bufferPool, "ByteBufferPool");
LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "." + policy.getBehavior());
this.id = String.format("%s:%d->%s:%d",
@ -279,10 +285,22 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return true;
}
private ByteBuffer getNetworkBuffer()
{
synchronized (this)
{
if (networkBuffer == null)
{
networkBuffer = bufferPool.acquire(getInputBufferSize(), true);
}
return networkBuffer;
}
}
@Override
public void onFillable()
{
networkBuffer = bufferPool.acquire(getInputBufferSize(),true);
getNetworkBuffer();
fillAndParse();
}
@ -297,38 +315,27 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return;
}
if (BufferUtil.hasContent(prefillBuffer))
if (networkBuffer.hasRemaining())
{
if (LOG.isDebugEnabled())
{
LOG.debug("Parsing Upgrade prefill buffer ({})", prefillBuffer.remaining(), BufferUtil.toDetailString(prefillBuffer));
}
if (!parser.parse(prefillBuffer)) return;
}
else
{
if (networkBuffer.hasRemaining())
{
if (!parser.parse(networkBuffer)) return;
}
int filled = getEndPoint().fill(networkBuffer);
if (filled < 0)
{
bufferPool.release(networkBuffer);
return;
}
if (filled == 0)
{
bufferPool.release(networkBuffer);
fillInterested();
return;
}
if (!parser.parse(networkBuffer)) return;
}
int filled = getEndPoint().fill(networkBuffer);
if (filled < 0)
{
bufferPool.release(networkBuffer);
return;
}
if (filled == 0)
{
bufferPool.release(networkBuffer);
fillInterested();
return;
}
// if (!parser.parse(networkBuffer)) return;
}
}
catch (Throwable t)
@ -349,7 +356,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled));
}
prefillBuffer = prefilled;
if ((prefilled != null) && (prefilled.hasRemaining()))
{
networkBuffer = bufferPool.acquire(prefilled.remaining(), true);
BufferUtil.clearToFill(networkBuffer);
BufferUtil.put(prefilled, networkBuffer);
BufferUtil.flipToFlush(networkBuffer, 0);
}
}
private void notifyError(Throwable cause)

View File

@ -50,6 +50,13 @@
<artifactId>jetty-io</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>

View File

@ -0,0 +1,38 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
public class ParserCapture implements Parser.Handler
{
public BlockingQueue<WebSocketFrame> framesQueue = new LinkedBlockingDeque<>();
@Override
public boolean onFrame(Frame frame)
{
framesQueue.offer(WebSocketFrame.copy(frame));
return true; // it is consumed
}
}

View File

@ -25,6 +25,7 @@ import javax.servlet.http.HttpServlet;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@ -44,6 +45,7 @@ public class SimpleServletServer extends ContainerLifeCycle
private static final Logger LOG = Log.getLogger(SimpleServletServer.class);
private Server server;
private ServerConnector connector;
private LocalConnector localConnector;
private URI serverUri;
private HttpServlet servlet;
private boolean ssl = false;
@ -58,7 +60,12 @@ public class SimpleServletServer extends ContainerLifeCycle
{
this.ssl = ssl;
}
public LocalConnector getLocalConnector()
{
return localConnector;
}
public URI getServerUri()
{
return serverUri;
@ -113,7 +120,12 @@ public class SimpleServletServer extends ContainerLifeCycle
connector = new ServerConnector(server);
connector.setPort(0);
}
// Add network connector
server.addConnector(connector);
// Add Local Connector
localConnector = new LocalConnector(server);
server.addConnector(localConnector);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
@ -146,6 +158,7 @@ public class SimpleServletServer extends ContainerLifeCycle
protected void configureServletContextHandler(ServletContextHandler context)
{
/* override to change context handler */
}
public WebSocketServletFactory getWebSocketServletFactory()

View File

@ -0,0 +1,156 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.server;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpTester;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.frames.CloseFrame;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.tests.ParserCapture;
import org.eclipse.jetty.websocket.tests.SimpleServletServer;
import org.eclipse.jetty.websocket.tests.servlets.EchoServlet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
/**
* Test simulating a client that talks too quickly.
* <p>
* This is mainly for the {@link org.eclipse.jetty.io.Connection.UpgradeTo} logic within
* the {@link org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection} implementation.
* </p>
* <p>
* There is a class of client that will send the GET+Upgrade Request along with a few websocket frames in a single
* network packet. This test attempts to perform this behavior as close as possible.
* </p>
*/
public class ConnectionUpgradeToBufferTest
{
private static SimpleServletServer server;
@BeforeClass
public static void startServer() throws Exception
{
server = new SimpleServletServer(new EchoServlet());
server.start();
}
@AfterClass
public static void stopServer() throws Exception
{
server.stop();
}
@Rule
public TestName testname = new TestName();
@Test
public void testUpgradeWithSmallFrames() throws Exception
{
ByteBuffer buf = ByteBuffer.allocate(4096);
// Create Upgrade Request Header
StringBuilder upgradeRequest = new StringBuilder();
upgradeRequest.append("GET / HTTP/1.1\r\n");
upgradeRequest.append("Host: local\r\n");
upgradeRequest.append("Connection: Upgrade\r\n");
upgradeRequest.append("Upgrade: WebSocket\r\n");
upgradeRequest.append("Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n");
upgradeRequest.append("Sec-WebSocket-Origin: ws://local/\r\n");
upgradeRequest.append("Sec-WebSocket-Protocol: echo\r\n");
upgradeRequest.append("Sec-WebSocket-Version: 13\r\n");
upgradeRequest.append("\r\n");
ByteBuffer upgradeRequestBytes = BufferUtil.toBuffer(upgradeRequest.toString(), StandardCharsets.UTF_8);
BufferUtil.put(upgradeRequestBytes, buf);
// Create A few WebSocket Frames
TextFrame frame1 = new TextFrame().setPayload("Hello 1");
TextFrame frame2 = new TextFrame().setPayload("Hello 2");
CloseFrame closeFrame = new CloseInfo(StatusCode.NORMAL).asFrame();
// Need to set frame mask (as these are client frames)
byte mask[] = new byte[]{0x11, 0x22, 0x33, 0x44};
frame1.setMask(mask);
frame2.setMask(mask);
closeFrame.setMask(mask);
ByteBufferPool bufferPool = new MappedByteBufferPool();
Generator generator = new Generator(WebSocketPolicy.newClientPolicy(), bufferPool);
generator.generateWholeFrame(frame1, buf);
generator.generateWholeFrame(frame2, buf);
generator.generateWholeFrame(closeFrame, buf);
// Send this buffer to the server
BufferUtil.flipToFlush(buf, 0);
LocalConnector connector = server.getLocalConnector();
LocalConnector.LocalEndPoint endPoint = connector.connect();
endPoint.addInput(buf);
// Get response
ByteBuffer response = endPoint.waitForResponse(false, 1, TimeUnit.SECONDS);
HttpTester.Response parsedResponse = HttpTester.parseResponse(response);
assertThat("Is Switching Protocols", parsedResponse.getStatus(), is(101));
assertThat("Is WebSocket Upgrade", parsedResponse.get("Upgrade"), is("WebSocket"));
// Let server know that client is done sending
endPoint.addInputEOF();
// Wait for server to close
endPoint.waitUntilClosed();
// Get the server send echo bytes
ByteBuffer wsIncoming = endPoint.getOutput();
// Parse those bytes into frames
ParserCapture capture = new ParserCapture();
Parser parser = new Parser(WebSocketPolicy.newClientPolicy(), bufferPool, capture);
parser.parse(wsIncoming);
// Validate echoed frames
WebSocketFrame incomingFrame;
incomingFrame = capture.framesQueue.poll(1, TimeUnit.SECONDS);
assertThat("Incoming Frame.op", incomingFrame.getOpCode(), is(OpCode.TEXT));
assertThat("Incoming Frame.payload", incomingFrame.getPayloadAsUTF8(), is("Hello 1"));
incomingFrame = capture.framesQueue.poll(1, TimeUnit.SECONDS);
assertThat("Incoming Frame.op", incomingFrame.getOpCode(), is(OpCode.TEXT));
assertThat("Incoming Frame.payload", incomingFrame.getPayloadAsUTF8(), is("Hello 2"));
}
}

View File

@ -1,221 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.server;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.tests.Defaults;
import org.eclipse.jetty.websocket.tests.LeakTrackingBufferPoolRule;
import org.eclipse.jetty.websocket.tests.SimpleServletServer;
import org.eclipse.jetty.websocket.tests.TrackingEndpoint;
import org.eclipse.jetty.websocket.tests.servlets.EchoServlet;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
/**
* Test simulating a client that talks too quickly.
* <p>
* There is a class of client that will send the GET+Upgrade Request along with a few websocket frames in a single
* network packet. This test attempts to perform this behavior as close as possible.
*/
public class TooFastClientTest
{
private static SimpleServletServer server;
@BeforeClass
public static void startServer() throws Exception
{
server = new SimpleServletServer(new EchoServlet());
server.start();
}
@AfterClass
public static void stopServer() throws Exception
{
server.stop();
}
@Rule
LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule(TooFastClientTest.class.getSimpleName());
@Rule
public TestName testname = new TestName();
private WebSocketClient client;
@Before
public void startClient() throws Exception
{
client = new WebSocketClient();
client.start();
}
@After
public void stopClient() throws Exception
{
client.stop();
}
@Test
public void testUpgradeWithSmallFrames() throws Exception
{
URI wsUri = server.getServerUri();
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
/* TODO
Generate the Request ByteBuffer.
Complete with ..
* A WebSocket Upgrade Request URI
* A WebSocket Upgrade Request Headers
* A few outgoing WebSocket frames
Send this ByteBuffer as the complete HTTP request bytebuffer.
// Create ByteBuffer representing the initial opening network packet from the client
ByteBuffer initialPacket = ByteBuffer.allocate(4096);
BufferUtil.clearToFill(initialPacket);
// Add upgrade request to packet
StringBuilder upgradeRequest = client.generateUpgradeRequest();
ByteBuffer upgradeBuffer = BufferUtil.toBuffer(upgradeRequest.toString(), StandardCharsets.UTF_8);
initialPacket.put(upgradeBuffer);
// Add text frames
Generator generator = new Generator(WebSocketPolicy.newClientPolicy(), bufferPool);
String msg1 = "Echo 1";
String msg2 = "This is also an echooooo!";
TextFrame frame1 = new TextFrame().setPayload(msg1);
TextFrame frame2 = new TextFrame().setPayload(msg2);
// Need to set frame mask (as these are client frames)
byte mask[] = new byte[]{0x11, 0x22, 0x33, 0x44};
frame1.setMask(mask);
frame2.setMask(mask);
generator.generateWholeFrame(frame1, initialPacket);
generator.generateWholeFrame(frame2, initialPacket);
// Write packet to network
BufferUtil.flipToFlush(initialPacket, 0);
client.writeRaw(initialPacket);
*/
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest);
// Expect upgrade success
Session clientSession = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// Read incoming messages
String incomingMessage;
incomingMessage = clientSocket.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Echoed Incoming Message 1", incomingMessage, is("Echo 1"));
incomingMessage = clientSocket.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Echoed Incoming Message 2", incomingMessage, is("This is also an echooooo!"));
clientSession.close();
}
/**
* Test where were a client sends a HTTP Upgrade to websocket AND enough websocket frame(s)
* to completely overfill the {@link org.eclipse.jetty.io.AbstractConnection#getInputBufferSize()}
* to test a situation where the WebSocket connection opens with prefill that exceeds
* the normal input buffer sizes.
*
* @throws Exception on test failure
*/
@Test
public void testUpgradeWithLargeFrame() throws Exception
{
URI wsUri = server.getServerUri();
TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName());
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
byte bigMsgBytes[] = new byte[64 * 1024];
Arrays.fill(bigMsgBytes, (byte) 'x');
String bigMsg = new String(bigMsgBytes, StandardCharsets.UTF_8);
/* TODO
Generate the Request ByteBuffer.
Complete with ..
* A WebSocket Upgrade Request URI
* A WebSocket Upgrade Request Headers
* A big enough outgoing WebSocket frame
that will trigger a prefill + an unread buffer
Send this ByteBuffer as the complete HTTP request bytebuffer.
// Create ByteBuffer representing the initial opening network packet from the client
ByteBuffer initialPacket = ByteBuffer.allocate(100 * 1024);
BufferUtil.clearToFill(initialPacket);
// Add upgrade request to packet
StringBuilder upgradeRequest = client.generateUpgradeRequest();
ByteBuffer upgradeBuffer = BufferUtil.toBuffer(upgradeRequest.toString(), StandardCharsets.UTF_8);
initialPacket.put(upgradeBuffer);
// Add text frames
Generator generator = new Generator(WebSocketPolicy.newClientPolicy(), bufferPool);
// Need to set frame mask (as these are client frames)
byte mask[] = new byte[]{0x11, 0x22, 0x33, 0x44};
TextFrame frame = new TextFrame().setPayload(bigMsg);
frame.setMask(mask);
generator.generateWholeFrame(frame, initialPacket);
// Write packet to network
BufferUtil.flipToFlush(initialPacket, 0);
client.writeRaw(initialPacket);
// Expect upgrade
client.expectUpgradeResponse();
*/
Future<Session> clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest);
// Expect upgrade success
Session clientSession = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
// Read incoming messages
String incomingMessage;
incomingMessage = clientSocket.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Echoed Incoming Message 1", incomingMessage, is(bigMsg));
clientSession.close();
}
}