From db9997a7920fc9abd252d149b71e97c92a487b90 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 20 Jun 2017 13:40:30 -0700 Subject: [PATCH] Fixing WebSocket MessageReceivingTest --- jetty-websocket/websocket-tests/pom.xml | 12 + .../client/jsr356/MessageReceivingTest.java | 411 +++++++++++++----- .../test/resources/jetty-logging.properties | 5 +- .../src/test/resources/logback-test.xml | 32 ++ 4 files changed, 359 insertions(+), 101 deletions(-) create mode 100644 jetty-websocket/websocket-tests/src/test/resources/logback-test.xml diff --git a/jetty-websocket/websocket-tests/pom.xml b/jetty-websocket/websocket-tests/pom.xml index ea8a7181e87..df47c25407e 100644 --- a/jetty-websocket/websocket-tests/pom.xml +++ b/jetty-websocket/websocket-tests/pom.xml @@ -56,6 +56,18 @@ ${project.version} tests + + ch.qos.logback + logback-classic + 1.2.3 + test + + + org.slf4j + slf4j-api + 1.7.25 + test + org.eclipse.jetty.toolchain jetty-test-helper diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/MessageReceivingTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/MessageReceivingTest.java index b2074938554..814902fefa4 100644 --- a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/MessageReceivingTest.java +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/jsr356/MessageReceivingTest.java @@ -18,19 +18,23 @@ package org.eclipse.jetty.websocket.tests.client.jsr356; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import javax.websocket.ClientEndpointConfig; import javax.websocket.ContainerProvider; import javax.websocket.Endpoint; import javax.websocket.EndpointConfig; @@ -41,97 +45,224 @@ import javax.websocket.WebSocketContainer; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.util.WSURI; +import org.eclipse.jetty.websocket.common.util.TextUtil; +import org.eclipse.jetty.websocket.server.WebSocketHandler; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.eclipse.jetty.websocket.tests.DataUtils; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; /** * This class tests receiving of messages by different types of {@link MessageHandler} */ -public class MessageReceivingTest { - private static final Logger LOG = Log.getLogger(EndpointEchoTest.class); - private static Server server; - private static EchoHandler handler; - private static URI serverUri; - private WebSocketContainer container; - private final String VERY_LONG_STRING; - - public MessageReceivingTest() { - byte raw[] = new byte[1024 * 1024]; - Arrays.fill(raw, (byte)'x'); - VERY_LONG_STRING = new String(raw, StandardCharsets.UTF_8); +public class MessageReceivingTest +{ + @WebSocket + public static class SendPartialTextSocket + { + @OnWebSocketMessage + public void onText(org.eclipse.jetty.websocket.api.Session session, String message) throws IOException + { + RemoteEndpoint remote = session.getRemote(); + String parts[] = message.split(" "); + for (int i = 0; i < parts.length; i++) + { + if (i > 0) + remote.sendPartialString(" ", false); + boolean last = (i >= (parts.length - 1)); + remote.sendPartialString(parts[i], last); + } + } } + @WebSocket + public static class SendPartialBinarySocket + { + @OnWebSocketMessage + public void onText(org.eclipse.jetty.websocket.api.Session session, ByteBuffer payload) throws IOException + { + RemoteEndpoint remote = session.getRemote(); + ByteBuffer copy = DataUtils.copyOf(payload); + int segmentSize = 128 * 1024; + int segmentCount = Math.max(1, copy.remaining() / segmentSize); + if (LOG.isDebugEnabled()) + { + LOG.debug(".onText(payload.length={})", payload.remaining()); + LOG.debug("segmentSize={}, segmentCount={}", segmentSize, segmentCount); + } + for (int i = 0; i < segmentCount; i++) + { + ByteBuffer segment = copy.slice(); + segment.position(i * segmentSize); + int remaining = segment.remaining(); + segment.limit(segment.position() + Math.min(remaining, segmentSize)); + boolean last = (i >= (segmentCount - 1)); + if (LOG.isDebugEnabled()) + { + LOG.debug("segment[{}].sendPartialBytes({}, {})", i, BufferUtil.toDetailString(segment), last); + } + remote.sendPartialBytes(segment.asReadOnlyBuffer(), last); + } + } + } + + @WebSocket + public static class EchoWholeMessageSocket + { + @OnWebSocketMessage + public void onText(org.eclipse.jetty.websocket.api.Session session, String message) throws IOException + { + if (LOG.isDebugEnabled()) + { + LOG.debug("{}.onText({})", EchoWholeMessageSocket.class.getSimpleName(), TextUtil.hint(message)); + } + session.getRemote().sendString(message); + } + + @OnWebSocketMessage + public void onBinary(org.eclipse.jetty.websocket.api.Session session, ByteBuffer payload) throws IOException + { + if (LOG.isDebugEnabled()) + { + LOG.debug("{}.onBinary({})", EchoWholeMessageSocket.class.getSimpleName(), BufferUtil.toDetailString(payload)); + } + ByteBuffer copy = DataUtils.copyOf(payload); + session.getRemote().sendBytes(copy); + } + } + + public static class ServerMessageSendingHandler extends WebSocketHandler implements WebSocketCreator + { + @Override + public void configure(WebSocketServletFactory factory) + { + factory.getPolicy().setMaxTextMessageSize(2 * 1024 * 1024); + factory.getPolicy().setMaxBinaryMessageSize(2 * 1024 * 1024); + factory.setCreator(this); + } + + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + if (req.hasSubProtocol("partial-text")) + { + resp.setAcceptedSubProtocol("partial-text"); + return new SendPartialTextSocket(); + } + + if (req.hasSubProtocol("partial-binary")) + { + resp.setAcceptedSubProtocol("partial-binary"); + return new SendPartialBinarySocket(); + } + + if (req.hasSubProtocol("echo")) + { + resp.setAcceptedSubProtocol("echo"); + return new EchoWholeMessageSocket(); + } + + return null; + } + } + + private static final Logger LOG = Log.getLogger(MessageReceivingTest.class); + private static Server server; + private static URI serverUri; + private WebSocketContainer container; + @BeforeClass - public static void startServer() throws Exception { + public static void startServer() throws Exception + { server = new Server(); ServerConnector connector = new ServerConnector(server); connector.setPort(0); server.addConnector(connector); - handler = new EchoHandler(); - ContextHandler context = new ContextHandler(); context.setContextPath("/"); - context.setHandler(handler); + context.setHandler(new ServerMessageSendingHandler()); server.setHandler(context); // Start Server server.start(); - String host = connector.getHost(); - if (host == null) { - host = "localhost"; - } - int port = connector.getLocalPort(); - serverUri = new URI(String.format("ws://%s:%d/", host, port)); + serverUri = WSURI.toWebsocket(server.getURI()).resolve("/"); } @AfterClass - public static void stopServer() { - try { - server.stop(); - } catch (Exception e) { - e.printStackTrace(System.err); - } + public static void stopServer() throws Exception + { + server.stop(); } @Before - public void initClient() { + public void initClient() + { container = ContainerProvider.getWebSocketContainer(); } - + @After public void stopClient() throws Exception { - ((LifeCycle)container).stop(); + ((LifeCycle) container).stop(); } - + /** * Method tests receiving of text messages at once. * * @throws Exception on exception occur */ @Test - @Ignore("flappy test") - public void testWholeTextMessage() throws Exception { - final TestEndpoint echoer = new TestEndpoint(new WholeStringCaptureHandler()); - Assert.assertThat(echoer, instanceOf(javax.websocket.Endpoint.class)); - // Issue connect using instance of class that extends Endpoint - final Session session = container.connectToServer(echoer, serverUri); - session.getBasicRemote().sendText(""); - session.getBasicRemote().sendText("Echo"); - session.getBasicRemote().sendText(VERY_LONG_STRING); - session.getBasicRemote().sendText("Echo"); - String msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS); - msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS); + public void testWholeTextMessage() throws Exception + { + final TestEndpoint clientEndpoint = new TestEndpoint(new WholeStringCaptureHandler()); + assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class)); + + ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create() + .preferredSubprotocols(Collections.singletonList("echo")) + .build(); + + byte raw[] = new byte[1024 * 1024]; + Arrays.fill(raw, (byte) 'x'); + String veryLongString = new String(raw, UTF_8); + + final Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri); + try + { + session.getBasicRemote().sendText(""); + session.getBasicRemote().sendText("Echo"); + session.getBasicRemote().sendText(veryLongString); + session.getBasicRemote().sendText("Another Echo"); + + String msg; + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is("")); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is("Echo")); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is(veryLongString)); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is("Another Echo")); + } + finally + { + session.close(); + } } /** @@ -140,15 +271,33 @@ public class MessageReceivingTest { * @throws Exception on exception occur */ @Test - public void testPartialTextMessage() throws Exception { - final TestEndpoint echoer = new TestEndpoint(new PartialStringCaptureHandler()); - Assert.assertThat(echoer, instanceOf(javax.websocket.Endpoint.class)); - // Issue connect using instance of class that extends Endpoint - final Session session = container.connectToServer(echoer, serverUri); - session.getBasicRemote().sendText(""); - session.getBasicRemote().sendText("Echo"); - String msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS); - msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS); + public void testPartialTextMessage() throws Exception + { + final TestEndpoint clientEndpoint = new TestEndpoint(new PartialStringCaptureHandler()); + assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class)); + + ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create() + .preferredSubprotocols(Collections.singletonList("partial-text")) + .build(); + + final Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri); + try + { + session.getBasicRemote().sendText(""); + session.getBasicRemote().sendText("Echo"); + session.getBasicRemote().sendText("I can live for two months on a good compliment."); + String msg; + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is("")); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is("Echo")); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is("I can live for two months on a good compliment.")); + } + finally + { + session.close(); + } } /** @@ -157,15 +306,31 @@ public class MessageReceivingTest { * @throws Exception on exception occur */ @Test - public void testWholeBinaryMessage() throws Exception { - final TestEndpoint echoer = new TestEndpoint(new WholeByteBufferCaptureHandler()); - Assert.assertThat(echoer, instanceOf(javax.websocket.Endpoint.class)); - // Issue connect using instance of class that extends Endpoint - final Session session = container.connectToServer(echoer, serverUri); - sendBinary(session, ""); - sendBinary(session, "Echo"); - String msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS); - msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS); + public void testWholeBinaryMessage() throws Exception + { + final TestEndpoint clientEndpoint = new TestEndpoint(new WholeByteBufferCaptureHandler()); + assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class)); + + ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create() + .preferredSubprotocols(Collections.singletonList("echo")) + .build(); + + final Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri); + try + { + session.getBasicRemote().sendBinary(BufferUtil.toBuffer("", UTF_8)); + session.getBasicRemote().sendBinary(BufferUtil.toBuffer("Echo", UTF_8)); + + String msg; + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is("")); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Received Message", msg, is("Echo")); + } + finally + { + session.close(); + } } /** @@ -174,31 +339,60 @@ public class MessageReceivingTest { * @throws Exception on exception occur */ @Test - public void testPartialBinaryMessage() throws Exception { - final TestEndpoint echoer = new TestEndpoint(new PartialByteBufferCaptureHandler()); - Assert.assertThat(echoer, instanceOf(javax.websocket.Endpoint.class)); - // Issue connect using instance of class that extends Endpoint - final Session session = container.connectToServer(echoer, serverUri); - sendBinary(session, ""); - sendBinary(session, "Echo"); - String msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS); - msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS); + public void testPartialBinaryMessage() throws Exception + { + final TestEndpoint clientEndpoint = new TestEndpoint(new PartialByteBufferCaptureHandler()); + assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class)); + + ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create() + .preferredSubprotocols(Collections.singletonList("partial-binary")) + .build(); + + final Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri); + try + { + session.getBasicRemote().sendBinary(BufferUtil.toBuffer("", UTF_8)); + session.getBasicRemote().sendBinary(BufferUtil.toBuffer("Echo", UTF_8)); + byte bigBuf[] = new byte[1024 * 1024]; + Arrays.fill(bigBuf, (byte) 'x'); + // allocate fresh ByteBuffer and copy array contents, not wrap + // as the send will modify the wrapped array (for client masking purposes) + ByteBuffer bigByteBuffer = ByteBuffer.allocate(bigBuf.length); + bigByteBuffer.put(bigBuf); + bigByteBuffer.flip(); + session.getBasicRemote().sendBinary(bigByteBuffer); + session.getBasicRemote().sendBinary(BufferUtil.toBuffer("Another Echo", UTF_8)); + + String msg; + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Echo'd Message", msg, is("")); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Echo'd Message", msg, is("Echo")); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Echo'd Message.length", msg.length(), is(bigBuf.length)); + msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("Echo'd Message", msg, is("Another Echo")); + } + finally + { + session.close(); + } } - private static void sendBinary(Session session, String message) throws IOException { - final ByteBuffer bb = ByteBuffer.wrap(message.getBytes()); - session.getBasicRemote().sendBinary(bb); - } - - private static class TestEndpoint extends Endpoint { + public static class TestEndpoint extends Endpoint + { public final AbstractHandler handler; - public TestEndpoint(AbstractHandler handler) { + public TestEndpoint(AbstractHandler handler) + { this.handler = handler; } @Override - public void onOpen(Session session, EndpointConfig config) { + public void onOpen(Session session, EndpointConfig config) + { + session.setMaxTextMessageBufferSize(2 * 1024 * 1024); + session.setMaxBinaryMessageBufferSize(2 * 1024 * 1024); session.addMessageHandler(handler); } } @@ -206,7 +400,8 @@ public class MessageReceivingTest { /** * Abstract message handler implementation, used for tests. */ - private static abstract class AbstractHandler implements MessageHandler { + private static abstract class AbstractHandler implements MessageHandler + { public final BlockingQueue messageQueue = new LinkedBlockingDeque<>(); } @@ -214,7 +409,8 @@ public class MessageReceivingTest { * Partial message handler for receiving binary messages. */ public static class PartialByteBufferCaptureHandler extends AbstractHandler implements - MessageHandler.Partial { + MessageHandler.Partial + { /** * Parts of the current message. This list is appended with every non-last part and is * cleared after last part of a message has been received. @@ -222,32 +418,44 @@ public class MessageReceivingTest { private final List currentMessage = new ArrayList<>(); @Override - public void onMessage(ByteBuffer messagePart, boolean last) { - final ByteBuffer bufferCopy = ByteBuffer.allocate(messagePart.capacity()); - bufferCopy.put(messagePart); + public void onMessage(ByteBuffer messagePart, boolean last) + { + if(LOG.isDebugEnabled()) + { + LOG.debug("PartialByteBufferCaptureHandler.onMessage({}, {})", BufferUtil.toDetailString(messagePart), last); + } + + final ByteBuffer bufferCopy = DataUtils.copyOf(messagePart); currentMessage.add(bufferCopy); - if (last) { + if (last) + { int totalSize = 0; - for (ByteBuffer bb : currentMessage) { + for (ByteBuffer bb : currentMessage) + { totalSize += bb.capacity(); } final ByteBuffer result = ByteBuffer.allocate(totalSize); - for (ByteBuffer bb : currentMessage) { + for (ByteBuffer bb : currentMessage) + { result.put(bb); } - final String stringResult = new String(result.array()); + BufferUtil.flipToFlush(result, 0); + final String stringResult = BufferUtil.toUTF8String(result); messageQueue.offer(stringResult); currentMessage.clear(); } } } + /** * Whole message handler for receiving binary messages. */ public static class WholeByteBufferCaptureHandler extends AbstractHandler implements - MessageHandler.Whole { + MessageHandler.Whole + { @Override - public void onMessage(ByteBuffer message) { + public void onMessage(ByteBuffer message) + { final String stringResult = new String(message.array()); messageQueue.offer(stringResult); } @@ -257,7 +465,8 @@ public class MessageReceivingTest { * Partial message handler for receiving text messages. */ public static class PartialStringCaptureHandler extends AbstractHandler implements - MessageHandler.Partial { + MessageHandler.Partial + { /** * Parts of the current message. This list is appended with every non-last part and is * cleared after last part of a message has been received. @@ -265,22 +474,26 @@ public class MessageReceivingTest { private StringBuilder sb = new StringBuilder(); @Override - public void onMessage(String messagePart, boolean last) { + public void onMessage(String messagePart, boolean last) + { sb.append(messagePart); - if (last) { + if (last) + { messageQueue.add(sb.toString()); sb = new StringBuilder(); } } } - + /** * Whole message handler for receiving text messages. */ public static class WholeStringCaptureHandler extends AbstractHandler implements - MessageHandler.Whole { + MessageHandler.Whole + { @Override - public void onMessage(String message) { + public void onMessage(String message) + { messageQueue.add(message); } } diff --git a/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties index 314719e45f2..714faff550b 100644 --- a/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties @@ -18,8 +18,9 @@ # # +# org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.LEVEL=WARN +# org.eclipse.jetty.LEVEL=INFO # org.eclipse.jetty.util.log.stderr.LONG=true # org.eclipse.jetty.server.AbstractConnector.LEVEL=DEBUG @@ -33,7 +34,7 @@ org.eclipse.jetty.LEVEL=WARN # org.eclipse.jetty.websocket.jsr356.messages.LEVEL=DEBUG # org.eclipse.jetty.websocket.tests.LEVEL=DEBUG # org.eclipse.jetty.websocket.tests.client.LEVEL=DEBUG -# org.eclipse.jetty.websocket.tests.client.jsr356.LEVEL=DEBUG +org.eclipse.jetty.websocket.tests.client.jsr356.LEVEL=DEBUG # org.eclipse.jetty.websocket.tests.server.LEVEL=DEBUG # org.eclipse.jetty.websocket.tests.server.jsr356.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.LEVEL=DEBUG diff --git a/jetty-websocket/websocket-tests/src/test/resources/logback-test.xml b/jetty-websocket/websocket-tests/src/test/resources/logback-test.xml new file mode 100644 index 00000000000..0b0627e01d8 --- /dev/null +++ b/jetty-websocket/websocket-tests/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + + %d{HH:mm:ss.SSS} [%-30thread] %-5level %logger{45} - %msg%n + + + + + + target/tests.log + false + + + %d{HH:mm:ss.SSS} [%-30thread] %-5level %logger{45} - %msg%n + + + + + + + + + + + + + \ No newline at end of file