WebSocket core test cleanups

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-02-28 16:20:30 +11:00
parent d3c32b2446
commit 4ee399be1e
15 changed files with 271 additions and 709 deletions

View File

@ -1,140 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public abstract class AbstractTrackingEndpoint<T>
{
public final Logger LOG;
public T session;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public AtomicReference<CloseStatus> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> closeStack = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
public AbstractTrackingEndpoint(String id)
{
LOG = Log.getLogger(this.getClass().getName() + "." + id);
LOG.debug("init");
}
public void assertCloseInfo(String prefix, int expectedCloseStatusCode, Matcher<? super String> reasonMatcher) throws InterruptedException
{
CloseStatus close = closeInfo.get();
assertThat(prefix + " close info", close, Matchers.notNullValue());
assertThat(prefix + " received close code", close.getCode(), Matchers.is(expectedCloseStatusCode));
assertThat(prefix + " received close reason", close.getReason(), reasonMatcher);
}
public void assertErrorEvent(String prefix, Matcher<Throwable> throwableMatcher, Matcher<? super String> messageMatcher)
{
assertThat(prefix + " error event type", error.get(), throwableMatcher);
assertThat(prefix + " error event message", error.get().getMessage(), messageMatcher);
}
public void assertNoErrorEvents(String prefix)
{
assertTrue(error.get() == null, prefix + " error event should not have occurred");
}
public void assertNotClosed(String prefix)
{
assertTrue(closeLatch.getCount() > 0, prefix + " close event should not have occurred: got " + closeInfo.get());
}
public void assertNotOpened(String prefix)
{
assertTrue(openLatch.getCount() > 0, prefix + " onOpen event should not have occurred");
}
public void awaitCloseEvent(String prefix) throws InterruptedException
{
assertTrue(closeLatch.await(Timeouts.CLOSE_EVENT_MS, TimeUnit.MILLISECONDS), prefix + " onClose event should have occurred");
}
public void awaitOpenEvent(String prefix) throws InterruptedException
{
assertTrue(openLatch.await(Timeouts.OPEN_EVENT_MS, TimeUnit.MILLISECONDS), prefix + " onOpen event should have occurred");
}
public void awaitErrorEvent(String prefix) throws InterruptedException
{
assertTrue(errorLatch.await(Timeouts.CLOSE_EVENT_MS, TimeUnit.MILLISECONDS), prefix + " onError event should have occurred");
}
protected void onWSOpen(T session)
{
this.session = session;
if (LOG.isDebugEnabled())
{
LOG.debug("onWSOpen()");
}
this.openLatch.countDown();
}
protected void onWSClose(int statusCode, String reason)
{
if (LOG.isDebugEnabled())
{
LOG.debug("onWSClose({}, {})", statusCode, reason);
}
CloseStatus close = new CloseStatus(statusCode, reason);
if (closeInfo.compareAndSet(null, close) == false)
{
LOG.warn("onClose should only happen once - Original Close: " + closeInfo.get(), closeStack.get());
LOG.warn("onClose should only happen once - Extra/Excess Close: " + close, new Throwable("extra/excess"));
fail("onClose should only happen once!");
}
closeStack.compareAndSet(null, new Throwable("original"));
this.closeLatch.countDown();
}
protected void onWSError(Throwable cause)
{
if (LOG.isDebugEnabled())
{
LOG.debug("onWSError()", cause);
}
assertThat("Error must have value", cause, notNullValue());
if (error.compareAndSet(null, cause) == false)
{
LOG.warn("onError should only happen once - Original Cause", error.get());
LOG.warn("onError should only happen once - Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
this.errorLatch.countDown();
}
}

View File

@ -18,6 +18,10 @@
package org.eclipse.jetty.websocket.core;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
@ -25,10 +29,6 @@ import org.eclipse.jetty.websocket.core.internal.Generator;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -106,7 +106,7 @@ public class GeneratorParserRoundtripTest
}
// Validate
Frame txt = (Frame)capture.framesQueue.poll(1, TimeUnit.SECONDS);
Frame txt = capture.framesQueue.poll(1, TimeUnit.SECONDS);
assertTrue(txt.isMasked(), "Text.isMasked");
assertThat("Text parsed", txt.getPayloadAsUTF8(), is(message));
}

View File

@ -1,56 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import org.eclipse.jetty.util.component.LifeCycle;
import java.util.function.Supplier;
/**
* Simple {@link AutoCloseable} to allow Jetty {@link LifeCycle} components to
* be managed using {@code try-with-resources} techniques.
* <p>
* {@link LifeCycle#start()} occurs at constructor.
* {@link LifeCycle#stop()} occurs at {@link #close()}.
* </p>
*
* @param <T> the {@link LifeCycle} to have resource managed
*/
public class LifeCycleScope<T extends LifeCycle> implements AutoCloseable, Supplier<T>
{
private final T lifecycle;
public LifeCycleScope(T lifecycle) throws Exception
{
this.lifecycle = lifecycle;
this.lifecycle.start();
}
@Override
public void close() throws Exception
{
this.lifecycle.stop();
}
@Override
public T get()
{
return this.lifecycle;
}
}

View File

@ -43,7 +43,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class MessageHandlerTest
{
// Testing with 4 byte UTF8 character "\uD842\uDF9F"
static String fourByteUtf8String = "\uD842\uDF9F";
static byte[] fourByteUtf8Bytes = fourByteUtf8String.getBytes(StandardCharsets.UTF_8);

View File

@ -29,11 +29,14 @@ import org.eclipse.jetty.util.log.Logger;
public class TestFrameHandler implements SynchronousFrameHandler
{
private static Logger LOG = Log.getLogger(SynchronousFrameHandler.class);
private CoreSession session;
private static Logger LOG = Log.getLogger(TestFrameHandler.class);
protected CoreSession session;
public BlockingQueue<Frame> receivedFrames = new BlockingArrayQueue<>();
protected Throwable failure;
public CountDownLatch open = new CountDownLatch(1);
public CountDownLatch error = new CountDownLatch(1);
public CountDownLatch closed = new CountDownLatch(1);
public CoreSession getCoreSession()
@ -46,19 +49,24 @@ public class TestFrameHandler implements SynchronousFrameHandler
return receivedFrames;
}
public Throwable getError()
{
return failure;
}
@Override
public void onOpen(CoreSession coreSession)
{
LOG.info("onOpen {}", coreSession);
this.session = coreSession;
open.countDown();
}
@Override
public void onFrame(Frame frame, Callback callback)
public void onFrame(Frame frame)
{
LOG.info("onFrame: " + OpCode.name(frame.getOpCode()) + ":" + BufferUtil.toDetailString(frame.getPayload()));
receivedFrames.offer(Frame.copy(frame));
callback.succeeded();
}
@Override
@ -72,14 +80,27 @@ public class TestFrameHandler implements SynchronousFrameHandler
public void onError(Throwable cause)
{
LOG.info("onError {} ", cause == null?null:cause.toString());
failure = cause;
error.countDown();
}
public void sendText(String text)
{
Frame frame = new Frame(OpCode.TEXT);
frame.setFin(true);
frame.setPayload(text);
LOG.info("sendText {} ", text);
Frame frame = new Frame(OpCode.TEXT, text);
getCoreSession().sendFrame(frame, Callback.NOOP, false);
}
public void sendFrame(Frame frame)
{
LOG.info("sendFrame {} ", frame);
getCoreSession().sendFrame(frame, Callback.NOOP, false);
}
public void sendClose()
{
LOG.info("sendClose");
Frame frame = new Frame(OpCode.CLOSE);
getCoreSession().sendFrame(frame, Callback.NOOP, false);
}
}

View File

@ -18,15 +18,16 @@
package org.eclipse.jetty.websocket.core;
import java.io.IOException;
import java.util.List;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import java.io.IOException;
import java.util.List;
public class TestWebSocketNegotiator implements WebSocketNegotiator
{
final DecoratedObjectFactory objectFactory;
@ -34,6 +35,14 @@ public class TestWebSocketNegotiator implements WebSocketNegotiator
final ByteBufferPool bufferPool;
private final FrameHandler frameHandler;
public TestWebSocketNegotiator(FrameHandler frameHandler)
{
this.objectFactory = new DecoratedObjectFactory();
this.extensionRegistry = new WebSocketExtensionRegistry();
this.bufferPool = new MappedByteBufferPool();
this.frameHandler = frameHandler;
}
public TestWebSocketNegotiator(DecoratedObjectFactory objectFactory, WebSocketExtensionRegistry extensionRegistry, ByteBufferPool bufferPool,
FrameHandler frameHandler)
{

View File

@ -24,24 +24,13 @@ import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@ -75,7 +64,7 @@ public class WebSocketOpenTest extends WebSocketTester
public void setup(BiFunction<FrameHandler.CoreSession,Callback,Void> onOpen) throws Exception
{
serverHandler = new TestFrameHandler(onOpen);
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
client = newClient(server.getLocalPort());
}
@ -95,7 +84,7 @@ public class WebSocketOpenTest extends WebSocketTester
assertThat(frame.getPayloadAsUTF8(),is("Hello"));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
frame = receiveFrame(client.getInputStream());
@ -115,10 +104,10 @@ public class WebSocketOpenTest extends WebSocketTester
return null;
});
assertTrue(server.handler.onError.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.onError.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.error, notNullValue());
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR));
Parser.ParsedFrame frame = receiveFrame(client.getInputStream());
@ -144,7 +133,7 @@ public class WebSocketOpenTest extends WebSocketTester
assertThat(new CloseStatus(frame).getCode(),is(CloseStatus.SHUTDOWN));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
}
@ -180,12 +169,12 @@ public class WebSocketOpenTest extends WebSocketTester
// But cannot receive
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(server.handler.onClosed.await(1, TimeUnit.SECONDS));
assertFalse(serverHandler.onClosed.await(1, TimeUnit.SECONDS));
// Can't demand until open
assertThrows(Throwable.class, () -> session.demand(1));
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertFalse(server.handler.onClosed.await(1, TimeUnit.SECONDS));
assertFalse(serverHandler.onClosed.await(1, TimeUnit.SECONDS));
// Succeeded moves to OPEN state and still does not read CLOSE frame
onOpenCallback.succeeded();
@ -194,10 +183,10 @@ public class WebSocketOpenTest extends WebSocketTester
// Demand start receiving frames
session.demand(1);
client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true));
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
// Closed handled normally
assertTrue(server.handler.onClosed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.onClosed.await(5, TimeUnit.SECONDS));
assertThat(serverHandler.closeStatus.getCode(),is(CloseStatus.NORMAL));
frame = receiveFrame(client.getInputStream());
assertThat(frame.getOpCode(),is(OpCode.CLOSE));
@ -302,70 +291,5 @@ public class WebSocketOpenTest extends WebSocketTester
session.sendFrame(frame, callback, false);
}
}
static class WebSocketServer extends AbstractLifeCycle
{
private static Logger LOG = Log.getLogger(WebSocketServer.class);
private final Server server;
private final TestFrameHandler handler;
public void doStart() throws Exception
{
server.start();
}
public void doStop() throws Exception
{
server.stop();
}
public int getLocalPort()
{
return server.getBean(NetworkConnector.class).getLocalPort();
}
public WebSocketServer(int port, TestFrameHandler frameHandler)
{
this.handler = frameHandler;
server = new Server();
server.getBean(QueuedThreadPool.class).setName("WSCoreServer");
ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory());
connector.addBean(new RFC6455Handshaker());
connector.setPort(port);
connector.setIdleTimeout(1000000);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
server.setHandler(context);
WebSocketNegotiator negotiator = new TestWebSocketNegotiator(new DecoratedObjectFactory(), new WebSocketExtensionRegistry(),
connector.getByteBufferPool(), frameHandler);
WebSocketUpgradeHandler upgradeHandler = new TestWebSocketUpgradeHandler(negotiator);
context.setHandler(upgradeHandler);
}
public void sendFrame(Frame frame)
{
handler.getCoreSession().sendFrame(frame, NOOP, false);
}
public void sendText(String text)
{
LOG.info("sending {}...", text);
WebSocketOpenTest.TestFrameHandler.sendText(handler.session, text);
}
public void close()
{
handler.getCoreSession().close(CloseStatus.NORMAL, "WebSocketServer Initiated Close", Callback.NOOP);
}
public boolean isOpen()
{
return handler.getCoreSession().isOutputOpen();
}
}
}

View File

@ -0,0 +1,97 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.core;
import java.io.IOException;
import java.util.List;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.server.Negotiation;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
public class WebSocketServer
{
private static Logger LOG = Log.getLogger(WebSocketServer.class);
private final Server server;
public void start() throws Exception
{
server.start();
}
public void stop() throws Exception
{
server.stop();
}
public int getLocalPort()
{
return server.getBean(NetworkConnector.class).getLocalPort();
}
public Server getServer()
{
return server;
}
public WebSocketServer(FrameHandler frameHandler)
{
this(new DefaultNegotiator(frameHandler));
}
public WebSocketServer(WebSocketNegotiator negotiator)
{
server = new Server();
ServerConnector connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
server.setHandler(context);
WebSocketUpgradeHandler upgradeHandler = new WebSocketUpgradeHandler(negotiator);
context.setHandler(upgradeHandler);
}
private static class DefaultNegotiator extends WebSocketNegotiator.AbstractNegotiator
{
private final FrameHandler frameHandler;
public DefaultNegotiator(FrameHandler frameHandler)
{
this.frameHandler = frameHandler;
}
@Override
public FrameHandler negotiate(Negotiation negotiation) throws IOException
{
List<String> offeredSubprotocols = negotiation.getOfferedSubprotocols();
if (!offeredSubprotocols.isEmpty())
negotiation.setSubprotocol(offeredSubprotocols.get(0));
return frameHandler;
}
}
}

View File

@ -34,6 +34,8 @@ import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.core.internal.Parser;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import static org.hamcrest.MatcherAssert.assertThat;
@ -43,9 +45,24 @@ import static org.hamcrest.Matchers.startsWith;
public class WebSocketTester
{
private static String NON_RANDOM_KEY = new String(B64Code.encode("0123456701234567".getBytes()));
private static SslContextFactory sslContextFactory;
protected ByteBufferPool bufferPool;
protected Parser parser;
@BeforeAll
public static void startSslContextFactory() throws Exception
{
sslContextFactory = new SslContextFactory(true);
sslContextFactory.setEndpointIdentificationAlgorithm("");
sslContextFactory.start();
}
@AfterAll
public static void stopSslContextFactory() throws Exception
{
sslContextFactory.stop();
}
@BeforeEach
public void before()
{
@ -55,24 +72,26 @@ public class WebSocketTester
protected Socket newClient(int port) throws Exception
{
return newClient(port, false);
return newClient(port, false, null);
}
protected Socket newClient(int port, boolean tls) throws Exception
{
return newClient(port, tls, null);
}
protected Socket newClient(int port, String extensions) throws Exception
{
return newClient(port, false, extensions);
}
protected Socket newClient(int port, boolean tls, String extensions) throws Exception
{
Socket client;
if (!tls)
{
client = new Socket();
}
else
{
SslContextFactory sslContextFactory = new SslContextFactory(true);
sslContextFactory.start();
client = sslContextFactory.newSslSocket();
sslContextFactory.stop();
}
client.connect(new InetSocketAddress("127.0.0.1", port));
@ -85,6 +104,8 @@ public class WebSocketTester
fields.add(HttpHeader.PRAGMA, "no-cache");
fields.add(HttpHeader.CACHE_CONTROL, "no-cache");
fields.add(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL, "test");
if (extensions != null)
fields.add(HttpHeader.SEC_WEBSOCKET_EXTENSIONS, extensions);
client.getOutputStream().write(("GET / HTTP/1.1\r\n" + fields.toString()).getBytes(StandardCharsets.ISO_8859_1));

View File

@ -27,7 +27,6 @@ import org.eclipse.jetty.websocket.core.TestUpgradeHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
/**
* WebSocket Server for use with <a href="https://github.com/crossbario/autobahn-testsuite">autobahn websocket testsuite</a> (wstest).
@ -78,9 +77,7 @@ public class AutobahnWebSocketServer
server,
new HttpConnectionFactory()
);
connector.addBean(new RFC6455Handshaker());
//connector.setPort(9001);
connector.setIdleTimeout(10000);
server.addConnector(connector);

View File

@ -19,33 +19,19 @@
package org.eclipse.jetty.websocket.core.client;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.FrameHandler.CoreSession;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.TestFrameHandler;
import org.eclipse.jetty.websocket.core.TestWebSocketNegotiator;
import org.eclipse.jetty.websocket.core.TestWebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.WebSocketServer;
import org.eclipse.jetty.websocket.core.internal.WebSocketChannel;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -62,46 +48,51 @@ public class WebSocketClientServerTest
private static Logger LOG = Log.getLogger(WebSocketClientServerTest.class);
private WebSocketServer server;
private WebSocketClient client;
private TestFrameHandler serverHandler;
private URI serverUri;
private WebSocketCoreClient client;
@BeforeEach
public void setup() throws Exception
{
serverHandler = new TestFrameHandler();
server = new WebSocketServer(serverHandler);
server.start();
serverUri = new URI("ws://localhost:" + server.getLocalPort());
client = new WebSocketCoreClient();
client.start();
}
@Test
public void testHello() throws Exception
{
TestFrameHandler serverHandler = new TestFrameHandler();
TestFrameHandler clientHandler = new TestFrameHandler();
server = new WebSocketServer(0, serverHandler);
server.start();
client = new WebSocketClient("localhost", server.getLocalPort(), clientHandler);
client.start();
CompletableFuture<CoreSession> connect = client.connect(clientHandler, serverUri);
connect.get(5, TimeUnit.SECONDS);
String message = "hello world";
client.sendText(message);
Frame recv = server.getFrames().poll(5, TimeUnit.SECONDS);
clientHandler.sendText(message);
Frame recv = serverHandler.getFrames().poll(5, TimeUnit.SECONDS);
assertNotNull(recv);
assertThat(recv.getPayloadAsUTF8(), Matchers.equalTo(message));
message = "back at ya!";
server.sendText(message);
recv = client.getFrames().poll(5, TimeUnit.SECONDS);
serverHandler.sendText(message);
recv = clientHandler.getFrames().poll(5, TimeUnit.SECONDS);
assertNotNull(recv);
assertThat(recv.getPayloadAsUTF8(), Matchers.equalTo(message));
client.close();
clientHandler.sendClose();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(client.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
}
@Test
public void testClientSocketClosedInCloseHandshake() throws Exception
{
TestFrameHandler serverHandler = new TestFrameHandler();
TestFrameHandler clientHandler = new TestFrameHandler()
{
@Override
@ -121,160 +112,37 @@ public class WebSocketClientServerTest
}
}
};
server = new WebSocketServer(0, serverHandler);
server.start();
client = new WebSocketClient("localhost", server.getLocalPort(), clientHandler);
client.start();
CompletableFuture<CoreSession> connect = client.connect(clientHandler, serverUri);
connect.get(5, TimeUnit.SECONDS);
String message = "hello world";
server.sendText(message);
Frame recv = client.getFrames().poll(5, TimeUnit.SECONDS);
serverHandler.sendText(message);
Frame recv = clientHandler.getFrames().poll(5, TimeUnit.SECONDS);
assertNotNull(recv);
assertThat(recv.getPayloadAsUTF8(), Matchers.equalTo(message));
server.close();
serverHandler.sendClose();
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(client.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
}
@Test
public void testClientSocketClosed() throws Exception
{
TestFrameHandler serverHandler = new TestFrameHandler();
TestFrameHandler clientHandler = new TestFrameHandler();
server = new WebSocketServer(0, serverHandler);
server.start();
client = new WebSocketClient("localhost", server.getLocalPort(), clientHandler);
client.start();
CompletableFuture<CoreSession> connect = client.connect(clientHandler, serverUri);
connect.get(5, TimeUnit.SECONDS);
String message = "hello world";
client.sendText(message);
Frame recv = server.getFrames().poll(2, TimeUnit.SECONDS);
clientHandler.sendText(message);
Frame recv = serverHandler.getFrames().poll(2, TimeUnit.SECONDS);
assertNotNull(recv);
assertThat(recv.getPayloadAsUTF8(), Matchers.equalTo(message));
((WebSocketChannel)client.handler.getCoreSession()).getConnection().getEndPoint().close();
assertTrue(client.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
}
static class WebSocketClient
{
private static Logger LOG = Log.getLogger(WebSocketClient.class);
private URI baseWebSocketUri;
private WebSocketCoreClient client;
private TestFrameHandler handler;
public WebSocketClient(String hostname, int port, TestFrameHandler frameHandler) throws Exception
{
this.baseWebSocketUri = new URI("ws://" + hostname + ":" + port);
this.client = new WebSocketCoreClient();
this.handler = frameHandler;
}
public void start() throws Exception
{
ClientUpgradeRequest request = ClientUpgradeRequest.from(client, baseWebSocketUri.resolve("/test"), handler);
request.setSubProtocols("test");
this.client.start();
Future<FrameHandler.CoreSession> response = client.connect(request);
response.get(5, TimeUnit.SECONDS);
}
public void sendFrame(Frame frame)
{
handler.getCoreSession().sendFrame(frame, Callback.NOOP, false);
}
public void sendText(String line)
{
LOG.info("sending {}...", line);
handler.sendText(line);
}
public BlockingQueue<Frame> getFrames()
{
return handler.getFrames();
}
public void close()
{
handler.getCoreSession().close(CloseStatus.NORMAL, "WebSocketClient Initiated Close", Callback.NOOP);
}
public boolean isOpen()
{
return handler.getCoreSession().isOutputOpen();
}
}
static class WebSocketServer
{
private static Logger LOG = Log.getLogger(WebSocketServer.class);
private final Server server;
private final TestFrameHandler handler;
public void start() throws Exception
{
server.start();
}
public int getLocalPort()
{
return server.getBean(NetworkConnector.class).getLocalPort();
}
public WebSocketServer(int port, TestFrameHandler frameHandler)
{
this.handler = frameHandler;
server = new Server();
server.getBean(QueuedThreadPool.class).setName("WSCoreServer");
ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory());
connector.addBean(new RFC6455Handshaker());
connector.setPort(port);
connector.setIdleTimeout(1000000);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
server.setHandler(context);
WebSocketNegotiator negotiator = new TestWebSocketNegotiator(new DecoratedObjectFactory(), new WebSocketExtensionRegistry(),
connector.getByteBufferPool(), frameHandler);
WebSocketUpgradeHandler upgradeHandler = new TestWebSocketUpgradeHandler(negotiator);
context.setHandler(upgradeHandler);
}
public void sendFrame(Frame frame)
{
handler.getCoreSession().sendFrame(frame, Callback.NOOP, false);
}
public void sendText(String line)
{
LOG.info("sending {}...", line);
handler.sendText(line);
}
public BlockingQueue<Frame> getFrames()
{
return handler.getFrames();
}
public void close()
{
handler.getCoreSession().close(CloseStatus.NORMAL, "WebSocketServer Initiated Close", Callback.NOOP);
}
public boolean isOpen()
{
return handler.getCoreSession().isOutputOpen();
}
((WebSocketChannel)clientHandler.getCoreSession()).getConnection().getEndPoint().close();
assertTrue(clientHandler.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
}
}

View File

@ -44,10 +44,8 @@ import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CapturedHexPayloads;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.IncomingFramesCapture;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.OutgoingNetworkBytesCapture;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.internal.ExtensionStack;
@ -388,32 +386,24 @@ public class DeflateFrameExtensionTest extends AbstractExtensionTest
serverExtension.setWebSocketChannel(channelWithMaxMessageSize(maxMessageSize));
// Chain the next element to decompress.
clientExtension.setNextOutgoingFrames(new OutgoingFrames()
clientExtension.setNextOutgoingFrames((frame, callback, batch) ->
{
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
LOG.debug("outgoingFrame({})", frame);
serverExtension.onFrame(frame, callback);
callback.succeeded();
}
LOG.debug("outgoingFrame({})", frame);
serverExtension.onFrame(frame, callback);
callback.succeeded();
});
final ByteArrayOutputStream result = new ByteArrayOutputStream(input.length);
serverExtension.setNextIncomingFrames(new IncomingFrames()
serverExtension.setNextIncomingFrames((frame, callback) ->
{
@Override
public void onFrame(Frame frame, Callback callback)
LOG.debug("incomingFrame({})", frame);
try
{
LOG.debug("incomingFrame({})", frame);
try
{
result.write(BufferUtil.toArray(frame.getPayload()));
}
catch (IOException x)
{
throw new RuntimeIOException(x);
}
result.write(BufferUtil.toArray(frame.getPayload()));
}
catch (IOException x)
{
throw new RuntimeIOException(x);
}
});

View File

@ -19,36 +19,21 @@
package org.eclipse.jetty.websocket.core.extensions;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.RawFrameBuilder;
import org.eclipse.jetty.websocket.core.TestFrameHandler;
import org.eclipse.jetty.websocket.core.TestWebSocketNegotiator;
import org.eclipse.jetty.websocket.core.TestWebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.WebSocketServer;
import org.eclipse.jetty.websocket.core.WebSocketTester;
import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator;
import org.eclipse.jetty.websocket.core.server.WebSocketServerTest;
import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.eclipse.jetty.util.Callback.NOOP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -56,31 +41,40 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ValidationExtensionTest extends WebSocketTester
{
private static Logger LOG = Log.getLogger(WebSocketServerTest.class);
private WebSocketServer server;
TestFrameHandler serverHandler;
@BeforeEach
public void start() throws Exception
{
serverHandler = new TestFrameHandler();
WebSocketNegotiator negotiator = new TestWebSocketNegotiator(serverHandler);
server = new WebSocketServer(negotiator);
server.start();
}
@AfterEach
public void stop() throws Exception
{
server.stop();
}
@Test
public void testNonUtf8BinaryPayload() throws Exception
{
TestFrameHandler serverHandler = new TestFrameHandler();
server = new WebSocketServer(0, serverHandler);
server.start();
byte[] nonUtf8Payload = { 0x7F, (byte)0xFF, (byte)0xFF };
try (Socket client = newClient(server.getLocalPort()))
{
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.BINARY, nonUtf8Payload, true));
Frame frame = server.handler.receivedFrames.poll(5, TimeUnit.SECONDS);
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.BINARY));
assertThat(frame.getPayload().array(), is(nonUtf8Payload));
//close normally
client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, true));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.CLOSE));
@ -91,11 +85,6 @@ public class ValidationExtensionTest extends WebSocketTester
@Test
public void testValidContinuationOnNonUtf8Boundary() throws Exception
{
TestFrameHandler serverHandler = new TestFrameHandler();
server = new WebSocketServer(0, serverHandler);
server.start();
// Testing with 4 byte UTF8 character "\uD842\uDF9F"
byte[] initialPayload = new byte[] { (byte)0xF0, (byte)0xA0 };
byte[] continuationPayload = new byte[] { (byte)0xAE, (byte)0x9F };
@ -103,20 +92,20 @@ public class ValidationExtensionTest extends WebSocketTester
try (Socket client = newClient(server.getLocalPort()))
{
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.TEXT, initialPayload, true, false));
Frame frame = server.handler.receivedFrames.poll(5, TimeUnit.SECONDS);
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayload().array(), is(initialPayload));
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.CONTINUATION, continuationPayload, true));
frame = server.handler.receivedFrames.poll(5, TimeUnit.SECONDS);
frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.CONTINUATION));
assertThat(frame.getPayload().array(), is(continuationPayload));
//close normally
client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, true));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.CLOSE));
@ -127,11 +116,6 @@ public class ValidationExtensionTest extends WebSocketTester
@Test
public void testInvalidContinuationOnNonUtf8Boundary() throws Exception
{
TestFrameHandler serverHandler = new TestFrameHandler();
server = new WebSocketServer(0, serverHandler);
server.start();
// Testing with 4 byte UTF8 character "\uD842\uDF9F"
byte[] initialPayload = new byte[] { (byte)0xF0, (byte)0xA0 };
byte[] incompleteContinuationPayload = new byte[] { (byte)0xAE };
@ -139,7 +123,7 @@ public class ValidationExtensionTest extends WebSocketTester
try (Socket client = newClient(server.getLocalPort()))
{
client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.TEXT, initialPayload, true, false));
Frame frame = server.handler.receivedFrames.poll(5, TimeUnit.SECONDS);
Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.TEXT));
assertThat(frame.getPayload().array(), is(initialPayload));
@ -151,74 +135,4 @@ public class ValidationExtensionTest extends WebSocketTester
assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.BAD_PAYLOAD));
}
}
static class WebSocketServer extends AbstractLifeCycle
{
private static Logger LOG = Log.getLogger(WebSocketServer.class);
private final Server server;
private final TestFrameHandler handler;
public void doStart() throws Exception
{
server.start();
}
public void doStop() throws Exception
{
server.stop();
}
public int getLocalPort()
{
return server.getBean(NetworkConnector.class).getLocalPort();
}
public WebSocketServer(int port, TestFrameHandler frameHandler)
{
this.handler = frameHandler;
server = new Server();
server.getBean(QueuedThreadPool.class).setName("WSCoreServer");
ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory());
connector.addBean(new RFC6455Handshaker());
connector.setPort(port);
connector.setIdleTimeout(1000000);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
server.setHandler(context);
WebSocketNegotiator negotiator = new TestWebSocketNegotiator(new DecoratedObjectFactory(), new WebSocketExtensionRegistry(),
connector.getByteBufferPool(), frameHandler);
WebSocketUpgradeHandler upgradeHandler = new TestWebSocketUpgradeHandler(negotiator);
context.setHandler(upgradeHandler);
}
public void sendFrame(Frame frame)
{
handler.getCoreSession().sendFrame(frame, NOOP, false);
}
public void sendText(String line)
{
LOG.info("sending {}...", line);
handler.sendText(line);
}
public BlockingQueue<Frame> getFrames()
{
return handler.getFrames();
}
public void close()
{
handler.getCoreSession().close(CloseStatus.NORMAL, "WebSocketServer Initiated Close", Callback.NOOP);
}
public boolean isOpen()
{
return handler.getCoreSession().isOutputOpen();
}
}
}

View File

@ -132,7 +132,6 @@ public class FrameFlusherTest
});
serverTask.get();
System.out.printf("Received: %,d frames%n", endPoint.incomingFrames.size());
}
public static class CapturingEndPoint extends MockEndpoint

View File

@ -25,29 +25,18 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.NetworkConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.RawFrameBuilder;
import org.eclipse.jetty.websocket.core.TestFrameHandler;
import org.eclipse.jetty.websocket.core.TestWebSocketNegotiator;
import org.eclipse.jetty.websocket.core.TestWebSocketUpgradeHandler;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.WebSocketServer;
import org.eclipse.jetty.websocket.core.WebSocketTester;
import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
@ -82,7 +71,7 @@ public class WebSocketServerTest extends WebSocketTester
}
};
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
try (Socket client = newClient(server.getLocalPort()))
@ -97,7 +86,7 @@ public class WebSocketServerTest extends WebSocketTester
assertThat(frame.getPayloadAsUTF8(), is("Hello!"));
client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, true));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
MatcherAssert.assertThat(frame.getOpCode(), Matchers.is(OpCode.CLOSE));
@ -117,7 +106,7 @@ public class WebSocketServerTest extends WebSocketTester
}
};
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
try (Socket client = newClient(server.getLocalPort()))
@ -139,10 +128,10 @@ public class WebSocketServerTest extends WebSocketTester
assertThat(frame.getPayloadAsUTF8(), is("World"));
client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, true));
assertFalse(server.handler.closed.await(250, TimeUnit.MILLISECONDS));
assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS));
serverHandler.getCoreSession().demand(1);
assertTrue(server.handler.closed.await(10, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS));
frame = serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS);
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.CLOSE));
@ -185,7 +174,7 @@ public class WebSocketServerTest extends WebSocketTester
}
};
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
try (Socket client = newClient(server.getLocalPort()))
@ -226,7 +215,7 @@ public class WebSocketServerTest extends WebSocketTester
assertThat(serverHandler.receivedFrames.poll().getPayload().array(), sameInstance(second));
assertThat(first, not(sameInstance(second)));
ByteBufferPool pool = server.server.getConnectors()[0].getByteBufferPool();
ByteBufferPool pool = server.getServer().getConnectors()[0].getByteBufferPool();
assertThat(pool.acquire(first.length, false).array(), not(sameInstance(first)));
receivedCallbacks.poll().succeeded();
@ -249,13 +238,13 @@ public class WebSocketServerTest extends WebSocketTester
{
TestFrameHandler serverHandler = new TestFrameHandler();
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
try (Socket client = newClient(server.getLocalPort()))
{
client.getOutputStream().write(RawFrameBuilder.buildFrame((byte)4, "payload", true));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
Frame frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
@ -269,14 +258,14 @@ public class WebSocketServerTest extends WebSocketTester
{
TestFrameHandler serverHandler = new TestFrameHandler();
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
try (Socket client = newClient(server.getLocalPort()))
{
// Write client close without masking!
client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, false));
assertTrue(server.handler.closed.await(5, TimeUnit.SECONDS));
assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS));
Frame frame = receiveFrame(client.getInputStream());
assertNotNull(frame);
assertThat(frame.getOpCode(), is(OpCode.CLOSE));
@ -314,7 +303,7 @@ public class WebSocketServerTest extends WebSocketTester
}
};
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
try (Socket client = newClient(server.getLocalPort()))
@ -380,7 +369,7 @@ public class WebSocketServerTest extends WebSocketTester
}
};
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
try (Socket client = newClient(server.getLocalPort()))
@ -448,7 +437,7 @@ public class WebSocketServerTest extends WebSocketTester
}
};
server = new WebSocketServer(0, serverHandler);
server = new WebSocketServer(serverHandler);
server.start();
try (Socket client = newClient(server.getLocalPort()))
@ -484,74 +473,4 @@ public class WebSocketServerTest extends WebSocketTester
assertThat(frame.getOpCode(), is(OpCode.CLOSE));
}
}
static class WebSocketServer extends AbstractLifeCycle
{
private static Logger LOG = Log.getLogger(WebSocketServer.class);
private final Server server;
private final TestFrameHandler handler;
public void doStart() throws Exception
{
server.start();
}
public void doStop() throws Exception
{
server.stop();
}
public int getLocalPort()
{
return server.getBean(NetworkConnector.class).getLocalPort();
}
public WebSocketServer(int port, TestFrameHandler frameHandler)
{
this.handler = frameHandler;
server = new Server();
server.getBean(QueuedThreadPool.class).setName("WSCoreServer");
ServerConnector connector = new ServerConnector(server, new HttpConnectionFactory());
connector.addBean(new RFC6455Handshaker());
connector.setPort(port);
connector.setIdleTimeout(1000000);
server.addConnector(connector);
ContextHandler context = new ContextHandler("/");
server.setHandler(context);
WebSocketNegotiator negotiator = new TestWebSocketNegotiator(new DecoratedObjectFactory(), new WebSocketExtensionRegistry(),
connector.getByteBufferPool(), frameHandler);
WebSocketUpgradeHandler upgradeHandler = new TestWebSocketUpgradeHandler(negotiator);
context.setHandler(upgradeHandler);
}
public void sendFrame(Frame frame)
{
handler.getCoreSession().sendFrame(frame, Callback.NOOP, false);
}
public void sendText(String line)
{
LOG.info("sending {}...", line);
handler.sendText(line);
}
public BlockingQueue<Frame> getFrames()
{
return handler.getFrames();
}
public void close()
{
handler.getCoreSession().close(CloseStatus.NORMAL, "WebSocketServer Initiated Close", Callback.NOOP);
}
public boolean isOpen()
{
return handler.getCoreSession().isOutputOpen();
}
}
}