From aeb9d2c5dc961ce639701d3b8f0794be86798986 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 16 Apr 2019 13:33:37 +1000 Subject: [PATCH] Issue #3476 - previous WebSocketSessions being added to new connections Signed-off-by: Lachlan Roberts --- .../tests/ConcurrentConnectTest.java | 126 ++++++++++++++++++ .../jetty/websocket/tests/EventSocket.java | 106 +++++++++++++++ .../client/WebSocketUpgradeRequest.java | 6 +- 3 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/ConcurrentConnectTest.java create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/EventSocket.java diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/ConcurrentConnectTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/ConcurrentConnectTest.java new file mode 100644 index 00000000000..aca32fc1d99 --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/ConcurrentConnectTest.java @@ -0,0 +1,126 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConcurrentConnectTest +{ + private static final int MAX_CONNECTIONS = 150; + + private Server server; + private WebSocketClient client; + private URI uri; + + @BeforeEach + public void start() throws Exception + { + server = new Server(); + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + context.addServlet(MyWebSocketServlet.class, "/"); + server.setHandler(context); + + server.start(); + uri = new URI("ws://localhost:" + connector.getLocalPort()); + + client = new WebSocketClient(); + client.getHttpClient().setMaxConnectionsPerDestination(MAX_CONNECTIONS); + client.start(); + } + + @AfterEach + public void stop() throws Exception + { + client.stop(); + server.stop(); + } + + @Test + public void testConcurrentConnect() throws Exception + { + List listeners = new ArrayList(); + final int messages = MAX_CONNECTIONS; + + for (int i=0; i receivedMessages = new BlockingArrayQueue<>(); + + public CountDownLatch open = new CountDownLatch(1); + public CountDownLatch error = new CountDownLatch(1); + public CountDownLatch closed = new CountDownLatch(1); + + public Session getSession() + { + return session; + } + + @OnWebSocketConnect + public void onOpen(Session session) + { + this.session = session; + behavior = session.getPolicy().getBehavior().name(); + LOG.info("{} onOpen(): {}", toString(), session); + open.countDown(); + } + + @OnWebSocketMessage + public void onMessage(String message) throws IOException + { + LOG.info("{} onMessage(): {}", toString(), message); + receivedMessages.offer(message); + } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) + { + LOG.debug("{} onClose(): {}:{}", toString(), statusCode, reason); + closeCode = statusCode; + closeReason = reason; + closed.countDown(); + } + + @OnWebSocketError + public void onError(Throwable cause) + { + LOG.info("{} onError(): {}", toString(), cause); + failure = cause; + error.countDown(); + } + + @Override + public String toString() + { + return String.format("[%s@%s]", behavior, Integer.toHexString(hashCode())); + } + + @WebSocket + public static class EchoSocket extends EventSocket + { + @Override + public void onMessage(String message) throws IOException + { + super.onMessage(message); + session.getRemote().sendStringByFuture(message); + } + } +} diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java index e42c776f2a1..48bb8ef87b4 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java @@ -582,7 +582,11 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList if (connectionListeners != null) { - connectionListeners.forEach((listener) -> connection.addListener(listener)); + connectionListeners.forEach((listener) -> + { + if (!(listener instanceof WebSocketSession)) + connection.addListener(listener); + }); } URI requestURI = this.getURI();