Issue #3476 - previous WebSocketSessions being added to new connections

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-04-16 13:33:37 +10:00
parent 64549fb19c
commit aeb9d2c5dc
3 changed files with 237 additions and 1 deletions

View File

@ -0,0 +1,126 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConcurrentConnectTest
{
private static final int MAX_CONNECTIONS = 150;
private Server server;
private WebSocketClient client;
private URI uri;
@BeforeEach
public void start() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
context.addServlet(MyWebSocketServlet.class, "/");
server.setHandler(context);
server.start();
uri = new URI("ws://localhost:" + connector.getLocalPort());
client = new WebSocketClient();
client.getHttpClient().setMaxConnectionsPerDestination(MAX_CONNECTIONS);
client.start();
}
@AfterEach
public void stop() throws Exception
{
client.stop();
server.stop();
}
@Test
public void testConcurrentConnect() throws Exception
{
List<EventSocket> listeners = new ArrayList();
final int messages = MAX_CONNECTIONS;
for (int i=0; i<messages; i++)
{
try
{
EventSocket wsListener = new EventSocket();
listeners.add(wsListener);
client.connect(wsListener, uri);
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
for (EventSocket l : listeners)
{
assertTrue(l.open.await(5, TimeUnit.SECONDS));
}
for (EventSocket l : listeners)
{
l.getSession().getRemote().sendString("ping");
assertThat(l.receivedMessages.poll(5,TimeUnit.SECONDS), is("ping"));
l.getSession().close(StatusCode.NORMAL, "close from client");
}
for (EventSocket l : listeners)
{
assertTrue(l.closed.await(5, TimeUnit.SECONDS));
assertThat(l.closeCode, is(StatusCode.NORMAL));
assertThat(l.closeReason, is("close from client"));
//assertNull(l.failure); //TODO: we can get failures after close??
}
}
public static class MyWebSocketServlet extends WebSocketServlet
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.register(EventSocket.EchoSocket.class);
}
}
}

View File

@ -0,0 +1,106 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@WebSocket
public class EventSocket
{
private static Logger LOG = Log.getLogger(EventSocket.class);
protected Session session;
private String behavior;
public volatile Throwable failure = null;
public volatile int closeCode = -1;
public volatile String closeReason = null;
public BlockingQueue<String> receivedMessages = new BlockingArrayQueue<>();
public CountDownLatch open = new CountDownLatch(1);
public CountDownLatch error = new CountDownLatch(1);
public CountDownLatch closed = new CountDownLatch(1);
public Session getSession()
{
return session;
}
@OnWebSocketConnect
public void onOpen(Session session)
{
this.session = session;
behavior = session.getPolicy().getBehavior().name();
LOG.info("{} onOpen(): {}", toString(), session);
open.countDown();
}
@OnWebSocketMessage
public void onMessage(String message) throws IOException
{
LOG.info("{} onMessage(): {}", toString(), message);
receivedMessages.offer(message);
}
@OnWebSocketClose
public void onClose(int statusCode, String reason)
{
LOG.debug("{} onClose(): {}:{}", toString(), statusCode, reason);
closeCode = statusCode;
closeReason = reason;
closed.countDown();
}
@OnWebSocketError
public void onError(Throwable cause)
{
LOG.info("{} onError(): {}", toString(), cause);
failure = cause;
error.countDown();
}
@Override
public String toString()
{
return String.format("[%s@%s]", behavior, Integer.toHexString(hashCode()));
}
@WebSocket
public static class EchoSocket extends EventSocket
{
@Override
public void onMessage(String message) throws IOException
{
super.onMessage(message);
session.getRemote().sendStringByFuture(message);
}
}
}

View File

@ -582,7 +582,11 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
if (connectionListeners != null)
{
connectionListeners.forEach((listener) -> connection.addListener(listener));
connectionListeners.forEach((listener) ->
{
if (!(listener instanceof WebSocketSession))
connection.addListener(listener);
});
}
URI requestURI = this.getURI();