diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java index 3886ac2ab4b..9b0f37a11ee 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java @@ -352,7 +352,7 @@ public class Generator */ public void generateWholeFrame(Frame frame, ByteBuffer buf) { - buf.put(generateHeaderBytes(frame)); + generateHeaderBytes(frame, buf); if (frame.hasPayload()) { if (readOnly) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index fb5492a8997..0af6a96755a 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -24,6 +24,7 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -88,12 +89,17 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp private final List listeners = new CopyOnWriteArrayList<>(); private List extensions; private ByteBuffer networkBuffer; - private ByteBuffer prefillBuffer; public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack) { super(endp,executor); + Objects.requireNonNull(endp, "EndPoint"); + Objects.requireNonNull(executor, "Executor"); + Objects.requireNonNull(scheduler, "Scheduler"); + Objects.requireNonNull(policy, "WebSocketPolicy"); + Objects.requireNonNull(bufferPool, "ByteBufferPool"); + LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "." + policy.getBehavior()); this.id = String.format("%s:%d->%s:%d", @@ -279,10 +285,22 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp return true; } + private ByteBuffer getNetworkBuffer() + { + synchronized (this) + { + if (networkBuffer == null) + { + networkBuffer = bufferPool.acquire(getInputBufferSize(), true); + } + return networkBuffer; + } + } + @Override public void onFillable() { - networkBuffer = bufferPool.acquire(getInputBufferSize(),true); + getNetworkBuffer(); fillAndParse(); } @@ -297,38 +315,27 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp return; } - if (BufferUtil.hasContent(prefillBuffer)) + if (networkBuffer.hasRemaining()) { - if (LOG.isDebugEnabled()) - { - LOG.debug("Parsing Upgrade prefill buffer ({})", prefillBuffer.remaining(), BufferUtil.toDetailString(prefillBuffer)); - } - if (!parser.parse(prefillBuffer)) return; - } - else - { - if (networkBuffer.hasRemaining()) - { - if (!parser.parse(networkBuffer)) return; - } - - int filled = getEndPoint().fill(networkBuffer); - - if (filled < 0) - { - bufferPool.release(networkBuffer); - return; - } - - if (filled == 0) - { - bufferPool.release(networkBuffer); - fillInterested(); - return; - } - if (!parser.parse(networkBuffer)) return; } + + int filled = getEndPoint().fill(networkBuffer); + + if (filled < 0) + { + bufferPool.release(networkBuffer); + return; + } + + if (filled == 0) + { + bufferPool.release(networkBuffer); + fillInterested(); + return; + } + + // if (!parser.parse(networkBuffer)) return; } } catch (Throwable t) @@ -349,7 +356,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { LOG.debug("set Initial Buffer - {}",BufferUtil.toDetailString(prefilled)); } - prefillBuffer = prefilled; + + if ((prefilled != null) && (prefilled.hasRemaining())) + { + networkBuffer = bufferPool.acquire(prefilled.remaining(), true); + BufferUtil.clearToFill(networkBuffer); + BufferUtil.put(prefilled, networkBuffer); + BufferUtil.flipToFlush(networkBuffer, 0); + } } private void notifyError(Throwable cause) diff --git a/jetty-websocket/websocket-tests/pom.xml b/jetty-websocket/websocket-tests/pom.xml index bb36152ae7d..07b541ffd4e 100644 --- a/jetty-websocket/websocket-tests/pom.xml +++ b/jetty-websocket/websocket-tests/pom.xml @@ -50,6 +50,13 @@ jetty-io ${project.version} + + org.eclipse.jetty + jetty-http + ${project.version} + tests + test + org.eclipse.jetty.toolchain jetty-test-helper diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/ParserCapture.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/ParserCapture.java new file mode 100644 index 00000000000..f687b55bd65 --- /dev/null +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/ParserCapture.java @@ -0,0 +1,38 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.common.Parser; +import org.eclipse.jetty.websocket.common.WebSocketFrame; + +public class ParserCapture implements Parser.Handler +{ + public BlockingQueue framesQueue = new LinkedBlockingDeque<>(); + + @Override + public boolean onFrame(Frame frame) + { + framesQueue.offer(WebSocketFrame.copy(frame)); + return true; // it is consumed + } +} diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/SimpleServletServer.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/SimpleServletServer.java index 2453adf3fcd..3271fd8d5f0 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/SimpleServletServer.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/SimpleServletServer.java @@ -25,6 +25,7 @@ import javax.servlet.http.HttpServlet; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.LocalConnector; import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -44,6 +45,7 @@ public class SimpleServletServer extends ContainerLifeCycle private static final Logger LOG = Log.getLogger(SimpleServletServer.class); private Server server; private ServerConnector connector; + private LocalConnector localConnector; private URI serverUri; private HttpServlet servlet; private boolean ssl = false; @@ -58,7 +60,12 @@ public class SimpleServletServer extends ContainerLifeCycle { this.ssl = ssl; } - + + public LocalConnector getLocalConnector() + { + return localConnector; + } + public URI getServerUri() { return serverUri; @@ -113,7 +120,12 @@ public class SimpleServletServer extends ContainerLifeCycle connector = new ServerConnector(server); connector.setPort(0); } + // Add network connector server.addConnector(connector); + + // Add Local Connector + localConnector = new LocalConnector(server); + server.addConnector(localConnector); ServletContextHandler context = new ServletContextHandler(); context.setContextPath("/"); @@ -146,6 +158,7 @@ public class SimpleServletServer extends ContainerLifeCycle protected void configureServletContextHandler(ServletContextHandler context) { + /* override to change context handler */ } public WebSocketServletFactory getWebSocketServletFactory() diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ConnectionUpgradeToBufferTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ConnectionUpgradeToBufferTest.java new file mode 100644 index 00000000000..4fc6b49a659 --- /dev/null +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/ConnectionUpgradeToBufferTest.java @@ -0,0 +1,156 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.server; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.server.LocalConnector; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.common.CloseInfo; +import org.eclipse.jetty.websocket.common.Generator; +import org.eclipse.jetty.websocket.common.OpCode; +import org.eclipse.jetty.websocket.common.Parser; +import org.eclipse.jetty.websocket.common.WebSocketFrame; +import org.eclipse.jetty.websocket.common.frames.CloseFrame; +import org.eclipse.jetty.websocket.common.frames.TextFrame; +import org.eclipse.jetty.websocket.tests.ParserCapture; +import org.eclipse.jetty.websocket.tests.SimpleServletServer; +import org.eclipse.jetty.websocket.tests.servlets.EchoServlet; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Test simulating a client that talks too quickly. + *

+ * This is mainly for the {@link org.eclipse.jetty.io.Connection.UpgradeTo} logic within + * the {@link org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection} implementation. + *

+ *

+ * There is a class of client that will send the GET+Upgrade Request along with a few websocket frames in a single + * network packet. This test attempts to perform this behavior as close as possible. + *

+ */ +public class ConnectionUpgradeToBufferTest +{ + private static SimpleServletServer server; + + @BeforeClass + public static void startServer() throws Exception + { + server = new SimpleServletServer(new EchoServlet()); + server.start(); + } + + @AfterClass + public static void stopServer() throws Exception + { + server.stop(); + } + + @Rule + public TestName testname = new TestName(); + + @Test + public void testUpgradeWithSmallFrames() throws Exception + { + ByteBuffer buf = ByteBuffer.allocate(4096); + + // Create Upgrade Request Header + StringBuilder upgradeRequest = new StringBuilder(); + upgradeRequest.append("GET / HTTP/1.1\r\n"); + upgradeRequest.append("Host: local\r\n"); + upgradeRequest.append("Connection: Upgrade\r\n"); + upgradeRequest.append("Upgrade: WebSocket\r\n"); + upgradeRequest.append("Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"); + upgradeRequest.append("Sec-WebSocket-Origin: ws://local/\r\n"); + upgradeRequest.append("Sec-WebSocket-Protocol: echo\r\n"); + upgradeRequest.append("Sec-WebSocket-Version: 13\r\n"); + upgradeRequest.append("\r\n"); + + ByteBuffer upgradeRequestBytes = BufferUtil.toBuffer(upgradeRequest.toString(), StandardCharsets.UTF_8); + BufferUtil.put(upgradeRequestBytes, buf); + + // Create A few WebSocket Frames + TextFrame frame1 = new TextFrame().setPayload("Hello 1"); + TextFrame frame2 = new TextFrame().setPayload("Hello 2"); + CloseFrame closeFrame = new CloseInfo(StatusCode.NORMAL).asFrame(); + + // Need to set frame mask (as these are client frames) + byte mask[] = new byte[]{0x11, 0x22, 0x33, 0x44}; + frame1.setMask(mask); + frame2.setMask(mask); + closeFrame.setMask(mask); + + ByteBufferPool bufferPool = new MappedByteBufferPool(); + + Generator generator = new Generator(WebSocketPolicy.newClientPolicy(), bufferPool); + generator.generateWholeFrame(frame1, buf); + generator.generateWholeFrame(frame2, buf); + generator.generateWholeFrame(closeFrame, buf); + + // Send this buffer to the server + BufferUtil.flipToFlush(buf, 0); + LocalConnector connector = server.getLocalConnector(); + LocalConnector.LocalEndPoint endPoint = connector.connect(); + endPoint.addInput(buf); + + // Get response + ByteBuffer response = endPoint.waitForResponse(false, 1, TimeUnit.SECONDS); + HttpTester.Response parsedResponse = HttpTester.parseResponse(response); + + assertThat("Is Switching Protocols", parsedResponse.getStatus(), is(101)); + assertThat("Is WebSocket Upgrade", parsedResponse.get("Upgrade"), is("WebSocket")); + + // Let server know that client is done sending + endPoint.addInputEOF(); + + // Wait for server to close + endPoint.waitUntilClosed(); + + // Get the server send echo bytes + ByteBuffer wsIncoming = endPoint.getOutput(); + + // Parse those bytes into frames + ParserCapture capture = new ParserCapture(); + Parser parser = new Parser(WebSocketPolicy.newClientPolicy(), bufferPool, capture); + parser.parse(wsIncoming); + + // Validate echoed frames + WebSocketFrame incomingFrame; + incomingFrame = capture.framesQueue.poll(1, TimeUnit.SECONDS); + assertThat("Incoming Frame.op", incomingFrame.getOpCode(), is(OpCode.TEXT)); + assertThat("Incoming Frame.payload", incomingFrame.getPayloadAsUTF8(), is("Hello 1")); + incomingFrame = capture.framesQueue.poll(1, TimeUnit.SECONDS); + assertThat("Incoming Frame.op", incomingFrame.getOpCode(), is(OpCode.TEXT)); + assertThat("Incoming Frame.payload", incomingFrame.getPayloadAsUTF8(), is("Hello 2")); + } +} diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/TooFastClientTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/TooFastClientTest.java deleted file mode 100644 index 3781bfb1257..00000000000 --- a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/TooFastClientTest.java +++ /dev/null @@ -1,221 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.websocket.tests.server; - -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; -import org.eclipse.jetty.websocket.client.WebSocketClient; -import org.eclipse.jetty.websocket.tests.Defaults; -import org.eclipse.jetty.websocket.tests.LeakTrackingBufferPoolRule; -import org.eclipse.jetty.websocket.tests.SimpleServletServer; -import org.eclipse.jetty.websocket.tests.TrackingEndpoint; -import org.eclipse.jetty.websocket.tests.servlets.EchoServlet; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -/** - * Test simulating a client that talks too quickly. - *

- * There is a class of client that will send the GET+Upgrade Request along with a few websocket frames in a single - * network packet. This test attempts to perform this behavior as close as possible. - */ -public class TooFastClientTest -{ - private static SimpleServletServer server; - - @BeforeClass - public static void startServer() throws Exception - { - server = new SimpleServletServer(new EchoServlet()); - server.start(); - } - - @AfterClass - public static void stopServer() throws Exception - { - server.stop(); - } - - @Rule - LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule(TooFastClientTest.class.getSimpleName()); - - @Rule - public TestName testname = new TestName(); - - private WebSocketClient client; - - @Before - public void startClient() throws Exception - { - client = new WebSocketClient(); - client.start(); - } - - @After - public void stopClient() throws Exception - { - client.stop(); - } - - @Test - public void testUpgradeWithSmallFrames() throws Exception - { - URI wsUri = server.getServerUri(); - - TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName()); - ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); - - /* TODO - Generate the Request ByteBuffer. - Complete with .. - * A WebSocket Upgrade Request URI - * A WebSocket Upgrade Request Headers - * A few outgoing WebSocket frames - Send this ByteBuffer as the complete HTTP request bytebuffer. - - // Create ByteBuffer representing the initial opening network packet from the client - ByteBuffer initialPacket = ByteBuffer.allocate(4096); - BufferUtil.clearToFill(initialPacket); - - // Add upgrade request to packet - StringBuilder upgradeRequest = client.generateUpgradeRequest(); - ByteBuffer upgradeBuffer = BufferUtil.toBuffer(upgradeRequest.toString(), StandardCharsets.UTF_8); - initialPacket.put(upgradeBuffer); - - // Add text frames - Generator generator = new Generator(WebSocketPolicy.newClientPolicy(), bufferPool); - - String msg1 = "Echo 1"; - String msg2 = "This is also an echooooo!"; - - TextFrame frame1 = new TextFrame().setPayload(msg1); - TextFrame frame2 = new TextFrame().setPayload(msg2); - - // Need to set frame mask (as these are client frames) - byte mask[] = new byte[]{0x11, 0x22, 0x33, 0x44}; - frame1.setMask(mask); - frame2.setMask(mask); - - generator.generateWholeFrame(frame1, initialPacket); - generator.generateWholeFrame(frame2, initialPacket); - - // Write packet to network - BufferUtil.flipToFlush(initialPacket, 0); - client.writeRaw(initialPacket); - */ - - Future clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest); - - // Expect upgrade success - Session clientSession = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - - // Read incoming messages - String incomingMessage; - incomingMessage = clientSocket.messageQueue.poll(5, TimeUnit.SECONDS); - assertThat("Echoed Incoming Message 1", incomingMessage, is("Echo 1")); - incomingMessage = clientSocket.messageQueue.poll(5, TimeUnit.SECONDS); - assertThat("Echoed Incoming Message 2", incomingMessage, is("This is also an echooooo!")); - - clientSession.close(); - } - - /** - * Test where were a client sends a HTTP Upgrade to websocket AND enough websocket frame(s) - * to completely overfill the {@link org.eclipse.jetty.io.AbstractConnection#getInputBufferSize()} - * to test a situation where the WebSocket connection opens with prefill that exceeds - * the normal input buffer sizes. - * - * @throws Exception on test failure - */ - @Test - public void testUpgradeWithLargeFrame() throws Exception - { - URI wsUri = server.getServerUri(); - - TrackingEndpoint clientSocket = new TrackingEndpoint(testname.getMethodName()); - ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); - - byte bigMsgBytes[] = new byte[64 * 1024]; - Arrays.fill(bigMsgBytes, (byte) 'x'); - String bigMsg = new String(bigMsgBytes, StandardCharsets.UTF_8); - - /* TODO - Generate the Request ByteBuffer. - Complete with .. - * A WebSocket Upgrade Request URI - * A WebSocket Upgrade Request Headers - * A big enough outgoing WebSocket frame - that will trigger a prefill + an unread buffer - Send this ByteBuffer as the complete HTTP request bytebuffer. - - // Create ByteBuffer representing the initial opening network packet from the client - ByteBuffer initialPacket = ByteBuffer.allocate(100 * 1024); - BufferUtil.clearToFill(initialPacket); - - // Add upgrade request to packet - StringBuilder upgradeRequest = client.generateUpgradeRequest(); - ByteBuffer upgradeBuffer = BufferUtil.toBuffer(upgradeRequest.toString(), StandardCharsets.UTF_8); - initialPacket.put(upgradeBuffer); - - // Add text frames - Generator generator = new Generator(WebSocketPolicy.newClientPolicy(), bufferPool); - - // Need to set frame mask (as these are client frames) - byte mask[] = new byte[]{0x11, 0x22, 0x33, 0x44}; - TextFrame frame = new TextFrame().setPayload(bigMsg); - frame.setMask(mask); - generator.generateWholeFrame(frame, initialPacket); - - // Write packet to network - BufferUtil.flipToFlush(initialPacket, 0); - client.writeRaw(initialPacket); - - // Expect upgrade - client.expectUpgradeResponse(); - */ - - Future clientConnectFuture = client.connect(clientSocket, wsUri, upgradeRequest); - - // Expect upgrade success - Session clientSession = clientConnectFuture.get(Defaults.CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - - // Read incoming messages - String incomingMessage; - incomingMessage = clientSocket.messageQueue.poll(5, TimeUnit.SECONDS); - assertThat("Echoed Incoming Message 1", incomingMessage, is(bigMsg)); - - clientSession.close(); - } - - -}