Issue #207 - Identify non-echo client endpoint options for testing

This commit is contained in:
Joakim Erdfelt 2017-05-11 10:59:16 -07:00
parent e84e875e14
commit 6cfcbeffad
13 changed files with 344 additions and 209 deletions

View File

@ -101,6 +101,10 @@ public abstract class AbstractTrackingEndpoint<T>
protected void onWSClose(int statusCode, String reason) protected void onWSClose(int statusCode, String reason)
{ {
if(LOG.isDebugEnabled())
{
LOG.debug("onWSClose({}, {})", statusCode, reason);
}
CloseInfo close = new CloseInfo(statusCode, reason); CloseInfo close = new CloseInfo(statusCode, reason);
boolean closeTracked = closeInfo.compareAndSet(null, close); boolean closeTracked = closeInfo.compareAndSet(null, close);
this.closeLatch.countDown(); this.closeLatch.countDown();
@ -109,6 +113,10 @@ public abstract class AbstractTrackingEndpoint<T>
protected void onWSError(Throwable cause) protected void onWSError(Throwable cause)
{ {
if(LOG.isDebugEnabled())
{
LOG.debug("onWSError()", cause);
}
assertThat("Error must have value", cause, notNullValue()); assertThat("Error must have value", cause, notNullValue());
if (error.compareAndSet(null, cause) == false) if (error.compareAndSet(null, cause) == false)
{ {

View File

@ -18,38 +18,135 @@
package org.eclipse.jetty.websocket.tests.jsr356; package org.eclipse.jetty.websocket.tests.jsr356;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.websocket.CloseReason; import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig; import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session; import javax.websocket.Session;
import org.eclipse.jetty.websocket.tests.AbstractTrackingEndpoint; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.tests.DataUtils;
import org.eclipse.jetty.websocket.tests.Defaults;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@SuppressWarnings("unused") public abstract class AbstractJsrTrackingEndpoint extends Endpoint
public abstract class AbstractJsrTrackingEndpoint extends AbstractTrackingEndpoint<Session>
{ {
public final Logger LOG;
public Session session;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
public BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>();
public BlockingQueue<ByteBuffer> bufferQueue = new LinkedBlockingDeque<>();
public AbstractJsrTrackingEndpoint()
{
this("JsrTrackingEndpoint");
}
public AbstractJsrTrackingEndpoint(String id) public AbstractJsrTrackingEndpoint(String id)
{ {
super(id); LOG = Log.getLogger(this.getClass().getName() + "." + id);
LOG.debug("init");
} }
@OnOpen public void assertCloseInfo(String prefix, int expectedCloseStatusCode, Matcher<? super String> reasonMatcher) throws InterruptedException
{
CloseInfo close = closeInfo.get();
assertThat(prefix + " close info", close, Matchers.notNullValue());
assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode));
assertThat(prefix + " received close reason", close.getReason(), reasonMatcher);
}
public void assertErrorEvent(String prefix, Matcher<Throwable> throwableMatcher, Matcher<? super String> messageMatcher)
{
assertThat(prefix + " error event type", error.get(), throwableMatcher);
assertThat(prefix + " error event message", error.get().getMessage(), messageMatcher);
}
public void assertNoErrorEvents(String prefix)
{
assertTrue(prefix + " error event should not have occurred", error.get() == null);
}
public void assertNotClosed(String prefix)
{
assertTrue(prefix + " close event should not have occurred", closeLatch.getCount() > 0);
}
public void assertNotOpened(String prefix)
{
assertTrue(prefix + " open event should not have occurred", openLatch.getCount() > 0);
}
public void awaitCloseEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onClose event", closeLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
public void awaitOpenEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onOpen event", openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
@Override
public void onOpen(Session session, EndpointConfig config) public void onOpen(Session session, EndpointConfig config)
{ {
super.onWSOpen(session); this.session = session;
if (LOG.isDebugEnabled())
{
LOG.debug("onOpen()");
}
this.openLatch.countDown();
} }
@OnClose protected void onWsText(String message)
public void onClose(CloseReason closeReason)
{ {
super.onWSClose(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase()); messageQueue.offer(message);
} }
@OnError protected void onWsBinary(ByteBuffer buffer)
public void onError(Throwable cause)
{ {
super.onWSError(cause); ByteBuffer copy = DataUtils.copyOf(buffer);
bufferQueue.offer(copy);
}
@Override
public void onClose(Session session, CloseReason closeReason)
{
CloseInfo close = new CloseInfo(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase());
boolean closeTracked = closeInfo.compareAndSet(null, close);
this.closeLatch.countDown();
assertTrue("Close only happened once", closeTracked);
}
@Override
public void onError(Session session, Throwable cause)
{
assertThat("Error must have value", cause, notNullValue());
if (error.compareAndSet(null, cause) == false)
{
LOG.warn("onError should only happen once - Original Cause", error.get());
LOG.warn("onError should only happen once - Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
} }
} }

View File

@ -0,0 +1,55 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.jsr356;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import org.eclipse.jetty.websocket.tests.AbstractTrackingEndpoint;
@SuppressWarnings("unused")
public abstract class AbstractJsrTrackingSocket extends AbstractTrackingEndpoint<Session>
{
public AbstractJsrTrackingSocket(String id)
{
super(id);
}
@OnOpen
public void onOpen(Session session, EndpointConfig config)
{
super.onWSOpen(session);
}
@OnClose
public void onClose(CloseReason closeReason)
{
super.onWSClose(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase());
}
@OnError
public void onError(Throwable cause)
{
super.onWSError(cause);
}
}

View File

@ -18,105 +18,32 @@
package org.eclipse.jetty.websocket.tests.jsr356; package org.eclipse.jetty.websocket.tests.jsr356;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig; import javax.websocket.EndpointConfig;
import javax.websocket.Session; import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.tests.DataUtils;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.tests.Defaults;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
public class JsrTrackingEndpoint extends Endpoint public class JsrTrackingEndpoint extends AbstractJsrTrackingEndpoint
{ {
public final Logger LOG;
public Session session;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
public BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>();
public BlockingQueue<ByteBuffer> bufferQueue = new LinkedBlockingDeque<>();
public JsrTrackingEndpoint() public JsrTrackingEndpoint()
{ {
this("JsrTrackingEndpoint"); super("JsrTrackingEndpoint");
} }
public JsrTrackingEndpoint(String id) public JsrTrackingEndpoint(String id)
{ {
LOG = Log.getLogger(this.getClass().getName() + "." + id); super(id);
LOG.debug("init");
}
public void assertCloseInfo(String prefix, int expectedCloseStatusCode, Matcher<? super String> reasonMatcher) throws InterruptedException
{
CloseInfo close = closeInfo.get();
assertThat(prefix + " close info", close, Matchers.notNullValue());
assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode));
assertThat(prefix + " received close reason", close.getReason(), reasonMatcher);
}
public void assertErrorEvent(String prefix, Matcher<Throwable> throwableMatcher, Matcher<? super String> messageMatcher)
{
assertThat(prefix + " error event type", error.get(), throwableMatcher);
assertThat(prefix + " error event message", error.get().getMessage(), messageMatcher);
}
public void assertNoErrorEvents(String prefix)
{
assertTrue(prefix + " error event should not have occurred", error.get() == null);
}
public void assertNotClosed(String prefix)
{
assertTrue(prefix + " close event should not have occurred", closeLatch.getCount() > 0);
}
public void assertNotOpened(String prefix)
{
assertTrue(prefix + " open event should not have occurred", openLatch.getCount() > 0);
}
public void awaitCloseEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onClose event", closeLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
public void awaitOpenEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onOpen event", openLatch.await(Defaults.OPEN_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
} }
@Override @Override
public void onOpen(Session session, EndpointConfig config) public void onOpen(Session session, EndpointConfig config)
{ {
this.session = session; super.onOpen(session, config);
if (LOG.isDebugEnabled())
{
LOG.debug("onOpen()");
}
this.openLatch.countDown();
// Chose to do this via a lambda MessageHandler to test javax.websocket 1.1 functionality
session.addMessageHandler(String.class, message -> session.addMessageHandler(String.class, message ->
{ {
messageQueue.offer(message); messageQueue.offer(message);
@ -130,11 +57,10 @@ public class JsrTrackingEndpoint extends Endpoint
} }
}); });
// Chose to do this via a lambda MessageHandler to test javax.websocket 1.1 functionality
session.addMessageHandler(ByteBuffer.class, buffer -> session.addMessageHandler(ByteBuffer.class, buffer ->
{ {
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining()); ByteBuffer copy = DataUtils.copyOf(buffer);
BufferUtil.put(buffer, copy);
copy.flip();
bufferQueue.offer(copy); bufferQueue.offer(copy);
try try
{ {
@ -146,25 +72,4 @@ public class JsrTrackingEndpoint extends Endpoint
} }
}); });
} }
@Override
public void onClose(Session session, CloseReason closeReason)
{
CloseInfo close = new CloseInfo(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase());
boolean closeTracked = closeInfo.compareAndSet(null, close);
this.closeLatch.countDown();
assertTrue("Close only happened once", closeTracked);
}
@Override
public void onError(Session session, Throwable cause)
{
assertThat("Error must have value", cause, notNullValue());
if (error.compareAndSet(null, cause) == false)
{
LOG.warn("onError should only happen once - Original Cause", error.get());
LOG.warn("onError should only happen once - Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
}
} }

View File

@ -42,7 +42,7 @@ import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator; import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingEndpoint; import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingSocket;
import org.eclipse.jetty.websocket.tests.UntrustedWSEndpoint; import org.eclipse.jetty.websocket.tests.UntrustedWSEndpoint;
import org.eclipse.jetty.websocket.tests.UntrustedWSServer; import org.eclipse.jetty.websocket.tests.UntrustedWSServer;
import org.junit.After; import org.junit.After;
@ -88,7 +88,7 @@ public class DecoderReaderManySmallTest
} }
@ClientEndpoint(decoders = EventIdDecoder.class, subprotocols = "eventids") @ClientEndpoint(decoders = EventIdDecoder.class, subprotocols = "eventids")
public static class EventIdSocket extends AbstractJsrTrackingEndpoint public static class EventIdSocket extends AbstractJsrTrackingSocket
{ {
public BlockingQueue<EventId> messageQueue = new LinkedBlockingDeque<>(); public BlockingQueue<EventId> messageQueue = new LinkedBlockingDeque<>();

View File

@ -22,111 +22,88 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import java.net.URI;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.websocket.ContainerProvider; import javax.websocket.ContainerProvider;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session; import javax.websocket.Session;
import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.websocket.jsr356.JsrSession;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.websocket.tests.SimpleServletServer;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingEndpoint;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.websocket.tests.servlets.EchoServlet;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.tests.jsr356.JsrTrackingEndpoint;
import org.eclipse.jetty.websocket.tests.jsr356.endpoints.EchoStringEndpoint;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
public class EndpointEchoTest public class EndpointEchoTest
{ {
private static final Logger LOG = Log.getLogger(EndpointEchoTest.class); private static SimpleServletServer server;
private static Server server;
private static EchoHandler handler;
private static URI serverUri;
@BeforeClass @BeforeClass
public static void startServer() throws Exception public static void startServer() throws Exception
{ {
server = new Server(); server = new SimpleServletServer(new EchoServlet());
ServerConnector connector = new ServerConnector(server);
server.addConnector(connector);
handler = new EchoHandler();
ContextHandler context = new ContextHandler();
context.setContextPath("/");
context.setHandler(handler);
server.setHandler(context);
// Start Server
server.start(); server.start();
String host = connector.getHost();
if (host == null)
{
host = "localhost";
}
int port = connector.getLocalPort();
serverUri = new URI(String.format("ws://%s:%d/",host,port));
} }
@AfterClass @AfterClass
public static void stopServer() public static void stopServer() throws Exception
{
try
{ {
server.stop(); server.stop();
} }
catch (Exception e)
public static class ClientEndpoint extends AbstractJsrTrackingEndpoint implements MessageHandler.Whole<String>
{ {
e.printStackTrace(System.err); @Override
public void onOpen(Session session, EndpointConfig config)
{
super.onOpen(session, config);
session.addMessageHandler(this);
}
@Override
public void onMessage(String message)
{
super.onWsText(message);
} }
} }
@Test @Test
public void testBasicEchoInstance() throws Exception public void testEchoInstance() throws Exception
{ {
WebSocketContainer container = ContainerProvider.getWebSocketContainer(); WebSocketContainer container = ContainerProvider.getWebSocketContainer();
JsrTrackingEndpoint echoer = new JsrTrackingEndpoint(); ClientEndpoint clientEndpoint = new ClientEndpoint();
assertThat(echoer,instanceOf(javax.websocket.Endpoint.class)); assertThat(clientEndpoint, instanceOf(javax.websocket.Endpoint.class));
// Issue connect using instance of class that extends Endpoint // Issue connect using instance of class that extends Endpoint
Session session = container.connectToServer(echoer,serverUri); Session session = container.connectToServer(clientEndpoint, server.getServerUri());
session.getBasicRemote().sendText("Echo"); session.getBasicRemote().sendText("Echo");
String resp = echoer.messageQueue.poll(1,TimeUnit.SECONDS);
String resp = clientEndpoint.messageQueue.poll(1, TimeUnit.SECONDS);
assertThat("Response echo", resp, is("Echo")); assertThat("Response echo", resp, is("Echo"));
session.close();
clientEndpoint.awaitCloseEvent("Client");
} }
@Test @Test
public void testBasicEchoClassref() throws Exception public void testEchoClassRef() throws Exception
{ {
WebSocketContainer container = ContainerProvider.getWebSocketContainer(); WebSocketContainer container = ContainerProvider.getWebSocketContainer();
// Issue connect using class reference (class extends Endpoint) // Issue connect using class reference (class extends Endpoint)
Session session = container.connectToServer(JsrTrackingEndpoint.class,serverUri); Session session = container.connectToServer(ClientEndpoint.class, server.getServerUri());
session.getBasicRemote().sendText("Echo"); session.getBasicRemote().sendText("Echo");
}
@Test JsrSession jsrSession = (JsrSession) session;
public void testAbstractEchoInstance() throws Exception Object obj = jsrSession.getEndpoint();
{
WebSocketContainer container = ContainerProvider.getWebSocketContainer(); assertThat("session.endpoint", obj, instanceOf(ClientEndpoint.class));
EchoStringEndpoint echoer = new EchoStringEndpoint(); ClientEndpoint endpoint = (ClientEndpoint) obj;
assertThat(echoer,instanceOf(javax.websocket.Endpoint.class)); String resp = endpoint.messageQueue.poll(1, TimeUnit.SECONDS);
// Issue connect using instance of class that extends abstract that extends Endpoint
Session session = container.connectToServer(echoer,serverUri);
session.getBasicRemote().sendText("Echo");
String resp = echoer.messageQueue.poll(1,TimeUnit.SECONDS);
assertThat("Response echo", resp, is("Echo")); assertThat("Response echo", resp, is("Echo"));
}
@Test session.close();
public void testAbstractEchoClassref() throws Exception endpoint.awaitCloseEvent("Client");
{
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
// Issue connect using class reference (class that extends abstract that extends Endpoint)
Session session = container.connectToServer(EchoStringEndpoint.class,serverUri);
session.getBasicRemote().sendText("Echo");
} }
} }

View File

@ -0,0 +1,65 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.client.jsr356;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.PongMessage;
import org.eclipse.jetty.websocket.tests.DataUtils;
import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingSocket;
@ClientEndpoint
public class JsrClientEchoTrackingSocket extends AbstractJsrTrackingSocket
{
public BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>();
public BlockingQueue<ByteBuffer> pongQueue = new LinkedBlockingDeque<>();
public BlockingQueue<ByteBuffer> bufferQueue = new LinkedBlockingDeque<>();
public JsrClientEchoTrackingSocket()
{
super("@ClientEndpoint");
}
@OnMessage(maxMessageSize = 50 * 1024 * 1024)
public String onText(String msg)
{
messageQueue.offer(msg);
return msg;
}
@OnMessage(maxMessageSize = 50 * 1024 * 1024)
public ByteBuffer onBinary(ByteBuffer buffer)
{
ByteBuffer copy = DataUtils.copyOf(buffer);
bufferQueue.offer(copy);
return buffer;
}
@OnMessage
public void onPong(PongMessage pong)
{
ByteBuffer copy = DataUtils.copyOf(pong.getApplicationData());
pongQueue.offer(copy);
}
}

View File

@ -27,10 +27,10 @@ import javax.websocket.OnMessage;
import javax.websocket.PongMessage; import javax.websocket.PongMessage;
import org.eclipse.jetty.websocket.tests.DataUtils; import org.eclipse.jetty.websocket.tests.DataUtils;
import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingEndpoint; import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingSocket;
@ClientEndpoint @ClientEndpoint
public class JsrClientTrackingSocket extends AbstractJsrTrackingEndpoint public class JsrClientTrackingSocket extends AbstractJsrTrackingSocket
{ {
public BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>(); public BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>();
public BlockingQueue<ByteBuffer> pongQueue = new LinkedBlockingDeque<>(); public BlockingQueue<ByteBuffer> pongQueue = new LinkedBlockingDeque<>();
@ -42,18 +42,16 @@ public class JsrClientTrackingSocket extends AbstractJsrTrackingEndpoint
} }
@OnMessage(maxMessageSize = 50 * 1024 * 1024) @OnMessage(maxMessageSize = 50 * 1024 * 1024)
public String onText(String msg) public void onText(String msg)
{ {
messageQueue.offer(msg); messageQueue.offer(msg);
return msg;
} }
@OnMessage(maxMessageSize = 50 * 1024 * 1024) @OnMessage(maxMessageSize = 50 * 1024 * 1024)
public ByteBuffer onBinary(ByteBuffer buffer) public void onBinary(ByteBuffer buffer)
{ {
ByteBuffer copy = DataUtils.copyOf(buffer); ByteBuffer copy = DataUtils.copyOf(buffer);
bufferQueue.offer(copy); bufferQueue.offer(copy);
return buffer;
} }
@OnMessage @OnMessage

View File

@ -37,7 +37,7 @@ import javax.websocket.Session;
import javax.websocket.WebSocketContainer; import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingEndpoint; import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingSocket;
import org.eclipse.jetty.websocket.tests.UntrustedWSServer; import org.eclipse.jetty.websocket.tests.UntrustedWSServer;
import org.eclipse.jetty.websocket.tests.jsr356.coders.Quotes; import org.eclipse.jetty.websocket.tests.jsr356.coders.Quotes;
import org.eclipse.jetty.websocket.tests.jsr356.coders.QuotesEncoder; import org.eclipse.jetty.websocket.tests.jsr356.coders.QuotesEncoder;
@ -51,7 +51,7 @@ import org.junit.rules.TestName;
public class QuotesEncoderTest public class QuotesEncoderTest
{ {
@ClientEndpoint(encoders = QuotesEncoder.class, subprotocols = "echo") @ClientEndpoint(encoders = QuotesEncoder.class, subprotocols = "echo")
public static class QuotesSocket extends AbstractJsrTrackingEndpoint public static class QuotesSocket extends AbstractJsrTrackingSocket
{ {
public BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>(); public BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>();

View File

@ -24,12 +24,12 @@ import java.util.concurrent.LinkedBlockingDeque;
import javax.websocket.ClientEndpoint; import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage; import javax.websocket.OnMessage;
import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingEndpoint; import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingSocket;
import org.eclipse.jetty.websocket.tests.jsr356.coders.Quotes; import org.eclipse.jetty.websocket.tests.jsr356.coders.Quotes;
import org.eclipse.jetty.websocket.tests.jsr356.coders.QuotesDecoder; import org.eclipse.jetty.websocket.tests.jsr356.coders.QuotesDecoder;
@ClientEndpoint(decoders = QuotesDecoder.class, subprotocols = "quotes") @ClientEndpoint(decoders = QuotesDecoder.class, subprotocols = "quotes")
public class QuotesSocket extends AbstractJsrTrackingEndpoint public class QuotesSocket extends AbstractJsrTrackingSocket
{ {
public BlockingQueue<Quotes> messageQueue = new LinkedBlockingDeque<>(); public BlockingQueue<Quotes> messageQueue = new LinkedBlockingDeque<>();

View File

@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.tests.jsr356.endpoints; package org.eclipse.jetty.websocket.tests.jsr356.endpoints;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.websocket.CloseReason; import javax.websocket.CloseReason;
import javax.websocket.Endpoint; import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig; import javax.websocket.EndpointConfig;
@ -26,6 +33,10 @@ import javax.websocket.Session;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.tests.Defaults;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
/** /**
* Base Abstract Class. * Base Abstract Class.
@ -33,14 +44,29 @@ import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractStringEndpoint extends Endpoint implements MessageHandler.Whole<String> public abstract class AbstractStringEndpoint extends Endpoint implements MessageHandler.Whole<String>
{ {
private static final Logger LOG = Log.getLogger(AbstractStringEndpoint.class); private static final Logger LOG = Log.getLogger(AbstractStringEndpoint.class);
public CountDownLatch closeLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
protected Session session; protected Session session;
protected EndpointConfig config; protected EndpointConfig config;
public void assertCloseInfo(String prefix, int expectedCloseStatusCode, Matcher<? super String> reasonMatcher) throws InterruptedException
{
CloseInfo close = closeInfo.get();
assertThat(prefix + " close info", close, Matchers.notNullValue());
assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode));
assertThat(prefix + " received close reason", close.getReason(), reasonMatcher);
}
public void awaitCloseEvent(String prefix) throws InterruptedException
{
assertTrue(prefix + " onClose event", closeLatch.await(Defaults.CLOSE_EVENT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
@Override @Override
public void onOpen(Session session, EndpointConfig config) public void onOpen(Session session, EndpointConfig config)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onOpen({}, {})",session,config); LOG.debug("onOpen({}, {})", session, config);
session.addMessageHandler(this); session.addMessageHandler(this);
this.session = session; this.session = session;
this.config = config; this.config = config;
@ -49,12 +75,16 @@ public abstract class AbstractStringEndpoint extends Endpoint implements Message
public void onClose(Session session, CloseReason closeReason) public void onClose(Session session, CloseReason closeReason)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onClose({}, {})",session,closeReason); LOG.debug("onClose({}, {})", session, closeReason);
this.session = null; this.session = null;
CloseInfo close = new CloseInfo(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase());
boolean closeTracked = closeInfo.compareAndSet(null, close);
this.closeLatch.countDown();
assertTrue("Close only happened once", closeTracked);
} }
public void onError(Session session, Throwable thr) public void onError(Session session, Throwable thr)
{ {
LOG.warn("onError()",thr); LOG.warn("onError()", thr);
} }
} }

View File

@ -20,10 +20,10 @@ package org.eclipse.jetty.websocket.tests.server.jsr356;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingEndpoint; import org.eclipse.jetty.websocket.tests.jsr356.AbstractJsrTrackingSocket;
@ServerEndpoint("/echo") @ServerEndpoint("/echo")
public class JsrServerTrackingSocket extends AbstractJsrTrackingEndpoint public class JsrServerTrackingSocket extends AbstractJsrTrackingSocket
{ {
public JsrServerTrackingSocket() public JsrServerTrackingSocket()
{ {

View File

@ -53,7 +53,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.webapp.WebAppContext; import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.tests.WSServer; import org.eclipse.jetty.websocket.tests.WSServer;
import org.eclipse.jetty.websocket.tests.client.jsr356.JsrClientTrackingSocket; import org.eclipse.jetty.websocket.tests.client.jsr356.JsrClientEchoTrackingSocket;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -177,7 +177,7 @@ public class PingPongTest
private void assertEcho(String endpointPath, Consumer<Session> sendAction, String ... expectedMsgs) throws Exception private void assertEcho(String endpointPath, Consumer<Session> sendAction, String ... expectedMsgs) throws Exception
{ {
JsrClientTrackingSocket socket = new JsrClientTrackingSocket(); JsrClientEchoTrackingSocket socket = new JsrClientEchoTrackingSocket();
Session session = null; Session session = null;
URI toUri = serverUri.resolve(endpointPath); URI toUri = serverUri.resolve(endpointPath);