From c2eff992b7d7ccb856f9f85aae26b592f159f7c6 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 13 Sep 2019 11:15:13 +1000 Subject: [PATCH] Issue #3982 - fix to WebSocket bytesIn for flaky WebSocketStatsTest Signed-off-by: Lachlan Roberts --- .../tests/WebSocketConnectionStatsTest.java | 179 ------------------ .../jetty/websocket/common/Parser.java | 10 - .../io/AbstractWebSocketConnection.java | 5 +- 3 files changed, 4 insertions(+), 190 deletions(-) delete mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketConnectionStatsTest.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketConnectionStatsTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketConnectionStatsTest.java deleted file mode 100644 index 148720950c4..00000000000 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketConnectionStatsTest.java +++ /dev/null @@ -1,179 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.websocket.tests; - -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.ConnectionStatistics; -import org.eclipse.jetty.io.MappedByteBufferPool; -import org.eclipse.jetty.server.HttpConnection; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.StatusCode; -import org.eclipse.jetty.websocket.api.WebSocketPolicy; -import org.eclipse.jetty.websocket.client.WebSocketClient; -import org.eclipse.jetty.websocket.common.CloseInfo; -import org.eclipse.jetty.websocket.common.Generator; -import org.eclipse.jetty.websocket.common.WebSocketFrame; -import org.eclipse.jetty.websocket.common.frames.TextFrame; -import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; -import org.eclipse.jetty.websocket.servlet.WebSocketServlet; -import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class WebSocketConnectionStatsTest -{ - public static class MyWebSocketServlet extends WebSocketServlet - { - @Override - public void configure(WebSocketServletFactory factory) - { - factory.setCreator((req, resp) -> new EchoSocket()); - } - } - - private Server server; - private ServerConnector connector; - private WebSocketClient client; - private ConnectionStatistics statistics; - private CountDownLatch wsUpgradeComplete = new CountDownLatch(1); - private CountDownLatch wsConnectionClosed = new CountDownLatch(1); - - @BeforeEach - public void start() throws Exception - { - statistics = new ConnectionStatistics() - { - @Override - public void onClosed(Connection connection) - { - super.onClosed(connection); - - if (connection instanceof AbstractWebSocketConnection) - wsConnectionClosed.countDown(); - else if (connection instanceof HttpConnection) - wsUpgradeComplete.countDown(); - } - }; - - server = new Server(); - connector = new ServerConnector(server); - connector.addBean(statistics); - server.addConnector(connector); - - ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); - contextHandler.setContextPath("/"); - contextHandler.addServlet(MyWebSocketServlet.class, "/testPath"); - server.setHandler(contextHandler); - - client = new WebSocketClient(); - - server.start(); - client.start(); - } - - @AfterEach - public void stop() throws Exception - { - client.stop(); - server.stop(); - } - - long getFrameByteSize(WebSocketFrame frame) - { - ByteBufferPool bufferPool = new MappedByteBufferPool(); - Generator generator = new Generator(WebSocketPolicy.newClientPolicy(), bufferPool); - ByteBuffer buffer = bufferPool.acquire(frame.getPayloadLength() + 10, true); - int pos = BufferUtil.flipToFill(buffer); - generator.generateWholeFrame(frame, buffer); - return buffer.position() - pos; - } - - @Disabled("Flaky test see issue #3982") - @Test - public void echoStatsTest() throws Exception - { - URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/testPath"); - EventSocket socket = new EventSocket(); - Future connect = client.connect(socket, uri); - - final long numMessages = 10000; - final String msgText = "hello world"; - - long upgradeSentBytes; - long upgradeReceivedBytes; - - try (Session session = connect.get(5, TimeUnit.SECONDS)) - { - wsUpgradeComplete.await(5, TimeUnit.SECONDS); - upgradeSentBytes = statistics.getSentBytes(); - upgradeReceivedBytes = statistics.getReceivedBytes(); - - for (int i = 0; i < numMessages; i++) - { - session.getRemote().sendString(msgText); - } - session.close(StatusCode.NORMAL, null); - - assertTrue(socket.closed.await(5, TimeUnit.SECONDS)); - assertTrue(wsConnectionClosed.await(5, TimeUnit.SECONDS)); - } - - assertThat(statistics.getConnectionsMax(), is(1L)); - assertThat(statistics.getConnections(), is(0L)); - - assertThat(statistics.getSentMessages(), is(numMessages + 2L)); - assertThat(statistics.getReceivedMessages(), is(numMessages + 2L)); - - WebSocketFrame textFrame = new TextFrame().setPayload(msgText); - WebSocketFrame closeFrame = new CloseInfo(socket.closeCode, socket.closeReason).asFrame(); - - final long textFrameSize = getFrameByteSize(textFrame); - final long closeFrameSize = getFrameByteSize(closeFrame); - final int maskSize = 4; // We use 4 byte mask for client frames - - // Pointless Sanity Checks - // assertThat("Upgrade Sent Bytes", upgradeSentBytes, is(197L)); - // assertThat("Upgrade Received Bytes", upgradeReceivedBytes, is(261L)); - // assertThat("Text Frame Size", textFrameSize, is(13L)); - // assertThat("Close Frame Size", closeFrameSize, is(4L)); - - final long expectedSent = upgradeSentBytes + numMessages * textFrameSize + closeFrameSize; - final long expectedReceived = upgradeReceivedBytes + numMessages * (textFrameSize + maskSize) + closeFrameSize + maskSize; - - assertThat("stats.sendBytes", statistics.getSentBytes(), is(expectedSent)); - assertThat("stats.receivedBytes", statistics.getReceivedBytes(), is(expectedReceived)); - } -} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java index e6d1981b87c..853dd2c4486 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java @@ -65,7 +65,6 @@ public class Parser // Stats (where a message is defined as a WebSocket frame) private final LongAdder messagesIn = new LongAdder(); - private final LongAdder bytesIn = new LongAdder(); // State specific private State state = State.START; @@ -250,8 +249,6 @@ public class Parser try { - int startingBytes = buffer.remaining(); - // attempt to parse a frame from the buffer if (parseFrame(buffer)) { @@ -266,8 +263,6 @@ public class Parser } reset(); } - - bytesIn.add(startingBytes - buffer.remaining()); } catch (Throwable t) { @@ -657,11 +652,6 @@ public class Parser return messagesIn.longValue(); } - public long getBytesIn() - { - return bytesIn.longValue(); - } - @Override public String toString() { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 003627a6b04..de24fd7af75 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractEndPoint; @@ -139,6 +140,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp private final ConnectionState connectionState = new ConnectionState(); private final FrameFlusher flusher; private final String id; + private final LongAdder bytesIn = new LongAdder(); private WebSocketSession session; private List extensions = new ArrayList<>(); private ByteBuffer prefillBuffer; @@ -472,6 +474,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp return; } + bytesIn.add(filled); if (LOG.isDebugEnabled()) LOG.debug("Filled {} bytes - {}", filled, BufferUtil.toDetailString(buffer)); } @@ -668,7 +671,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public long getBytesIn() { - return parser.getBytesIn(); + return bytesIn.longValue(); } /**