Issue #83 - reworked SessionTrackingTest to fix intermittent failures

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-05-22 17:16:52 +10:00
parent a2465234c6
commit 52cc4f6c22
3 changed files with 170 additions and 41 deletions

View File

@ -0,0 +1,80 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.javax.tests;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ServerEndpoint("/")
@ClientEndpoint
public class EventSocket
{
private final static Logger LOG = Log.getLogger(EventSocket.class);
public Session session;
public BlockingQueue<String> messageQueue = new BlockingArrayQueue<>();
public volatile Throwable error = null;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
@OnOpen
public void onOpen(Session session)
{
this.session = session;
LOG.info("{} onOpen(): {}", toString(), session);
openLatch.countDown();
}
@OnMessage
public void onMessage(String message) throws IOException
{
LOG.info("{} onMessage(): {}", toString(), message);
messageQueue.offer(message);
}
@OnClose
public void onClose(CloseReason reason)
{
LOG.info("{} onClose(): {}", toString(), reason);
closeLatch.countDown();
}
@OnError
public void onError(Throwable cause)
{
LOG.info("{} onError(): {}", toString(), cause);
error = cause;
}
}

View File

@ -43,12 +43,15 @@ import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.internal.Parser; import org.eclipse.jetty.websocket.core.internal.Parser;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSessionListener;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainer; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerContainer;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerFrameHandlerFactory; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServerFrameHandlerFactory;
import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer; import org.eclipse.jetty.websocket.javax.server.JavaxWebSocketServletContainerInitializer;
@ -75,7 +78,8 @@ public class LocalServer extends ContainerLifeCycle implements LocalFuzzer.Provi
private ServerConnector connector; private ServerConnector connector;
private LocalConnector localConnector; private LocalConnector localConnector;
private ServletContextHandler servletContextHandler; private ServletContextHandler servletContextHandler;
private ServerContainer serverContainer; private JavaxWebSocketServerContainer serverContainer;
private TrackingListener trackingListener = new TrackingListener();
private URI serverUri; private URI serverUri;
private URI wsUri; private URI wsUri;
private boolean ssl = false; private boolean ssl = false;
@ -165,6 +169,7 @@ public class LocalServer extends ContainerLifeCycle implements LocalFuzzer.Provi
servletContextHandler = new ServletContextHandler(server, "/", true, false); servletContextHandler = new ServletContextHandler(server, "/", true, false);
servletContextHandler.setContextPath("/"); servletContextHandler.setContextPath("/");
serverContainer = JavaxWebSocketServletContainerInitializer.configureContext(servletContextHandler); serverContainer = JavaxWebSocketServletContainerInitializer.configureContext(servletContextHandler);
serverContainer.addSessionListener(trackingListener);
configureServletContextHandler(servletContextHandler); configureServletContextHandler(servletContextHandler);
return servletContextHandler; return servletContextHandler;
} }
@ -286,4 +291,37 @@ public class LocalServer extends ContainerLifeCycle implements LocalFuzzer.Provi
{ {
return server; return server;
} }
public TrackingListener getTrackingListener()
{
return trackingListener;
}
public static class TrackingListener implements JavaxWebSocketSessionListener
{
private BlockingArrayQueue<JavaxWebSocketSession> openedSessions = new BlockingArrayQueue<>();
private BlockingArrayQueue<JavaxWebSocketSession> closedSessions = new BlockingArrayQueue<>();
@Override
public void onJavaxWebSocketSessionOpened(JavaxWebSocketSession session)
{
openedSessions.offer(session);
}
@Override
public void onJavaxWebSocketSessionClosed(JavaxWebSocketSession session)
{
closedSessions.offer(session);
}
public BlockingArrayQueue<JavaxWebSocketSession> getOpenedSessions()
{
return openedSessions;
}
public BlockingArrayQueue<JavaxWebSocketSession> getClosedSessions()
{
return closedSessions;
}
}
} }

View File

@ -19,10 +19,7 @@
package org.eclipse.jetty.websocket.javax.tests.server; package org.eclipse.jetty.websocket.javax.tests.server;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.websocket.OnMessage; import javax.websocket.OnMessage;
@ -31,19 +28,21 @@ import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.javax.tests.EventSocket;
import org.eclipse.jetty.websocket.javax.tests.Fuzzer;
import org.eclipse.jetty.websocket.javax.tests.LocalServer; import org.eclipse.jetty.websocket.javax.tests.LocalServer;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SessionTrackingTest public class SessionTrackingTest
{ {
static BlockingArrayQueue<Session> serverSessions = new BlockingArrayQueue<>(); static BlockingArrayQueue<Session> serverSessions = new BlockingArrayQueue<>();
@ServerEndpoint("/session-info/{sessionId}") @ServerEndpoint("/session-info/{sessionId}")
@ -79,6 +78,7 @@ public class SessionTrackingTest
} }
private static LocalServer server; private static LocalServer server;
private static JavaxWebSocketClientContainer client;
@BeforeAll @BeforeAll
public static void startServer() throws Exception public static void startServer() throws Exception
@ -86,72 +86,83 @@ public class SessionTrackingTest
server = new LocalServer(); server = new LocalServer();
server.start(); server.start();
server.getServerContainer().addEndpoint(SessionTrackingSocket.class); server.getServerContainer().addEndpoint(SessionTrackingSocket.class);
client = new JavaxWebSocketClientContainer();
client.start();
} }
@AfterAll @AfterAll
public static void stopServer() throws Exception public static void stopServer() throws Exception
{ {
client.stop();
server.stop(); server.stop();
} }
@Test @Test
public void testAddRemoveSessions() throws Exception public void testAddRemoveSessions() throws Exception
{ {
List<Frame> expectedFrames = new ArrayList<>(); EventSocket clientSocket1 = new EventSocket();
EventSocket clientSocket2 = new EventSocket();
EventSocket clientSocket3 = new EventSocket();
try (Fuzzer session1 = server.newNetworkFuzzer("/session-info/1")) try (Session session1 = client.connectToServer(clientSocket1, server.getWsUri().resolve("/session-info/1")))
{ {
assertNotNull(serverSessions.poll(10, TimeUnit.SECONDS)); Session serverSession1 = serverSessions.poll(5, TimeUnit.SECONDS);
expectedFrames.clear(); assertNotNull(serverSession1);
sendTextFrameToAll("openSessions|in-1", session1); sendTextFrameToAll("openSessions|in-1", session1);
session1.expect(Arrays.asList(new Frame(OpCode.TEXT).setPayload("openSessions(@in-1).size=1"))); assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-1).size=1"));
try (Fuzzer session2 = server.newNetworkFuzzer("/session-info/2")) try (Session session2 = client.connectToServer(clientSocket2, server.getWsUri().resolve("/session-info/2")))
{ {
assertNotNull(serverSessions.poll(10, TimeUnit.SECONDS)); Session serverSession2 = serverSessions.poll(5, TimeUnit.SECONDS);
expectedFrames.clear(); assertNotNull(serverSession2);
sendTextFrameToAll("openSessions|in-2", session1, session2); sendTextFrameToAll("openSessions|in-2", session1, session2);
session1.expect(Arrays.asList(new Frame(OpCode.TEXT).setPayload("openSessions(@in-2).size=2"))); assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
session2.expect(Arrays.asList(new Frame(OpCode.TEXT).setPayload("openSessions(@in-2).size=2"))); assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-2).size=2"));
try (Fuzzer session3 = server.newNetworkFuzzer("/session-info/3")) try (Session session3 = client.connectToServer(clientSocket3, server.getWsUri().resolve("/session-info/3")))
{ {
assertNotNull(serverSessions.poll(10, TimeUnit.SECONDS)); Session serverSession3 = serverSessions.poll(5, TimeUnit.SECONDS);
assertNotNull(serverSession3);
sendTextFrameToAll("openSessions|in-3", session1, session2, session3); sendTextFrameToAll("openSessions|in-3", session1, session2, session3);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@in-3).size=3"));
sendTextFrameToAll("openSessions|lvl-3", session1, session2, session3); sendTextFrameToAll("openSessions|lvl-3", session1, session2, session3);
assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
assertThat(clientSocket3.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-3).size=3"));
expectedFrames.clear(); // assert session is closed, and we have received the notification from the SessionListener
expectedFrames.add(new Frame(OpCode.TEXT).setPayload("openSessions(@in-3).size=3")); session3.close();
expectedFrames.add(new Frame(OpCode.TEXT).setPayload("openSessions(@lvl-3).size=3")); assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession3));
session1.expect(expectedFrames); assertTrue(clientSocket3.closeLatch.await(5, TimeUnit.SECONDS));
session2.expect(expectedFrames);
session3.expect(expectedFrames);
session3.sendFrames(new Frame(OpCode.CLOSE));
session3.expect(Arrays.asList(new Frame(OpCode.CLOSE)));
} }
sendTextFrameToAll("openSessions|lvl-2", session1, session2); sendTextFrameToAll("openSessions|lvl-2", session1, session2);
session1.expect(Arrays.asList(new Frame(OpCode.TEXT).setPayload("openSessions(@lvl-2).size=2"))); assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
session2.expect(Arrays.asList(new Frame(OpCode.TEXT).setPayload("openSessions(@lvl-2).size=2"))); assertThat(clientSocket2.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-2).size=2"));
session2.sendFrames(new Frame(OpCode.CLOSE)); // assert session is closed, and we have received the notification from the SessionListener
session2.expect(Arrays.asList(new Frame(OpCode.CLOSE))); session2.close();
assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession2));
assertTrue(clientSocket2.closeLatch.await(5, TimeUnit.SECONDS));
} }
sendTextFrameToAll("openSessions|lvl-1", session1); sendTextFrameToAll("openSessions|lvl-1", session1);
session1.sendFrames(new Frame(OpCode.CLOSE)); assertThat(clientSocket1.messageQueue.poll(5, TimeUnit.SECONDS), is("openSessions(@lvl-1).size=1"));
expectedFrames.clear(); // assert session is closed, and we have received the notification from the SessionListener
expectedFrames.add(new Frame(OpCode.TEXT).setPayload("openSessions(@lvl-1).size=1")); session1.close();
expectedFrames.add(new Frame(OpCode.CLOSE)); assertThat(server.getTrackingListener().getClosedSessions().poll(5, TimeUnit.SECONDS), sameInstance(serverSession1));
session1.expect(expectedFrames); assertTrue(clientSocket1.closeLatch.await(5, TimeUnit.SECONDS));
} }
} }
private void sendTextFrameToAll(String msg, Fuzzer... sessions) throws IOException private static void sendTextFrameToAll(String msg, Session... sessions) throws IOException
{ {
for (Fuzzer session : sessions) for (Session session : sessions)
session.sendFrames(new Frame(OpCode.TEXT).setPayload(msg)); session.getBasicRemote().sendText(msg);
} }
} }