Fixing WebSocket MessageReceivingTest

This commit is contained in:
Joakim Erdfelt 2017-06-20 13:40:30 -07:00
parent 52ded32ea0
commit db9997a792
4 changed files with 359 additions and 101 deletions

View File

@ -56,6 +56,18 @@
<version>${project.version}</version>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>

View File

@ -18,19 +18,23 @@
package org.eclipse.jetty.websocket.tests.client.jsr356;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
@ -41,97 +45,224 @@ import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.util.WSURI;
import org.eclipse.jetty.websocket.common.util.TextUtil;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.eclipse.jetty.websocket.tests.DataUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
/**
* This class tests receiving of messages by different types of {@link MessageHandler}
*/
public class MessageReceivingTest {
private static final Logger LOG = Log.getLogger(EndpointEchoTest.class);
private static Server server;
private static EchoHandler handler;
private static URI serverUri;
private WebSocketContainer container;
private final String VERY_LONG_STRING;
public MessageReceivingTest() {
byte raw[] = new byte[1024 * 1024];
Arrays.fill(raw, (byte)'x');
VERY_LONG_STRING = new String(raw, StandardCharsets.UTF_8);
public class MessageReceivingTest
{
@WebSocket
public static class SendPartialTextSocket
{
@OnWebSocketMessage
public void onText(org.eclipse.jetty.websocket.api.Session session, String message) throws IOException
{
RemoteEndpoint remote = session.getRemote();
String parts[] = message.split(" ");
for (int i = 0; i < parts.length; i++)
{
if (i > 0)
remote.sendPartialString(" ", false);
boolean last = (i >= (parts.length - 1));
remote.sendPartialString(parts[i], last);
}
}
}
@WebSocket
public static class SendPartialBinarySocket
{
@OnWebSocketMessage
public void onText(org.eclipse.jetty.websocket.api.Session session, ByteBuffer payload) throws IOException
{
RemoteEndpoint remote = session.getRemote();
ByteBuffer copy = DataUtils.copyOf(payload);
int segmentSize = 128 * 1024;
int segmentCount = Math.max(1, copy.remaining() / segmentSize);
if (LOG.isDebugEnabled())
{
LOG.debug(".onText(payload.length={})", payload.remaining());
LOG.debug("segmentSize={}, segmentCount={}", segmentSize, segmentCount);
}
for (int i = 0; i < segmentCount; i++)
{
ByteBuffer segment = copy.slice();
segment.position(i * segmentSize);
int remaining = segment.remaining();
segment.limit(segment.position() + Math.min(remaining, segmentSize));
boolean last = (i >= (segmentCount - 1));
if (LOG.isDebugEnabled())
{
LOG.debug("segment[{}].sendPartialBytes({}, {})", i, BufferUtil.toDetailString(segment), last);
}
remote.sendPartialBytes(segment.asReadOnlyBuffer(), last);
}
}
}
@WebSocket
public static class EchoWholeMessageSocket
{
@OnWebSocketMessage
public void onText(org.eclipse.jetty.websocket.api.Session session, String message) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onText({})", EchoWholeMessageSocket.class.getSimpleName(), TextUtil.hint(message));
}
session.getRemote().sendString(message);
}
@OnWebSocketMessage
public void onBinary(org.eclipse.jetty.websocket.api.Session session, ByteBuffer payload) throws IOException
{
if (LOG.isDebugEnabled())
{
LOG.debug("{}.onBinary({})", EchoWholeMessageSocket.class.getSimpleName(), BufferUtil.toDetailString(payload));
}
ByteBuffer copy = DataUtils.copyOf(payload);
session.getRemote().sendBytes(copy);
}
}
public static class ServerMessageSendingHandler extends WebSocketHandler implements WebSocketCreator
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.getPolicy().setMaxTextMessageSize(2 * 1024 * 1024);
factory.getPolicy().setMaxBinaryMessageSize(2 * 1024 * 1024);
factory.setCreator(this);
}
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
{
if (req.hasSubProtocol("partial-text"))
{
resp.setAcceptedSubProtocol("partial-text");
return new SendPartialTextSocket();
}
if (req.hasSubProtocol("partial-binary"))
{
resp.setAcceptedSubProtocol("partial-binary");
return new SendPartialBinarySocket();
}
if (req.hasSubProtocol("echo"))
{
resp.setAcceptedSubProtocol("echo");
return new EchoWholeMessageSocket();
}
return null;
}
}
private static final Logger LOG = Log.getLogger(MessageReceivingTest.class);
private static Server server;
private static URI serverUri;
private WebSocketContainer container;
@BeforeClass
public static void startServer() throws Exception {
public static void startServer() throws Exception
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);
handler = new EchoHandler();
ContextHandler context = new ContextHandler();
context.setContextPath("/");
context.setHandler(handler);
context.setHandler(new ServerMessageSendingHandler());
server.setHandler(context);
// Start Server
server.start();
String host = connector.getHost();
if (host == null) {
host = "localhost";
}
int port = connector.getLocalPort();
serverUri = new URI(String.format("ws://%s:%d/", host, port));
serverUri = WSURI.toWebsocket(server.getURI()).resolve("/");
}
@AfterClass
public static void stopServer() {
try {
server.stop();
} catch (Exception e) {
e.printStackTrace(System.err);
}
public static void stopServer() throws Exception
{
server.stop();
}
@Before
public void initClient() {
public void initClient()
{
container = ContainerProvider.getWebSocketContainer();
}
@After
public void stopClient() throws Exception
{
((LifeCycle)container).stop();
((LifeCycle) container).stop();
}
/**
* Method tests receiving of text messages at once.
*
* @throws Exception on exception occur
*/
@Test
@Ignore("flappy test")
public void testWholeTextMessage() throws Exception {
final TestEndpoint echoer = new TestEndpoint(new WholeStringCaptureHandler());
Assert.assertThat(echoer, instanceOf(javax.websocket.Endpoint.class));
// Issue connect using instance of class that extends Endpoint
final Session session = container.connectToServer(echoer, serverUri);
session.getBasicRemote().sendText("");
session.getBasicRemote().sendText("Echo");
session.getBasicRemote().sendText(VERY_LONG_STRING);
session.getBasicRemote().sendText("Echo");
String msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS);
msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS);
public void testWholeTextMessage() throws Exception
{
final TestEndpoint clientEndpoint = new TestEndpoint(new WholeStringCaptureHandler());
assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class));
ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create()
.preferredSubprotocols(Collections.singletonList("echo"))
.build();
byte raw[] = new byte[1024 * 1024];
Arrays.fill(raw, (byte) 'x');
String veryLongString = new String(raw, UTF_8);
final Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri);
try
{
session.getBasicRemote().sendText("");
session.getBasicRemote().sendText("Echo");
session.getBasicRemote().sendText(veryLongString);
session.getBasicRemote().sendText("Another Echo");
String msg;
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is(""));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is("Echo"));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is(veryLongString));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is("Another Echo"));
}
finally
{
session.close();
}
}
/**
@ -140,15 +271,33 @@ public class MessageReceivingTest {
* @throws Exception on exception occur
*/
@Test
public void testPartialTextMessage() throws Exception {
final TestEndpoint echoer = new TestEndpoint(new PartialStringCaptureHandler());
Assert.assertThat(echoer, instanceOf(javax.websocket.Endpoint.class));
// Issue connect using instance of class that extends Endpoint
final Session session = container.connectToServer(echoer, serverUri);
session.getBasicRemote().sendText("");
session.getBasicRemote().sendText("Echo");
String msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS);
msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS);
public void testPartialTextMessage() throws Exception
{
final TestEndpoint clientEndpoint = new TestEndpoint(new PartialStringCaptureHandler());
assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class));
ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create()
.preferredSubprotocols(Collections.singletonList("partial-text"))
.build();
final Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri);
try
{
session.getBasicRemote().sendText("");
session.getBasicRemote().sendText("Echo");
session.getBasicRemote().sendText("I can live for two months on a good compliment.");
String msg;
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is(""));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is("Echo"));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is("I can live for two months on a good compliment."));
}
finally
{
session.close();
}
}
/**
@ -157,15 +306,31 @@ public class MessageReceivingTest {
* @throws Exception on exception occur
*/
@Test
public void testWholeBinaryMessage() throws Exception {
final TestEndpoint echoer = new TestEndpoint(new WholeByteBufferCaptureHandler());
Assert.assertThat(echoer, instanceOf(javax.websocket.Endpoint.class));
// Issue connect using instance of class that extends Endpoint
final Session session = container.connectToServer(echoer, serverUri);
sendBinary(session, "");
sendBinary(session, "Echo");
String msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS);
msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS);
public void testWholeBinaryMessage() throws Exception
{
final TestEndpoint clientEndpoint = new TestEndpoint(new WholeByteBufferCaptureHandler());
assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class));
ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create()
.preferredSubprotocols(Collections.singletonList("echo"))
.build();
final Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri);
try
{
session.getBasicRemote().sendBinary(BufferUtil.toBuffer("", UTF_8));
session.getBasicRemote().sendBinary(BufferUtil.toBuffer("Echo", UTF_8));
String msg;
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is(""));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Received Message", msg, is("Echo"));
}
finally
{
session.close();
}
}
/**
@ -174,31 +339,60 @@ public class MessageReceivingTest {
* @throws Exception on exception occur
*/
@Test
public void testPartialBinaryMessage() throws Exception {
final TestEndpoint echoer = new TestEndpoint(new PartialByteBufferCaptureHandler());
Assert.assertThat(echoer, instanceOf(javax.websocket.Endpoint.class));
// Issue connect using instance of class that extends Endpoint
final Session session = container.connectToServer(echoer, serverUri);
sendBinary(session, "");
sendBinary(session, "Echo");
String msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS);
msg = echoer.handler.messageQueue.poll(1, TimeUnit.SECONDS);
public void testPartialBinaryMessage() throws Exception
{
final TestEndpoint clientEndpoint = new TestEndpoint(new PartialByteBufferCaptureHandler());
assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class));
ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create()
.preferredSubprotocols(Collections.singletonList("partial-binary"))
.build();
final Session session = container.connectToServer(clientEndpoint, clientConfig, serverUri);
try
{
session.getBasicRemote().sendBinary(BufferUtil.toBuffer("", UTF_8));
session.getBasicRemote().sendBinary(BufferUtil.toBuffer("Echo", UTF_8));
byte bigBuf[] = new byte[1024 * 1024];
Arrays.fill(bigBuf, (byte) 'x');
// allocate fresh ByteBuffer and copy array contents, not wrap
// as the send will modify the wrapped array (for client masking purposes)
ByteBuffer bigByteBuffer = ByteBuffer.allocate(bigBuf.length);
bigByteBuffer.put(bigBuf);
bigByteBuffer.flip();
session.getBasicRemote().sendBinary(bigByteBuffer);
session.getBasicRemote().sendBinary(BufferUtil.toBuffer("Another Echo", UTF_8));
String msg;
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Echo'd Message", msg, is(""));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Echo'd Message", msg, is("Echo"));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Echo'd Message.length", msg.length(), is(bigBuf.length));
msg = clientEndpoint.handler.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("Echo'd Message", msg, is("Another Echo"));
}
finally
{
session.close();
}
}
private static void sendBinary(Session session, String message) throws IOException {
final ByteBuffer bb = ByteBuffer.wrap(message.getBytes());
session.getBasicRemote().sendBinary(bb);
}
private static class TestEndpoint extends Endpoint {
public static class TestEndpoint extends Endpoint
{
public final AbstractHandler handler;
public TestEndpoint(AbstractHandler handler) {
public TestEndpoint(AbstractHandler handler)
{
this.handler = handler;
}
@Override
public void onOpen(Session session, EndpointConfig config) {
public void onOpen(Session session, EndpointConfig config)
{
session.setMaxTextMessageBufferSize(2 * 1024 * 1024);
session.setMaxBinaryMessageBufferSize(2 * 1024 * 1024);
session.addMessageHandler(handler);
}
}
@ -206,7 +400,8 @@ public class MessageReceivingTest {
/**
* Abstract message handler implementation, used for tests.
*/
private static abstract class AbstractHandler implements MessageHandler {
private static abstract class AbstractHandler implements MessageHandler
{
public final BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>();
}
@ -214,7 +409,8 @@ public class MessageReceivingTest {
* Partial message handler for receiving binary messages.
*/
public static class PartialByteBufferCaptureHandler extends AbstractHandler implements
MessageHandler.Partial<ByteBuffer> {
MessageHandler.Partial<ByteBuffer>
{
/**
* Parts of the current message. This list is appended with every non-last part and is
* cleared after last part of a message has been received.
@ -222,32 +418,44 @@ public class MessageReceivingTest {
private final List<ByteBuffer> currentMessage = new ArrayList<>();
@Override
public void onMessage(ByteBuffer messagePart, boolean last) {
final ByteBuffer bufferCopy = ByteBuffer.allocate(messagePart.capacity());
bufferCopy.put(messagePart);
public void onMessage(ByteBuffer messagePart, boolean last)
{
if(LOG.isDebugEnabled())
{
LOG.debug("PartialByteBufferCaptureHandler.onMessage({}, {})", BufferUtil.toDetailString(messagePart), last);
}
final ByteBuffer bufferCopy = DataUtils.copyOf(messagePart);
currentMessage.add(bufferCopy);
if (last) {
if (last)
{
int totalSize = 0;
for (ByteBuffer bb : currentMessage) {
for (ByteBuffer bb : currentMessage)
{
totalSize += bb.capacity();
}
final ByteBuffer result = ByteBuffer.allocate(totalSize);
for (ByteBuffer bb : currentMessage) {
for (ByteBuffer bb : currentMessage)
{
result.put(bb);
}
final String stringResult = new String(result.array());
BufferUtil.flipToFlush(result, 0);
final String stringResult = BufferUtil.toUTF8String(result);
messageQueue.offer(stringResult);
currentMessage.clear();
}
}
}
/**
* Whole message handler for receiving binary messages.
*/
public static class WholeByteBufferCaptureHandler extends AbstractHandler implements
MessageHandler.Whole<ByteBuffer> {
MessageHandler.Whole<ByteBuffer>
{
@Override
public void onMessage(ByteBuffer message) {
public void onMessage(ByteBuffer message)
{
final String stringResult = new String(message.array());
messageQueue.offer(stringResult);
}
@ -257,7 +465,8 @@ public class MessageReceivingTest {
* Partial message handler for receiving text messages.
*/
public static class PartialStringCaptureHandler extends AbstractHandler implements
MessageHandler.Partial<String> {
MessageHandler.Partial<String>
{
/**
* Parts of the current message. This list is appended with every non-last part and is
* cleared after last part of a message has been received.
@ -265,22 +474,26 @@ public class MessageReceivingTest {
private StringBuilder sb = new StringBuilder();
@Override
public void onMessage(String messagePart, boolean last) {
public void onMessage(String messagePart, boolean last)
{
sb.append(messagePart);
if (last) {
if (last)
{
messageQueue.add(sb.toString());
sb = new StringBuilder();
}
}
}
/**
* Whole message handler for receiving text messages.
*/
public static class WholeStringCaptureHandler extends AbstractHandler implements
MessageHandler.Whole<String> {
MessageHandler.Whole<String>
{
@Override
public void onMessage(String message) {
public void onMessage(String message)
{
messageQueue.add(message);
}
}

View File

@ -18,8 +18,9 @@
#
#
# org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.Slf4jLog
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.LEVEL=INFO
# org.eclipse.jetty.util.log.stderr.LONG=true
# org.eclipse.jetty.server.AbstractConnector.LEVEL=DEBUG
@ -33,7 +34,7 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.jsr356.messages.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.client.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.client.jsr356.LEVEL=DEBUG
org.eclipse.jetty.websocket.tests.client.jsr356.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.server.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.server.jsr356.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.LEVEL=DEBUG

View File

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>
%d{HH:mm:ss.SSS} [%-30thread] %-5level %logger{45} - %msg%n
</Pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>target/tests.log</file>
<append>false</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>
%d{HH:mm:ss.SSS} [%-30thread] %-5level %logger{45} - %msg%n
</Pattern>
</encoder>
</appender>
<!--<logger name="org.eclipse.jetty.websocket.tests.client.jsr356" level="debug" />-->
<!--<logger name="org.eclipse.jetty.websocket" level="debug" additivity="false">
<appender-ref ref="FILE" />
</logger>-->
<root level="warn">
<appender-ref ref="STDOUT" />
</root>
</configuration>