diff --git a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/TextStreamTest.java b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/TextStreamTest.java index f8418f79927..dc9750caa74 100644 --- a/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/TextStreamTest.java +++ b/jetty-websocket/javax-websocket-server-impl/src/test/java/org/eclipse/jetty/websocket/jsr356/server/TextStreamTest.java @@ -20,10 +20,10 @@ package org.eclipse.jetty.websocket.jsr356.server; import java.io.IOException; import java.io.Reader; +import java.io.StringWriter; import java.io.Writer; import java.net.URI; import java.util.Random; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.websocket.ClientEndpoint; @@ -50,14 +50,14 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class TextStreamTest { private static final String PATH = "/echo"; private static final String CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; - - private static CompletableFuture queuedTextStreamerFuture = new CompletableFuture<>(); + private static final BlockingArrayQueue serverEndpoints = new BlockingArrayQueue<>(); private Server server; private ServerConnector connector; @@ -74,6 +74,7 @@ public class TextStreamTest ServerContainer container = WebSocketServerContainerInitializer.configureContext(context); container.addEndpoint(ServerEndpointConfig.Builder.create(ServerTextStreamer.class, PATH).build()); container.addEndpoint(ServerEndpointConfig.Builder.create(QueuedTextStreamer.class, "/test").build()); + container.addEndpoint(ServerEndpointConfig.Builder.create(QueuedPartialTextStreamer.class, "/partial").build()); server.start(); @@ -136,7 +137,7 @@ public class TextStreamTest } @Test - public void test() throws Exception + public void testMessageOrdering() throws Exception { URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/test"); ClientTextStreamer client = new ClientTextStreamer(); @@ -147,7 +148,8 @@ public class TextStreamTest session.getBasicRemote().sendText(Integer.toString(i)); session.close(); - QueuedTextStreamer queuedTextStreamer = queuedTextStreamerFuture.get(5, TimeUnit.SECONDS); + QueuedTextStreamer queuedTextStreamer = serverEndpoints.poll(5, TimeUnit.SECONDS); + assertNotNull(queuedTextStreamer); for (int i = 0; i < numLoops; i++) { String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS); @@ -156,7 +158,7 @@ public class TextStreamTest } @Test - public void testFragmented() throws Exception + public void testFragmentedMessageOrdering() throws Exception { URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/test"); ClientTextStreamer client = new ClientTextStreamer(); @@ -171,7 +173,8 @@ public class TextStreamTest } session.close(); - QueuedTextStreamer queuedTextStreamer = queuedTextStreamerFuture.get(5, TimeUnit.SECONDS); + QueuedTextStreamer queuedTextStreamer = serverEndpoints.poll(5, TimeUnit.SECONDS); + assertNotNull(queuedTextStreamer); for (int i = 0; i < numLoops; i++) { String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS); @@ -180,6 +183,29 @@ public class TextStreamTest } } + @Test + public void testMessageOrderingDoNotReadToEOF() throws Exception + { + URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/partial"); + ClientTextStreamer client = new ClientTextStreamer(); + Session session = wsClient.connectToServer(client, uri); + + final int numLoops = 20; + for (int i = 0; i < numLoops; i++) + { + session.getBasicRemote().sendText(i + "|-----"); + } + session.close(); + + QueuedTextStreamer queuedTextStreamer = serverEndpoints.poll(5, TimeUnit.SECONDS); + assertNotNull(queuedTextStreamer); + for (int i = 0; i < numLoops; i++) + { + String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS); + assertThat(msg, Matchers.is(Integer.toString(i))); + } + } + private char[] randomChars(int size) { char[] data = new char[size]; @@ -241,11 +267,11 @@ public class TextStreamTest public static class QueuedTextStreamer extends Endpoint implements MessageHandler.Whole { - private BlockingArrayQueue messages = new BlockingArrayQueue<>(); + protected BlockingArrayQueue messages = new BlockingArrayQueue<>(); public QueuedTextStreamer() { - queuedTextStreamerFuture.complete(this); + serverEndpoints.add(this); } @Override @@ -268,4 +294,32 @@ public class TextStreamTest } } } + + public static class QueuedPartialTextStreamer extends QueuedTextStreamer + { + @Override + public void onMessage(Reader input) + { + try + { + Thread.sleep(Math.abs(new Random().nextLong() % 200)); + + // Do not read to EOF but just the first '|'. + StringWriter writer = new StringWriter(); + while (true) + { + int read = input.read(); + if (read < 0 || read == '|') + break; + writer.write(read); + } + + messages.add(writer.toString()); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } }