Revert "jetty-websocket-tests code cleanup"

This reverts commit 044a899a99.
This commit is contained in:
Lachlan Roberts 2019-11-19 19:14:54 +11:00
parent 6db5f3be8d
commit cef80aca2e
7 changed files with 93 additions and 172 deletions

View File

@ -69,7 +69,7 @@ public class ConcurrentConnectTest
@Override
public void configure(WebSocketServletFactory factory)
{
factory.register(EchoSocket.class);
factory.register(EventSocket.EchoSocket.class);
serverFactory = (WebSocketServerFactory)factory;
}
};
@ -116,19 +116,19 @@ public class ConcurrentConnectTest
for (EventSocket l : listeners)
{
assertTrue(l.openLatch.await(5, TimeUnit.SECONDS));
assertTrue(l.open.await(5, TimeUnit.SECONDS));
}
for (EventSocket l : listeners)
{
l.getSession().getRemote().sendString("ping");
assertThat(l.textMessages.poll(5, TimeUnit.SECONDS), is("ping"));
assertThat(l.receivedMessages.poll(5, TimeUnit.SECONDS), is("ping"));
l.getSession().close(StatusCode.NORMAL, "close from client");
}
for (EventSocket l : listeners)
{
assertTrue(l.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(l.closed.await(5, TimeUnit.SECONDS));
assertThat(l.closeCode, is(StatusCode.NORMAL));
assertThat(l.closeReason, is("close from client"));
assertNull(l.failure);

View File

@ -18,21 +18,26 @@
package org.eclipse.jetty.websocket.tests;
import java.io.IOException;
import java.nio.ByteBuffer;
public class EchoSocket extends EventSocket
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@SuppressWarnings("unused")
@WebSocket
public class EchoSocket
{
@Override
public void onWebSocketText(String message)
@OnWebSocketMessage
public void onMessage(Session session, String msg) throws IOException
{
super.onWebSocketText(message);
session.getRemote().sendString(message, null);
session.getRemote().sendString(msg);
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int length)
@OnWebSocketMessage
public void onBinaryMessage(Session session, byte[] data, int offset, int len) throws IOException
{
super.onWebSocketBinary(payload, offset, length);
session.getRemote().sendBytes(ByteBuffer.wrap(payload, offset, length), null);
session.getRemote().sendBytes(ByteBuffer.wrap(data, offset, len));
}
}

View File

@ -1,97 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class EchoTest
{
public class UpgradeServlet extends WebSocketServlet
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.setCreator(((req, resp) -> serverSocket));
}
}
private Server server = new Server();
private WebSocketClient client = new WebSocketClient();
private EventSocket serverSocket;
private URI serverUri;
public void start(EventSocket serverSocket) throws Exception
{
this.serverSocket = serverSocket;
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
contextHandler.setContextPath("/");
contextHandler.addServlet(new ServletHolder(new UpgradeServlet()), "/");
server.setHandler(contextHandler);
server.start();
client.start();
serverUri = new URI("ws://localhost:" + connector.getLocalPort() + "/");
}
@AfterEach
public void stop() throws Exception
{
client.stop();
server.stop();
}
@Test
public void testEcho() throws Exception
{
start(new EchoSocket());
EventSocket clientSocket = new EventSocket();
Session session = client.connect(clientSocket, serverUri).get(5, TimeUnit.SECONDS);
// Send and receive an echo text message.
clientSocket.getSession().getRemote().sendStringByFuture("hello world");
assertThat(clientSocket.textMessages.poll(5, TimeUnit.SECONDS), is("hello world"));
// Make sure both sides close successfully.
session.close(StatusCode.NORMAL, null);
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
assertThat(serverSocket.closeCode, is(StatusCode.NORMAL));
}
}

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.websocket.tests;
import java.nio.ByteBuffer;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@ -26,9 +26,14 @@ import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
public class EventSocket implements WebSocketListener
@WebSocket
public class EventSocket
{
private static Logger LOG = Log.getLogger(EventSocket.class);
@ -38,62 +43,48 @@ public class EventSocket implements WebSocketListener
public volatile int closeCode = -1;
public volatile String closeReason = null;
public BlockingQueue<String> textMessages = new BlockingArrayQueue<>();
public BlockingQueue<ByteBuffer> binaryMessages = new BlockingArrayQueue<>();
public BlockingQueue<String> receivedMessages = new BlockingArrayQueue<>();
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public CountDownLatch open = new CountDownLatch(1);
public CountDownLatch error = new CountDownLatch(1);
public CountDownLatch closed = new CountDownLatch(1);
public Session getSession()
{
return session;
}
@Override
public void onWebSocketConnect(Session session)
@OnWebSocketConnect
public void onOpen(Session session)
{
this.session = session;
behavior = session.getPolicy().getBehavior().name();
if (LOG.isDebugEnabled())
LOG.debug("{} onOpen(): {}", toString(), session);
openLatch.countDown();
LOG.info("{} onOpen(): {}", toString(), session);
open.countDown();
}
@Override
public void onWebSocketText(String message)
@OnWebSocketMessage
public void onMessage(String message) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("{} onMessage(): {}", toString(), message);
textMessages.offer(message);
LOG.info("{} onMessage(): {}", toString(), message);
receivedMessages.offer(message);
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int length)
@OnWebSocketClose
public void onClose(int statusCode, String reason)
{
ByteBuffer message = ByteBuffer.wrap(payload, offset, length);
if (LOG.isDebugEnabled())
LOG.debug("{} onMessage(): {}", toString(), message);
binaryMessages.offer(message);
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onClose(): {}:{}", toString(), statusCode, reason);
LOG.debug("{} onClose(): {}:{}", toString(), statusCode, reason);
closeCode = statusCode;
closeReason = reason;
closeLatch.countDown();
closed.countDown();
}
@Override
public void onWebSocketError(Throwable cause)
@OnWebSocketError
public void onError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("{} onError(): {}", toString(), cause);
LOG.info("{} onError(): {}", toString(), cause);
failure = cause;
errorLatch.countDown();
error.countDown();
}
@Override
@ -101,4 +92,15 @@ public class EventSocket implements WebSocketListener
{
return String.format("[%s@%s]", behavior, Integer.toHexString(hashCode()));
}
@WebSocket
public static class EchoSocket extends EventSocket
{
@Override
public void onMessage(String message) throws IOException
{
super.onMessage(message);
session.getRemote().sendStringByFuture(message);
}
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.tests;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -29,6 +30,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
@ -44,16 +46,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class SuspendResumeTest
{
@WebSocket
public static class SuspendSocket extends EventSocket
{
volatile SuspendToken suspendToken = null;
@Override
public void onWebSocketText(String message)
public void onMessage(String message) throws IOException
{
if ("suspend".equals(message))
suspendToken = session.suspend();
super.onWebSocketText(message);
super.onMessage(message);
}
@Override
public void onError(Throwable cause)
{
super.onError(cause);
cause.printStackTrace();
}
}
@ -105,21 +115,21 @@ public class SuspendResumeTest
clientSocket.session.getRemote().sendString("suspend");
clientSocket.session.getRemote().sendString("hello world");
assertThat(serverSocket.textMessages.poll(5, TimeUnit.SECONDS), is("suspend"));
assertNull(serverSocket.textMessages.poll(1, TimeUnit.SECONDS));
assertThat(serverSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("suspend"));
assertNull(serverSocket.receivedMessages.poll(1, TimeUnit.SECONDS));
serverSocket.suspendToken.resume();
assertThat(serverSocket.textMessages.poll(5, TimeUnit.SECONDS), is("suspend"));
assertNull(serverSocket.textMessages.poll(1, TimeUnit.SECONDS));
assertThat(serverSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("suspend"));
assertNull(serverSocket.receivedMessages.poll(1, TimeUnit.SECONDS));
serverSocket.suspendToken.resume();
assertThat(serverSocket.textMessages.poll(5, TimeUnit.SECONDS), is("hello world"));
assertNull(serverSocket.textMessages.poll(1, TimeUnit.SECONDS));
assertThat(serverSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("hello world"));
assertNull(serverSocket.receivedMessages.poll(1, TimeUnit.SECONDS));
// make sure both sides are closed
clientSocket.session.close();
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientSocket.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closed.await(5, TimeUnit.SECONDS));
// check we closed normally
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
@ -135,29 +145,29 @@ public class SuspendResumeTest
connect.get(5, TimeUnit.SECONDS);
// verify connection by sending a message from server to client
assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.open.await(5, TimeUnit.SECONDS));
serverSocket.session.getRemote().sendString("verification");
assertThat(clientSocket.textMessages.poll(5, TimeUnit.SECONDS), is("verification"));
assertThat(clientSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("verification"));
// suspend the client so that no read events occur
SuspendToken suspendToken = clientSocket.session.suspend();
// verify client can still send messages
clientSocket.session.getRemote().sendString("message-from-client");
assertThat(serverSocket.textMessages.poll(5, TimeUnit.SECONDS), is("message-from-client"));
assertThat(serverSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("message-from-client"));
// the message is not received as it is suspended
serverSocket.session.getRemote().sendString("message-from-server");
assertNull(clientSocket.textMessages.poll(2, TimeUnit.SECONDS));
assertNull(clientSocket.receivedMessages.poll(2, TimeUnit.SECONDS));
// client should receive message after it resumes
suspendToken.resume();
assertThat(clientSocket.textMessages.poll(5, TimeUnit.SECONDS), is("message-from-server"));
assertThat(clientSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("message-from-server"));
// make sure both sides are closed
clientSocket.session.close();
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientSocket.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closed.await(5, TimeUnit.SECONDS));
// check we closed normally
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));
@ -173,14 +183,14 @@ public class SuspendResumeTest
connect.get(5, TimeUnit.SECONDS);
// verify connection by sending a message from server to client
assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.open.await(5, TimeUnit.SECONDS));
serverSocket.session.getRemote().sendString("verification");
assertThat(clientSocket.textMessages.poll(5, TimeUnit.SECONDS), is("verification"));
assertThat(clientSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("verification"));
// make sure both sides are closed
clientSocket.session.close();
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientSocket.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closed.await(5, TimeUnit.SECONDS));
// check we closed normally
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));

View File

@ -144,7 +144,7 @@ public class WebSocketStatsTest
}
}
assertTrue(socket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(socket.closed.await(5, TimeUnit.SECONDS));
assertTrue(wsConnectionClosed.await(5, TimeUnit.SECONDS));
assertThat(statistics.getConnectionsMax(), is(1L));

View File

@ -43,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class WriteAfterStopTest
{
public class UpgradeServlet extends WebSocketServlet
{
@Override
@ -89,13 +90,13 @@ public class WriteAfterStopTest
upgradeRequest.addExtensions("permessage-deflate");
Session session = client.connect(clientSocket, uri, upgradeRequest).get(5, TimeUnit.SECONDS);
clientSocket.getSession().getRemote().sendStringByFuture("init deflater");
assertThat(serverSocket.textMessages.poll(5, TimeUnit.SECONDS), is("init deflater"));
assertThat(serverSocket.receivedMessages.poll(5, TimeUnit.SECONDS), is("init deflater"));
session.close(StatusCode.NORMAL, null);
// make sure both sides are closed
clientSocket.session.close();
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientSocket.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closed.await(5, TimeUnit.SECONDS));
// check we closed normally
assertThat(clientSocket.closeCode, is(StatusCode.NORMAL));