Issue #4475 - add tests not reading until EOF

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2020-01-14 19:14:48 +11:00
parent 16c406b55c
commit c7b6ccca98
1 changed files with 63 additions and 9 deletions

View File

@ -20,10 +20,10 @@ package org.eclipse.jetty.websocket.jsr356.server;
import java.io.IOException; import java.io.IOException;
import java.io.Reader; import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer; import java.io.Writer;
import java.net.URI; import java.net.URI;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint; import javax.websocket.ClientEndpoint;
@ -50,14 +50,14 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class TextStreamTest public class TextStreamTest
{ {
private static final String PATH = "/echo"; private static final String PATH = "/echo";
private static final String CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; private static final String CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
private static final BlockingArrayQueue<QueuedTextStreamer> serverEndpoints = new BlockingArrayQueue<>();
private static CompletableFuture<QueuedTextStreamer> queuedTextStreamerFuture = new CompletableFuture<>();
private Server server; private Server server;
private ServerConnector connector; private ServerConnector connector;
@ -74,6 +74,7 @@ public class TextStreamTest
ServerContainer container = WebSocketServerContainerInitializer.configureContext(context); ServerContainer container = WebSocketServerContainerInitializer.configureContext(context);
container.addEndpoint(ServerEndpointConfig.Builder.create(ServerTextStreamer.class, PATH).build()); container.addEndpoint(ServerEndpointConfig.Builder.create(ServerTextStreamer.class, PATH).build());
container.addEndpoint(ServerEndpointConfig.Builder.create(QueuedTextStreamer.class, "/test").build()); container.addEndpoint(ServerEndpointConfig.Builder.create(QueuedTextStreamer.class, "/test").build());
container.addEndpoint(ServerEndpointConfig.Builder.create(QueuedPartialTextStreamer.class, "/partial").build());
server.start(); server.start();
@ -136,7 +137,7 @@ public class TextStreamTest
} }
@Test @Test
public void test() throws Exception public void testMessageOrdering() throws Exception
{ {
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/test"); URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/test");
ClientTextStreamer client = new ClientTextStreamer(); ClientTextStreamer client = new ClientTextStreamer();
@ -147,7 +148,8 @@ public class TextStreamTest
session.getBasicRemote().sendText(Integer.toString(i)); session.getBasicRemote().sendText(Integer.toString(i));
session.close(); session.close();
QueuedTextStreamer queuedTextStreamer = queuedTextStreamerFuture.get(5, TimeUnit.SECONDS); QueuedTextStreamer queuedTextStreamer = serverEndpoints.poll(5, TimeUnit.SECONDS);
assertNotNull(queuedTextStreamer);
for (int i = 0; i < numLoops; i++) for (int i = 0; i < numLoops; i++)
{ {
String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS); String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS);
@ -156,7 +158,7 @@ public class TextStreamTest
} }
@Test @Test
public void testFragmented() throws Exception public void testFragmentedMessageOrdering() throws Exception
{ {
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/test"); URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/test");
ClientTextStreamer client = new ClientTextStreamer(); ClientTextStreamer client = new ClientTextStreamer();
@ -171,7 +173,8 @@ public class TextStreamTest
} }
session.close(); session.close();
QueuedTextStreamer queuedTextStreamer = queuedTextStreamerFuture.get(5, TimeUnit.SECONDS); QueuedTextStreamer queuedTextStreamer = serverEndpoints.poll(5, TimeUnit.SECONDS);
assertNotNull(queuedTextStreamer);
for (int i = 0; i < numLoops; i++) for (int i = 0; i < numLoops; i++)
{ {
String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS); String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS);
@ -180,6 +183,29 @@ public class TextStreamTest
} }
} }
@Test
public void testMessageOrderingDoNotReadToEOF() throws Exception
{
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/partial");
ClientTextStreamer client = new ClientTextStreamer();
Session session = wsClient.connectToServer(client, uri);
final int numLoops = 20;
for (int i = 0; i < numLoops; i++)
{
session.getBasicRemote().sendText(i + "|-----");
}
session.close();
QueuedTextStreamer queuedTextStreamer = serverEndpoints.poll(5, TimeUnit.SECONDS);
assertNotNull(queuedTextStreamer);
for (int i = 0; i < numLoops; i++)
{
String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS);
assertThat(msg, Matchers.is(Integer.toString(i)));
}
}
private char[] randomChars(int size) private char[] randomChars(int size)
{ {
char[] data = new char[size]; char[] data = new char[size];
@ -241,11 +267,11 @@ public class TextStreamTest
public static class QueuedTextStreamer extends Endpoint implements MessageHandler.Whole<Reader> public static class QueuedTextStreamer extends Endpoint implements MessageHandler.Whole<Reader>
{ {
private BlockingArrayQueue<String> messages = new BlockingArrayQueue<>(); protected BlockingArrayQueue<String> messages = new BlockingArrayQueue<>();
public QueuedTextStreamer() public QueuedTextStreamer()
{ {
queuedTextStreamerFuture.complete(this); serverEndpoints.add(this);
} }
@Override @Override
@ -268,4 +294,32 @@ public class TextStreamTest
} }
} }
} }
public static class QueuedPartialTextStreamer extends QueuedTextStreamer
{
@Override
public void onMessage(Reader input)
{
try
{
Thread.sleep(Math.abs(new Random().nextLong() % 200));
// Do not read to EOF but just the first '|'.
StringWriter writer = new StringWriter();
while (true)
{
int read = input.read();
if (read < 0 || read == '|')
break;
writer.write(read);
}
messages.add(writer.toString());
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
} }