From ae62180416226dcb4f7d1442387c5123f5bee092 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Mon, 31 Aug 2020 18:29:28 +1000 Subject: [PATCH] Issue #5170 - ensure bytes after 101 response isn't lost during upgrade Signed-off-by: Lachlan Roberts --- .../client/http/HttpReceiverOverHTTP.java | 11 +- .../core/internal/WebSocketConnection.java | 9 +- .../websocket/core/TestMessageHandler.java | 4 + .../UpgradeWithLeftOverHttpBytesTest.java | 134 ++++++++++++++++++ 4 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/UpgradeWithLeftOverHttpBytesTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index 75bf4a53ea0..57b15e614fc 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -141,6 +141,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res BufferUtil.flipToFlush(upgradeBuffer, 0); return upgradeBuffer; } + releaseNetworkBuffer(); return null; } @@ -160,12 +161,11 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res return; } - // Connection may be closed or upgraded in a parser callback. - boolean upgraded = connection != endPoint.getConnection(); - if (connection.isClosed() || upgraded) + // Connection may be closed in a parser callback. + if (connection.isClosed()) { if (LOG.isDebugEnabled()) - LOG.debug("{} {}", upgraded ? "Upgraded" : "Closed", connection); + LOG.debug("Closed {}", connection); releaseNetworkBuffer(); return; } @@ -235,6 +235,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res if (complete) { + if (getHttpExchange().getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) + return true; + if (LOG.isDebugEnabled()) LOG.debug("Discarding unexpected content after response: {}", networkBuffer); networkBuffer.clear(); diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java index c34e86c4e80..1c2009634ad 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java @@ -469,9 +469,12 @@ public class WebSocketConnection extends AbstractConnection implements Connectio } catch (Throwable t) { - LOG.warn(t.toString()); - BufferUtil.clear(networkBuffer.getBuffer()); - releaseNetworkBuffer(); + LOG.warn("Error during fillAndParse()", t); + if (networkBuffer != null) + { + BufferUtil.clear(networkBuffer.getBuffer()); + releaseNetworkBuffer(); + } coreSession.processConnectionError(t, Callback.NOOP); } } diff --git a/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestMessageHandler.java b/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestMessageHandler.java index ca1816580c0..626c8c70ef2 100644 --- a/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestMessageHandler.java +++ b/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestMessageHandler.java @@ -34,6 +34,7 @@ public class TestMessageHandler extends MessageHandler public CoreSession coreSession; public BlockingQueue textMessages = new BlockingArrayQueue<>(); public BlockingQueue binaryMessages = new BlockingArrayQueue<>(); + public CloseStatus closeStatus; public volatile Throwable error; public CountDownLatch openLatch = new CountDownLatch(1); public CountDownLatch errorLatch = new CountDownLatch(1); @@ -73,6 +74,7 @@ public class TestMessageHandler extends MessageHandler if (LOG.isDebugEnabled()) LOG.debug("onClosed {}", closeStatus); super.onClosed(closeStatus, callback); + this.closeStatus = closeStatus; closeLatch.countDown(); } @@ -82,6 +84,7 @@ public class TestMessageHandler extends MessageHandler if (LOG.isDebugEnabled()) LOG.debug("onText {}", message); textMessages.offer(message); + callback.succeeded(); } @Override @@ -90,5 +93,6 @@ public class TestMessageHandler extends MessageHandler if (LOG.isDebugEnabled()) LOG.debug("onBinary {}", message); binaryMessages.offer(message); + callback.succeeded(); } } diff --git a/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/UpgradeWithLeftOverHttpBytesTest.java b/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/UpgradeWithLeftOverHttpBytesTest.java new file mode 100644 index 00000000000..c53d01d37b0 --- /dev/null +++ b/jetty-websocket/websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/UpgradeWithLeftOverHttpBytesTest.java @@ -0,0 +1,134 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.websocket.core; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Scanner; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; +import org.eclipse.jetty.websocket.core.internal.Generator; +import org.eclipse.jetty.websocket.core.internal.WebSocketCore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class UpgradeWithLeftOverHttpBytesTest extends WebSocketTester +{ + private ServerSocket server; + private URI serverUri; + private WebSocketCoreClient client; + private final Generator generator = new Generator(); + + @BeforeEach + public void start() throws Exception + { + client = new WebSocketCoreClient(); + client.getHttpClient().setIdleTimeout(5000); + client.start(); + server = new ServerSocket(0); + serverUri = URI.create("ws://localhost:" + server.getLocalPort()); + } + + @AfterEach + public void stop() throws Exception + { + client.stop(); + server.close(); + } + + @Test + public void testUpgradeWithLeftOverHttpBytes() throws Exception + { + TestMessageHandler clientEndpoint = new TestMessageHandler(); + CompletableFuture clientConnect = client.connect(clientEndpoint, serverUri); + Socket serverSocket = server.accept(); + + String upgradeRequest = getRequestHeaders(serverSocket.getInputStream()); + assertThat(upgradeRequest, containsString("HTTP/1.1")); + assertThat(upgradeRequest, containsString("Upgrade: websocket")); + + // Send upgrade response in the same write as two websocket frames. + String upgradeResponse = "HTTP/1.1 101 Switching Protocols\n" + + "Upgrade: WebSocket\n" + + "Connection: Upgrade\n" + + "Sec-WebSocket-Accept: " + getAcceptKey(upgradeRequest) + "\n" + + "\n"; + + Frame dataFrame = new Frame(OpCode.TEXT, BufferUtil.toBuffer("first message payload")); + Frame closeFrame = new CloseStatus(CloseStatus.NORMAL, "closed by test").toFrame(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + baos.write(upgradeResponse.getBytes(StandardCharsets.ISO_8859_1)); + BufferUtil.writeTo(generateFrame(dataFrame), baos); + BufferUtil.writeTo(generateFrame(closeFrame), baos); + serverSocket.getOutputStream().write(baos.toByteArray()); + + // Check the client receives upgrade response and then the two websocket frames. + CoreSession coreSession = clientConnect.get(5, TimeUnit.SECONDS); + assertNotNull(coreSession); + assertTrue(clientEndpoint.openLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientEndpoint.textMessages.poll(5, TimeUnit.SECONDS), is("first message payload")); + assertTrue(clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS)); + assertThat(clientEndpoint.closeStatus.getCode(), is(CloseStatus.NORMAL)); + assertThat(clientEndpoint.closeStatus.getReason(), is("closed by test")); + } + + public ByteBuffer generateFrame(Frame frame) + { + int size = Generator.MAX_HEADER_LENGTH + frame.getPayloadLength(); + ByteBuffer buffer = BufferUtil.allocate(size); + generator.generateWholeFrame(frame, buffer); + return buffer; + } + + String getAcceptKey(String upgradeRequest) + { + Matcher matcher = Pattern.compile(".*Sec-WebSocket-Key: ([^\n\r]+)\r?\n.*", Pattern.DOTALL | Pattern.MULTILINE) + .matcher(upgradeRequest); + assertTrue(matcher.matches()); + String key = matcher.group(1); + assertFalse(StringUtil.isEmpty(key)); + return WebSocketCore.hashKey(key); + } + + static String getRequestHeaders(InputStream is) + { + Scanner s = new Scanner(is).useDelimiter("\r\n\r\n"); + return s.hasNext() ? s.next() : ""; + } +}