diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java index d5ad688a4fd..6a385f7f690 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java @@ -116,12 +116,12 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont * * @param executor * the executor to use - * @deprecated use {@link #WebSocketClient(HttpClient)} instead */ - @Deprecated public WebSocketClient(Executor executor) { - this(null,executor); + this(new HttpClient()); + this.httpClient.setExecutor(executor); + } /** @@ -129,12 +129,11 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont * * @param bufferPool * byte buffer pool to use - * @deprecated use {@link #WebSocketClient(HttpClient)} instead */ - @Deprecated public WebSocketClient(ByteBufferPool bufferPool) { - this(null,null,bufferPool); + this(new HttpClient()); + this.httpClient.setByteBufferPool(bufferPool); } /** @@ -142,12 +141,10 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont * * @param sslContextFactory * ssl context factory to use - * @deprecated use {@link #WebSocketClient(HttpClient)} with its own {@link SslContextFactory} */ - @Deprecated public WebSocketClient(SslContextFactory sslContextFactory) { - this(sslContextFactory,null); + this(new HttpClient(sslContextFactory)); } /** @@ -157,12 +154,11 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont * ssl context factory to use * @param executor * the executor to use - * @deprecated use {@link #WebSocketClient(HttpClient)} instead */ - @Deprecated public WebSocketClient(SslContextFactory sslContextFactory, Executor executor) { - this(sslContextFactory,executor,new MappedByteBufferPool()); + this(new HttpClient(sslContextFactory)); + this.httpClient.setExecutor(executor); } /** diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java index dc373cc8245..5a2590731a5 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketUpgradeRequest.java @@ -411,7 +411,7 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList } this.localEndpoint = localEndpoint; - this.fut = new CompletableFuture(); + this.fut = new CompletableFuture<>(); } private final String genRandomKey() diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicClose.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicClose.java index c31de5059d3..8f9876ae939 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicClose.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicClose.java @@ -20,6 +20,9 @@ package org.eclipse.jetty.websocket.common; import java.util.concurrent.atomic.AtomicReference; +/** + * Atomic Close State + */ public class AtomicClose { enum State diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicConnectionState.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicConnectionState.java index de7ced9ac6d..d904dd640b0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicConnectionState.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicConnectionState.java @@ -20,6 +20,9 @@ package org.eclipse.jetty.websocket.common; import java.util.concurrent.atomic.AtomicReference; +/** + * Atomic Connection State + */ public class AtomicConnectionState { /** @@ -59,7 +62,7 @@ public class AtomicConnectionState * Connection should be disconnected and no further reads or writes should occur. *

*/ - CLOSED; + CLOSED } private AtomicReference state = new AtomicReference<>(); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/LogicalConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/LogicalConnection.java index 1c8b57d1def..39e971c2249 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/LogicalConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/LogicalConnection.java @@ -41,9 +41,15 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken /** * Terminate the connection (no close frame sent) + * @param onlyOutput true to only close the output (half-close), false to close fully. */ - void disconnect(); - + void disconnect(boolean onlyOutput); + + /** + * Register Read Interest in Connection. + */ + void fillInterested(); + /** * Get the ByteBufferPool in use by the connection * @return the buffer pool diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java index 4e1285348df..91afbe1bfa5 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java @@ -68,7 +68,7 @@ public class Parser PAYLOAD } - private static final Logger LOG = Log.getLogger(Parser.class); + private final Logger LOG; private final WebSocketPolicy policy; private final ByteBufferPool bufferPool; private final Parser.Handler parserHandler; @@ -102,6 +102,8 @@ public class Parser this.bufferPool = bufferPool; this.policy = wspolicy; this.parserHandler = parserHandler; + + LOG = Log.getLogger(Parser.class.getName() + "_" + wspolicy.getBehavior()); } private void assertSanePayloadLength(long len) @@ -187,7 +189,7 @@ public class Parser while (parseFrame(buffer)) { if (LOG.isDebugEnabled()) - LOG.debug("{} Parsed Frame: {}", policy.getBehavior(), frame); + LOG.debug("Parsed Frame: {}", frame); assertBehavior(); @@ -268,7 +270,7 @@ public class Parser { if (LOG.isDebugEnabled()) { - LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining()); + LOG.debug("Parsing {} bytes",buffer.remaining()); } while (buffer.hasRemaining()) @@ -291,8 +293,7 @@ public class Parser } if (LOG.isDebugEnabled()) - LOG.debug("{} OpCode {}, fin={} rsv={}{}{}", - policy.getBehavior(), + LOG.debug("OpCode {}, fin={} rsv={}{}{}", OpCode.name(opcode), fin, (((b & 0x40) != 0)?'1':'.'), @@ -583,7 +584,7 @@ public class Parser if (LOG.isDebugEnabled()) { - LOG.debug("{} Raw Payload: {}",policy.getBehavior(),BufferUtil.toDetailString(window)); + LOG.debug("Raw Payload: {}",BufferUtil.toDetailString(window)); } maskProcessor.process(window); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 9f20c058c81..47ea9e835c0 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -58,6 +58,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; @@ -74,8 +75,32 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope; public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory, WebSocketSessionScope, IncomingFrames, LogicalConnection.Listener, Connection.Listener { - private static final Logger LOG = Log.getLogger(WebSocketSession.class); - private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN"); + public class OnDisconnectCallback implements WriteCallback + { + private final boolean outputOnly; + + public OnDisconnectCallback(boolean outputOnly) + { + this.outputOnly = outputOnly; + } + + @Override + public void writeFailed(Throwable x) + { + LOG.debug("writeFailed()", x); + disconnect(outputOnly); + } + + @Override + public void writeSuccess() + { + LOG.debug("writeSuccess()"); + disconnect(outputOnly); + } + } + + private final Logger LOG; + private final WebSocketContainerScope containerScope; private final WebSocketPolicy policy; private final URI requestURI; @@ -108,6 +133,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem { Objects.requireNonNull(containerScope, "Container Scope cannot be null"); Objects.requireNonNull(requestURI, "Request URI cannot be null"); + + LOG = Log.getLogger(WebSocketSession.class.getName() + "_" + connection.getPolicy().getBehavior().name()); this.classLoader = Thread.currentThread().getContextClassLoader(); this.containerScope = containerScope; @@ -147,16 +174,31 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public void close(int statusCode, String reason) { - if(connectionState.onClosing()) + if (connectionState.onClosing()) { + LOG.debug("ConnectionState: Transition to CLOSING"); // This is the first CLOSE event - if(closeState.onLocal()) + if (closeState.onLocal()) { + LOG.debug("CloseState: Transition to LOCAL"); // this is Local initiated. CloseInfo closeInfo = new CloseInfo(statusCode, reason); Frame closeFrame = closeInfo.asFrame(); - outgoingHandler.outgoingFrame(closeFrame, new FrameCallback.Adapter(), BatchMode.AUTO); + outgoingHandler.outgoingFrame(closeFrame, new OnDisconnectCallback(true), BatchMode.AUTO); } + else + { + LOG.debug("CloseState: Expected LOCAL, but was " + closeState.get()); + } + } + else if(connectionState.onClosed()) + { + LOG.debug("ConnectionState: Transition to CLOSED"); + + // This is the reply to the CLOSING entry point + CloseInfo closeInfo = new CloseInfo(statusCode, reason); + Frame closeFrame = closeInfo.asFrame(); + outgoingHandler.outgoingFrame(closeFrame, new OnDisconnectCallback(false), BatchMode.AUTO); } } @@ -166,24 +208,25 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public void disconnect() { + disconnect(true); + } + + private void disconnect(boolean outputOnly) + { + if(connectionState.onClosing()) + { + // Is this is a harsh disconnect: OPEN -> CLOSING -> CLOSED + if (closeState.onAbnormal()) + { + // notify local endpoint of harsh disconnect + notifyClose(StatusCode.SHUTDOWN, "Harsh disconnect"); + } + } + if(connectionState.onClosed()) { - connection.disconnect(); - - // TODO: notify local endpoint onClose() ? - // TODO: notifyClose(close.getStatusCode(), close.getReason()); - - try - { - if (LOG.isDebugEnabled()) - LOG.debug("{}.onSessionClosed()", containerScope.getClass().getSimpleName()); - containerScope.onSessionClosed(this); - } - catch (Throwable t) - { - LOG.ignore(t); - } - + // Transition: CLOSING -> CLOSED + connection.disconnect(outputOnly); if (closeState.onLocal()) { // notify local endpoint of harsh disconnect @@ -381,8 +424,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public RemoteEndpoint getRemote() { - if (LOG_OPEN.isDebugEnabled()) - LOG_OPEN.debug("[{}] {}.getRemote()", getPolicy().getBehavior(), this.getClass().getSimpleName()); + if (LOG.isDebugEnabled()) + LOG.debug("{}.getRemote()", this.getClass().getSimpleName()); AtomicConnectionState.State state = connectionState.get(); @@ -397,7 +440,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public InetSocketAddress getRemoteAddress() { - return remote.getInetSocketAddress(); + return connection.getRemoteAddress(); } public URI getRequestURI() @@ -460,19 +503,26 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem // process handshake if(connectionState.onClosing()) { + LOG.debug("ConnectionState: Transition to CLOSING"); // we transitioned to CLOSING state if(closeState.onRemote()) { + LOG.debug("CloseState: Transition to REMOTE"); // Remote initiated. // Send reply to remote close(close.getStatusCode(), close.getReason()); } else { + LOG.debug("CloseState: Already at LOCAL"); // Local initiated, this was the reply. disconnect(); } } + else + { + LOG.debug("ConnectionState: Not CLOSING: was " + connectionState.get()); + } callback.succeed(); return; @@ -655,6 +705,16 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public void onClosed(Connection connection) { + try + { + if (LOG.isDebugEnabled()) + LOG.debug("{}.onSessionClosed()", containerScope.getClass().getSimpleName()); + containerScope.onSessionClosed(this); + } + catch (Throwable t) + { + LOG.ignore(t); + } } /** @@ -664,8 +724,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public void onOpened(Connection connection) { - if (LOG_OPEN.isDebugEnabled()) - LOG_OPEN.debug("[{}] {}.onOpened()", getPolicy().getBehavior(), this.getClass().getSimpleName()); + if (LOG.isDebugEnabled()) + LOG.debug("{}.onOpened()", this.getClass().getSimpleName()); connectionState.onConnecting(); open(); } @@ -680,8 +740,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem */ public void open() { - if (LOG_OPEN.isDebugEnabled()) - LOG_OPEN.debug("[{}] {}.open()", getPolicy().getBehavior(), this.getClass().getSimpleName()); + if (LOG.isDebugEnabled()) + LOG.debug("{}.open()", this.getClass().getSimpleName()); if (remote != null) { @@ -696,8 +756,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem { // Connect remote remote = remoteEndpointFactory.newRemoteEndpoint(connection, outgoingHandler, getBatchMode()); - if (LOG_OPEN.isDebugEnabled()) - LOG_OPEN.debug("[{}] {}.open() remote={}", getPolicy().getBehavior(), this.getClass().getSimpleName(), remote); + if (LOG.isDebugEnabled()) + LOG.debug("{}.open() remote={}", this.getClass().getSimpleName(), remote); // Open WebSocket endpointFunctions.onOpen(this); @@ -726,6 +786,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem { openFuture.complete(this); } + + connection.fillInterested(); } } else diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 76e8aee4c16..cf6349ead92 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -42,7 +42,6 @@ import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.FrameCallback; import org.eclipse.jetty.websocket.api.SuspendToken; -import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; @@ -93,21 +92,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } - private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class); - private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + ".OPEN"); - private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + ".CLOSE"); - + /** * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload) */ private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH; + private final Logger LOG; private final ByteBufferPool bufferPool; private final Scheduler scheduler; private final Generator generator; private final Parser parser; private final WebSocketPolicy policy; - private final WebSocketBehavior behavior; private final AtomicBoolean suspendToken; private final AtomicBoolean closed = new AtomicBoolean(); private final FrameFlusher flusher; @@ -121,13 +117,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack) { super(endp,executor); + + LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_" + policy.getBehavior()); + this.id = String.format("%s:%d->%s:%d", endp.getLocalAddress().getAddress().getHostAddress(), endp.getLocalAddress().getPort(), endp.getRemoteAddress().getAddress().getHostAddress(), endp.getRemoteAddress().getPort()); this.policy = policy; - this.behavior = policy.getBehavior(); this.bufferPool = bufferPool; this.extensionStack = extensionStack; @@ -152,31 +150,32 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } @Override - public void disconnect() + public void disconnect(boolean onlyOutput) { - if (LOG_CLOSE.isDebugEnabled()) - LOG_CLOSE.debug("{} disconnect()",behavior); - disconnect(false); - } - - private void disconnect(boolean onlyOutput) - { - if (LOG_CLOSE.isDebugEnabled()) - LOG_CLOSE.debug("{} disconnect({})",behavior,onlyOutput?"outputOnly":"both"); + if (LOG.isDebugEnabled()) + LOG.debug("disconnect({})", onlyOutput ? "OUTPUT_ONLY" : "BOTH"); + // close FrameFlusher, we cannot write anymore at this point. flusher.close(); + EndPoint endPoint = getEndPoint(); // We need to gently close first, to allow // SSL close alerts to be sent by Jetty - if (LOG_CLOSE.isDebugEnabled()) - LOG_CLOSE.debug("Shutting down output {}",endPoint); + if (LOG.isDebugEnabled()) + LOG.debug("Shutting down output {}",endPoint); + endPoint.shutdownOutput(); + if (!onlyOutput) { - if (LOG_CLOSE.isDebugEnabled()) - LOG_CLOSE.debug("Closing {}",endPoint); + if (LOG.isDebugEnabled()) + LOG.debug("Closing {}",endPoint); endPoint.close(); } + else + { + closed.set(true); + } } protected void execute(Runnable task) @@ -269,9 +268,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp public void onClose() { if (LOG.isDebugEnabled()) - LOG.debug("{} onClose()",behavior); - super.onClose(); + LOG.debug("onClose()"); + + closed.set(true); + flusher.close(); + super.onClose(); } @Override @@ -322,7 +324,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { try { - while (true) + while (isOpen()) { if (suspendToken.get()) { @@ -411,8 +413,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public void onOpen() { - if(LOG_OPEN.isDebugEnabled()) - LOG_OPEN.debug("[{}] {}.onOpened()",behavior,this.getClass().getSimpleName()); + if(LOG.isDebugEnabled()) + LOG.debug("{}.onOpened()",this.getClass().getSimpleName()); super.onOpen(); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index a32447e4a95..20ae0b33b95 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -79,12 +79,12 @@ public class FrameFlusher ByteBuffer payload = entry.frame.getPayload(); if (BufferUtil.hasContent(payload)) { - BufferUtil.append(aggregate,payload); + BufferUtil.put(payload, aggregate); } } if (LOG.isDebugEnabled()) { - LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries); + LOG.debug("{} aggregated {} frames in {}: {}", FrameFlusher.this, entries.size(), aggregate, entries); } succeeded(); return Action.SCHEDULED; @@ -113,6 +113,7 @@ public class FrameFlusher { if (!BufferUtil.isEmpty(aggregate)) { + aggregate.flip(); buffers.add(aggregate); if (LOG.isDebugEnabled()) { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FutureWriteCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FutureWriteCallback.java index 0cdd5c497d4..7824da06d96 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FutureWriteCallback.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FutureWriteCallback.java @@ -35,16 +35,12 @@ public class FutureWriteCallback extends FutureCallback implements WriteCallback @Override public void writeFailed(Throwable cause) { - if (LOG.isDebugEnabled()) - LOG.debug(".writeFailed",cause); failed(cause); } @Override public void writeSuccess() { - if (LOG.isDebugEnabled()) - LOG.debug(".writeSuccess"); succeeded(); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/package-info.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/package-info.java index 8383edcd320..4c433e33b33 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/package-info.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/package-info.java @@ -20,10 +20,12 @@ * Jetty WebSocket Common : Implementation [Internal Use Only] *

* A core set of internal implementation classes for the Jetty WebSocket API. + *

*

* Note: do not reference or use classes present in this package space in your code.
* Restrict your usage to the Jetty WebSocket API classes, the Jetty WebSocket Client API, * or the Jetty WebSocket Servlet API. + *

*/ package org.eclipse.jetty.websocket.common; diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java index ec173b5751d..2e8553ad453 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java @@ -74,12 +74,17 @@ public class LocalWebSocketConnection implements LogicalConnection } @Override - public void disconnect() + public void disconnect(boolean outputOnly) { if (LOG.isDebugEnabled()) - LOG.debug("disconnect()"); + LOG.debug("disconnect({})", outputOnly); } - + + @Override + public void fillInterested() + { + } + @Override public ByteBufferPool getBufferPool() { diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/DummyConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/DummyConnection.java index a5b3e163d43..f628c7d8251 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/DummyConnection.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/DummyConnection.java @@ -45,10 +45,15 @@ public class DummyConnection implements LogicalConnection } @Override - public void disconnect() + public void disconnect(boolean outputOnly) { } - + + @Override + public void fillInterested() + { + } + @Override public ByteBufferPool getBufferPool() { diff --git a/jetty-websocket/websocket-common/websocket-states.txt b/jetty-websocket/websocket-common/websocket-states.txt new file mode 100644 index 00000000000..60f992b5911 --- /dev/null +++ b/jetty-websocket/websocket-common/websocket-states.txt @@ -0,0 +1,74 @@ +Documenting the States of a WebSocket. + + +NEW: + + A new WebSocket session. + It has had no connection attempted yet. + +CONNECTING: + + The connection is being attempted, along with the Upgrade handshake. + +CONNECTED: + + The connection is established. + The User WebSocket Endpoint has not been notified yet. + +OPEN: + + User WebSocket Endpoint has been Opened (the onOpen method has called) + +CLOSING: + + The close handshake has begun. + Either the local initiated the close, waiting for the remote to reply. + Or the remote initiated the close, and the local hasn't replied yet. + This can be considered a logical half-closed state. + +CLOSED: + + The connection and session is closed. + This means either the close handshake completed, or the connection was + disconnected for other reasons. + +---- + +Normal Client Initiated Close State Transition + + WSEndpoint created + Http GET w/Upgrade initiated + State: CONNECTING + Upgrade Handshake negotiated (HTTP 101 response) + State: CONNECTED + WSEndpoint.onOpen() called + State: OPEN + WSEndpoint.onMessage() + Session.close(local close details) + Connection disconnect output + State: CLOSING + Remote: Received CLOSE Frame + Connection disconnect completely + WSEndpoint.onClose(remote close details) + State: CLOSED + +---- + +Normal Remote Initiated Close State Transition + + WSEndpoint created + Http GET w/Upgrade initiated + State: CONNECTING + Upgrade Handshake negotiated (HTTP 101 response) + State: CONNECTED + WSEndpoint.onOpen() called + State: OPEN + WSEndpoint.onMessage() + Remote: Receive CLOSE frame + State: CLOSING + Session.close(remote close details) + Connection disconnect completely + WSEndpoint.onClose(local close details) + State: CLOSED + + diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHandshake.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHandshake.java index 74569ec1972..7d24e26011c 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHandshake.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketHandshake.java @@ -32,5 +32,5 @@ public interface WebSocketHandshake * @param response the response * @throws IOException if unable to handshake */ - public void doHandshakeResponse(ServletUpgradeRequest request, ServletUpgradeResponse response) throws IOException; + void doHandshakeResponse(ServletUpgradeRequest request, ServletUpgradeResponse response) throws IOException; } diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index c0cc1f5fc89..504b02c9d4b 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -243,6 +244,16 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc } this.sessionFactories.add(sessionFactory); } + + public void setSessionFactories(SessionFactory... factories) + { + if (factories == null || factories.length < 1) + { + throw new IllegalStateException("Must declare SessionFactory implementations"); + } + this.sessionFactories.clear(); + this.sessionFactories.addAll(Arrays.asList(factories)); + } private WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection) { @@ -565,6 +576,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc // Setup Session WebSocketSession session = createSession(request.getRequestURI(), websocket, wsConnection); session.setUpgradeRequest(request); + // set true negotiated extension list back to response response.setExtensions(extensionStack.getNegotiatedExtensions()); session.setUpgradeResponse(response); @@ -589,9 +601,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc // Tell jetty about the new upgraded connection request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE, wsConnection); - if (LOG.isDebugEnabled()) - LOG.debug("Handshake Response: {}", handshaker); - if (getSendServerVersion(connector)) response.setHeader("Server", HttpConfiguration.SERVER_VERSION); @@ -599,7 +608,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc handshaker.doHandshakeResponse(request, response); if (LOG.isDebugEnabled()) - LOG.debug("Websocket upgrade {} {} {} {}", request.getRequestURI(), version, response.getAcceptedSubProtocol(), wsConnection); + LOG.debug("Websocket upgrade {} v={} subprotocol={} connection={}", request.getRequestURI(), version, response.getAcceptedSubProtocol(), wsConnection); return true; } diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/BlockerFrameCallback.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/BlockerFrameCallback.java new file mode 100644 index 00000000000..bf0eb750c5f --- /dev/null +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/BlockerFrameCallback.java @@ -0,0 +1,46 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.websocket.api.FrameCallback; + +public class BlockerFrameCallback implements FrameCallback +{ + private CompletableFuture future = new CompletableFuture<>(); + + @Override + public void fail(Throwable cause) + { + future.completeExceptionally(cause); + } + + @Override + public void succeed() + { + future.complete(null); + } + + public void block() throws Exception + { + future.get(1, TimeUnit.MINUTES); + } +} \ No newline at end of file diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/Fuzzer.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/Fuzzer.java index e782e0c8198..aa54bdccd12 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/Fuzzer.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/Fuzzer.java @@ -32,7 +32,6 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -42,7 +41,6 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.BatchMode; -import org.eclipse.jetty.websocket.api.FrameCallback; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.common.CloseInfo; @@ -55,28 +53,6 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame; */ public class Fuzzer extends ContainerLifeCycle { - public static class BlockerCallback implements FrameCallback - { - private CompletableFuture future = new CompletableFuture<>(); - - @Override - public void fail(Throwable cause) - { - future.completeExceptionally(cause); - } - - @Override - public void succeed() - { - future.complete(null); - } - - public void block() throws Exception - { - future.get(1, TimeUnit.MINUTES); - } - } - public static class Session implements AutoCloseable { // Client side framing mask @@ -249,7 +225,7 @@ public class Fuzzer extends ContainerLifeCycle { for (WebSocketFrame f : send) { - BlockerCallback blocker = new BlockerCallback(); + BlockerFrameCallback blocker = new BlockerFrameCallback(); session.getOutgoingHandler().outgoingFrame(f, blocker, BatchMode.OFF); blocker.block(); } diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/TrackingEndpoint.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/TrackingEndpoint.java new file mode 100644 index 00000000000..f1dcc17c821 --- /dev/null +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/TrackingEndpoint.java @@ -0,0 +1,217 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketFrameListener; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.api.extensions.Frame; +import org.eclipse.jetty.websocket.common.CloseInfo; +import org.eclipse.jetty.websocket.common.WebSocketFrame; +import org.eclipse.jetty.websocket.common.WebSocketSession; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; + +public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListener +{ + private final Logger LOG; + + public CountDownLatch openLatch = new CountDownLatch(1); + public CountDownLatch closeLatch = new CountDownLatch(1); + public AtomicReference closeInfo = new AtomicReference<>(); + public AtomicReference error = new AtomicReference<>(); + + private CompletableFuture> expectedMessagesFuture = new CompletableFuture<>(); + private AtomicReference expectedMessageCount = new AtomicReference<>(); + private List messages = new ArrayList<>(); + + private CompletableFuture> expectedFramesFuture = new CompletableFuture<>(); + private AtomicReference expectedFramesCount = new AtomicReference<>(); + private List frames = new ArrayList<>(); + private WebSocketSession session; + + public TrackingEndpoint(String id) + { + LOG = Log.getLogger(this.getClass().getName() + "_" + id); + } + + public void assertClose(String prefix, int expectedCloseStatusCode, Matcher reasonMatcher) throws InterruptedException + { + assertThat(prefix + " endpoint close event received", closeLatch.await(10, TimeUnit.SECONDS), Matchers.is(true)); + CloseInfo close = closeInfo.get(); + assertThat(prefix + " close info", close, Matchers.notNullValue()); + assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode)); + assertThat(prefix + " received close reason", close.getReason(), reasonMatcher); + } + + public void close(int statusCode, String reason) + { + this.session.close(statusCode, reason); + } + + public Future> expectedFrames(int expectedCount) + { + synchronized (expectedFramesCount) + { + if (!expectedFramesCount.compareAndSet(null, expectedCount)) + { + throw new IllegalStateException("Can only have 1 registered frame count future"); + } + else + { + checkFrameCount(); + } + } + return expectedFramesFuture; + } + + public Future> expectedMessages(int expectedCount) + { + synchronized (expectedMessagesFuture) + { + if (!expectedMessageCount.compareAndSet(null, expectedCount)) + { + throw new IllegalStateException("Can only have 1 registered message count future"); + } + else + { + checkMessageCount(); + } + } + return expectedMessagesFuture; + } + + public RemoteEndpoint getRemote() + { + return session.getRemote(); + } + + @Override + public void onWebSocketBinary(byte[] payload, int offset, int len) + { + if (LOG.isDebugEnabled()) + { + LOG.info("onWSBinary({})", BufferUtil.toDetailString(ByteBuffer.wrap(payload, offset, len))); + } + } + + @Override + public void onWebSocketClose(int statusCode, String reason) + { + this.closeLatch.countDown(); + CloseInfo close = new CloseInfo(statusCode, reason); + assertThat("Close only happened once", closeInfo.compareAndSet(null, close), is(true)); + } + + @Override + public void onWebSocketConnect(Session session) + { + assertThat("Session type", session, instanceOf(WebSocketSession.class)); + this.session = (WebSocketSession) session; + if (LOG.isDebugEnabled()) + { + LOG.debug("onWebSocketConnect()"); + } + this.openLatch.countDown(); + } + + @Override + public void onWebSocketError(Throwable cause) + { + assertThat("Error must have value", cause, notNullValue()); + if (error.compareAndSet(null, cause) == false) + { + LOG.warn("Original Cause", error.get()); + LOG.warn("Extra/Excess Cause", cause); + fail("onError should only happen once!"); + } + + this.expectedMessagesFuture.completeExceptionally(cause); + this.expectedFramesFuture.completeExceptionally(cause); + } + + @Override + public void onWebSocketFrame(Frame frame) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("onWSFrame({})", frame); + } + + synchronized (expectedFramesFuture) + { + frames.add(WebSocketFrame.copy(frame)); + checkFrameCount(); + } + } + + @Override + public void onWebSocketText(String text) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("onWSText(\"{}\")", text); + } + + synchronized (expectedMessagesFuture) + { + messages.add(text); + checkMessageCount(); + } + } + + private void checkMessageCount() + { + Integer expected = expectedMessageCount.get(); + + if (expected != null && messages.size() >= expected.intValue()) + { + expectedMessagesFuture.complete(messages); + } + } + + private void checkFrameCount() + { + Integer expected = expectedFramesCount.get(); + + if (expected != null && frames.size() >= expected.intValue()) + { + expectedFramesFuture.complete(frames); + } + } +} diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSClient.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSClient.java index e281012e0f8..783096e195d 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSClient.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSClient.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; @@ -51,7 +52,7 @@ public class UntrustedWSClient extends WebSocketClient public Future connect(URI toUri, ClientUpgradeRequest req) throws IOException { - final Future connectFut = super.connect(new UntrustedWSEndpoint(), toUri, req); + final Future connectFut = super.connect(new UntrustedWSEndpoint(WebSocketBehavior.CLIENT.name()), toUri, req); return new CompletableFuture() { @Override public UntrustedWSSession get() throws InterruptedException, ExecutionException diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSConnection.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSConnection.java index 4d17d93a0de..dcf8c34b830 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSConnection.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSConnection.java @@ -27,6 +27,8 @@ import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.BatchMode; +import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; /** @@ -53,6 +55,17 @@ public class UntrustedWSConnection internalConnection.getEndPoint().flush(); } + /** + * Forward a frame to the {@Link OutgoingFrames} handler + * @param frame + */ + public void write(Frame frame) throws Exception + { + BlockerFrameCallback blocker = new BlockerFrameCallback(); + this.internalConnection.outgoingFrame(frame, blocker, BatchMode.OFF); + blocker.block(); + } + /** * Write arbitrary bytes out the active connection. * diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSEndpoint.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSEndpoint.java index cc45c8d6962..5be288599ff 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSEndpoint.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSEndpoint.java @@ -18,136 +18,121 @@ package org.eclipse.jetty.websocket.tests; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import java.util.ArrayList; -import java.util.List; +import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketFrameListener; import org.eclipse.jetty.websocket.api.WebSocketListener; -import org.eclipse.jetty.websocket.api.extensions.Frame; -import org.eclipse.jetty.websocket.common.CloseInfo; -import org.eclipse.jetty.websocket.common.WebSocketFrame; -public class UntrustedWSEndpoint implements WebSocketListener, WebSocketFrameListener +public class UntrustedWSEndpoint extends TrackingEndpoint implements WebSocketListener, WebSocketFrameListener { private static final Logger LOG = Log.getLogger(UntrustedWSEndpoint.class); - @SuppressWarnings("unused") - private Session session; - public CountDownLatch openLatch = new CountDownLatch(1); - public CountDownLatch closeLatch = new CountDownLatch(1); - public AtomicReference closeInfo = new AtomicReference<>(); - public AtomicReference error = new AtomicReference<>(); + private UntrustedWSSession untrustedSession; + private CompletableFuture connectFuture; - private CompletableFuture> expectedMessagesFuture = new CompletableFuture<>(); - private AtomicReference expectedMessageCount = new AtomicReference<>(); - private List messages = new ArrayList<>(); + private BiFunction onTextFunction; + private BiFunction onBinaryFunction; - private CompletableFuture> expectedFramesFuture = new CompletableFuture<>(); - private AtomicReference expectedFramesCount = new AtomicReference<>(); - private List frames = new ArrayList<>(); - - public Future> expectedFrames(int expectedCount) + public CompletableFuture getConnectFuture() { - if (!expectedFramesCount.compareAndSet(null, expectedCount)) - { - throw new IllegalStateException("Can only have 1 registered frame count future"); - } - return expectedFramesFuture; + return connectFuture; } - public Future> expectedMessages(int expectedCount) + public UntrustedWSEndpoint(String id) { - if (!expectedMessageCount.compareAndSet(null, expectedCount)) - { - throw new IllegalStateException("Can only have 1 registered message count future"); - } - return expectedMessagesFuture; + super(id); } @Override public void onWebSocketConnect(Session session) { - this.session = session; - this.openLatch.countDown(); - } - - @Override - public void onWebSocketClose(int statusCode, String reason) - { - this.closeLatch.countDown(); - CloseInfo close = new CloseInfo(statusCode, reason); - assertThat("Close only happened once", closeInfo.compareAndSet(null, close), is(true)); + assertThat("Session type", session, instanceOf(UntrustedWSSession.class)); + this.untrustedSession = (UntrustedWSSession) session; + if (this.connectFuture != null) + { + this.connectFuture.complete(this.untrustedSession); + } + + super.onWebSocketConnect(session); } @Override public void onWebSocketError(Throwable cause) { - assertThat("Error must have value", cause, notNullValue()); - if (error.compareAndSet(null, cause) == false) + if (this.connectFuture != null) { - LOG.warn("Original Cause", error.get()); - LOG.warn("Extra/Excess Cause", cause); - fail("onError should only happen once!"); + // Always trip this, doesn't matter if if completed normally first. + this.connectFuture.completeExceptionally(cause); } - synchronized (expectedMessagesFuture) - { - if (expectedMessagesFuture != null) - expectedMessagesFuture.completeExceptionally(cause); - } - - synchronized (expectedFramesFuture) - { - if (expectedFramesFuture != null) - expectedFramesFuture.completeExceptionally(cause); - } + super.onWebSocketError(cause); } @Override public void onWebSocketBinary(byte[] payload, int offset, int len) { - // TODO + super.onWebSocketBinary(payload, offset, len); + + if (onBinaryFunction != null) + { + try + { + ByteBuffer msg = ByteBuffer.wrap(payload, offset, len); + ByteBuffer responseBuffer = onBinaryFunction.apply(this.untrustedSession, msg); + if (responseBuffer != null) + { + this.getRemote().sendBytes(responseBuffer); + } + } + catch (Throwable t) + { + LOG.warn("Unable to send binary", t); + } + } } @Override public void onWebSocketText(String text) { - messages.add(text); - synchronized (expectedMessagesFuture) + super.onWebSocketText(text); + + if (onTextFunction != null) { - Integer expected = expectedMessageCount.get(); - - if (expected != null && messages.size() >= expected.intValue()) + try { - expectedMessagesFuture.complete(messages); + String responseText = onTextFunction.apply(this.untrustedSession, text); + if (responseText != null) + { + this.getRemote().sendString(responseText); + } + } + catch (Throwable t) + { + LOG.warn("Unable to send text", t); } } } - @Override - public void onWebSocketFrame(Frame frame) + public void setConnectFuture(CompletableFuture future) { - frames.add(WebSocketFrame.copy(frame)); - synchronized (expectedFramesFuture) - { - Integer expected = expectedFramesCount.get(); - - if (expected != null && frames.size() >= expected.intValue()) - { - expectedFramesFuture.complete(frames); - } - } + this.connectFuture = future; + } + + public void setOnBinaryFunction(BiFunction onBinaryFunction) + { + this.onBinaryFunction = onBinaryFunction; + } + + public void setOnTextFunction(BiFunction onTextFunction) + { + this.onTextFunction = onTextFunction; } } diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSServer.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSServer.java new file mode 100644 index 00000000000..76103d6c392 --- /dev/null +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSServer.java @@ -0,0 +1,145 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.net.URI; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.SecureRequestCustomizer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.toolchain.test.MavenTestingUtils; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.util.WSURI; + +public class UntrustedWSServer extends ContainerLifeCycle implements UntrustedWSSessionFactory.Listener +{ + private static final Logger LOG = Log.getLogger(SimpleServletServer.class); + private Server server; + private ServerConnector connector; + private URI wsUri; + private boolean ssl = false; + private SslContextFactory sslContextFactory; + + private Map> connectionFutures = new ConcurrentHashMap<>(); + + @Override + protected void doStart() throws Exception + { + // Configure Server + server = new Server(); + if (ssl) + { + // HTTP Configuration + HttpConfiguration http_config = new HttpConfiguration(); + http_config.setSecureScheme("https"); + http_config.setSecurePort(0); + http_config.setOutputBufferSize(32768); + http_config.setRequestHeaderSize(8192); + http_config.setResponseHeaderSize(8192); + http_config.setSendServerVersion(true); + http_config.setSendDateHeader(false); + + sslContextFactory = new SslContextFactory(); + sslContextFactory.setKeyStorePath(MavenTestingUtils.getTestResourceFile("keystore").getAbsolutePath()); + sslContextFactory.setKeyStorePassword("storepwd"); + sslContextFactory.setKeyManagerPassword("keypwd"); + sslContextFactory.setExcludeCipherSuites("SSL_RSA_WITH_DES_CBC_SHA","SSL_DHE_RSA_WITH_DES_CBC_SHA","SSL_DHE_DSS_WITH_DES_CBC_SHA", + "SSL_RSA_EXPORT_WITH_RC4_40_MD5","SSL_RSA_EXPORT_WITH_DES40_CBC_SHA","SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA", + "SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA"); + + // SSL HTTP Configuration + HttpConfiguration https_config = new HttpConfiguration(http_config); + https_config.addCustomizer(new SecureRequestCustomizer()); + + // SSL Connector + connector = new ServerConnector(server,new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()),new HttpConnectionFactory(https_config)); + connector.setPort(0); + } + else + { + // Basic HTTP connector + connector = new ServerConnector(server); + connector.setPort(0); + } + server.addConnector(connector); + + ServletContextHandler context = new ServletContextHandler(); + context.setContextPath("/"); + server.setHandler(context); + + // Serve untrusted endpoint + context.addServlet(UntrustedWSServlet.class, "/untrusted/*").setInitOrder(1); + + // Start Server + addBean(server); + + super.doStart(); + + // Wireup Context related things + UntrustedWSSessionFactory sessionFactory = (UntrustedWSSessionFactory) context.getServletContext().getAttribute(UntrustedWSSessionFactory.class.getName()); + sessionFactory.addListener(this); + + // Establish the Server URI + URI serverUri = server.getURI(); + wsUri = WSURI.toWebsocket(serverUri).resolve("/"); + + // Some debugging + if (LOG.isDebugEnabled()) + { + LOG.debug("WebSocket Server URI: " + wsUri.toASCIIString()); + LOG.debug(server.dump()); + } + + super.doStart(); + } + + public URI getWsUri() + { + return wsUri; + } + + @Override + public void onSessionCreate(UntrustedWSSession session, URI requestURI) + { + // A new session was created (but not connected, yet) + CompletableFuture sessionFuture = this.connectionFutures.get(requestURI); + if(sessionFuture != null) + { + session.getUntrustedEndpoint().setConnectFuture(sessionFuture); + } + + this.connectionFutures.put(requestURI, session.getUntrustedEndpoint().getConnectFuture()); + } + + public void registerConnectFuture(URI uri, CompletableFuture sessionFuture) + { + this.connectionFutures.put(uri, sessionFuture); + } +} diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSServlet.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSServlet.java new file mode 100644 index 00000000000..0fc25d8a105 --- /dev/null +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSServlet.java @@ -0,0 +1,46 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import org.eclipse.jetty.websocket.api.WebSocketBehavior; +import org.eclipse.jetty.websocket.server.WebSocketServerFactory; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +public class UntrustedWSServlet extends WebSocketServlet implements WebSocketCreator +{ + @Override + public void configure(WebSocketServletFactory factory) + { + WebSocketServerFactory serverFactory = (WebSocketServerFactory) factory; + serverFactory.setCreator(this); + UntrustedWSSessionFactory sessionFactory = new UntrustedWSSessionFactory(serverFactory); + this.getServletContext().setAttribute(UntrustedWSSessionFactory.class.getName(), sessionFactory); + serverFactory.setSessionFactories(sessionFactory); + } + + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + return new UntrustedWSEndpoint(WebSocketBehavior.SERVER.name()); + } +} diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSSessionFactory.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSSessionFactory.java index 12552b2e8e2..d7c62d463ab 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSSessionFactory.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/UntrustedWSSessionFactory.java @@ -19,7 +19,11 @@ package org.eclipse.jetty.websocket.tests; import java.net.URI; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.common.LogicalConnection; @@ -29,13 +33,31 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope; public class UntrustedWSSessionFactory implements SessionFactory { + interface Listener + { + void onSessionCreate(UntrustedWSSession session, URI requestURI); + } + + private final static Logger LOG = Log.getLogger(UntrustedWSSessionFactory.class); + private final WebSocketContainerScope containerScope; + private final List listeners = new CopyOnWriteArrayList<>(); public UntrustedWSSessionFactory(WebSocketContainerScope containerScope) { this.containerScope = containerScope; } + public boolean addListener(Listener listener) + { + return this.listeners.add(listener); + } + + public boolean removeListener(Listener listener) + { + return this.listeners.remove(listener); + } + @Override public boolean supports(Object websocket) { @@ -45,6 +67,17 @@ public class UntrustedWSSessionFactory implements SessionFactory @Override public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection) { - return new UntrustedWSSession(containerScope, requestURI, websocket, connection); + final UntrustedWSSession session = new UntrustedWSSession(containerScope, requestURI, websocket, connection); + listeners.forEach((listener) -> { + try + { + listener.onSessionCreate(session, requestURI); + } + catch (Throwable t) + { + LOG.warn("Unable to notify listener " + listener, t); + } + }); + return session; } } diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/BadNetworkTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/BadNetworkTest.java similarity index 63% rename from jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/BadNetworkTest.java rename to jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/BadNetworkTest.java index 61d6a18794c..c9c321291de 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/BadNetworkTest.java +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/BadNetworkTest.java @@ -16,24 +16,26 @@ // ======================================================================== // -package org.eclipse.jetty.websocket.client; +package org.eclipse.jetty.websocket.tests.client; import static org.hamcrest.CoreMatchers.containsString; import java.net.URI; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; -import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection; -import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule; -import org.eclipse.jetty.websocket.common.test.XBlockheadServer; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.tests.LeakTrackingBufferPoolRule; +import org.eclipse.jetty.websocket.tests.UntrustedWSServer; +import org.eclipse.jetty.websocket.tests.UntrustedWSSession; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; /** * Tests for conditions due to bad networking. @@ -41,14 +43,14 @@ import org.junit.Test; public class BadNetworkTest { @Rule - public TestTracker tt = new TestTracker(); - + public TestName testname = new TestName(); + @Rule public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test"); - - private XBlockheadServer server; + + private UntrustedWSServer server; private WebSocketClient client; - + @Before public void startClient() throws Exception { @@ -56,75 +58,79 @@ public class BadNetworkTest client.getPolicy().setIdleTimeout(250); client.start(); } - + @Before public void startServer() throws Exception { - server = new XBlockheadServer(); + server = new UntrustedWSServer(); server.start(); } - + @After public void stopClient() throws Exception { client.stop(); } - + @After public void stopServer() throws Exception { server.stop(); } - + @Test public void testAbruptClientClose() throws Exception { - JettyTrackingSocket wsocket = new JettyTrackingSocket(); - + TrackingSocket wsocket = new TrackingSocket(); + URI wsUri = server.getWsUri(); - Future future = client.connect(wsocket,wsUri); - - IBlockheadServerConnection ssocket = server.accept(); - ssocket.upgrade(); - + + Future future = client.connect(wsocket, wsUri); + // Validate that we are connected - future.get(30,TimeUnit.SECONDS); - wsocket.waitForConnected(30,TimeUnit.SECONDS); - + future.get(30, TimeUnit.SECONDS); + wsocket.waitForConnected(30, TimeUnit.SECONDS); + // Have client disconnect abruptly Session session = wsocket.getSession(); session.disconnect(); - + // Client Socket should see close - wsocket.waitForClose(10,TimeUnit.SECONDS); - + wsocket.waitForClose(10, TimeUnit.SECONDS); + // Client Socket should see a close event, with status NO_CLOSE // This event is automatically supplied by the underlying WebSocketClientConnection // in the situation of a bad network connection. wsocket.assertClose(StatusCode.NO_CLOSE, containsString("disconnect")); } - + @Test public void testAbruptServerClose() throws Exception { - JettyTrackingSocket wsocket = new JettyTrackingSocket(); - - URI wsUri = server.getWsUri(); - Future future = client.connect(wsocket,wsUri); - - IBlockheadServerConnection ssocket = server.accept(); - ssocket.upgrade(); - + TrackingSocket wsocket = new TrackingSocket(); + + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName()); + + CompletableFuture sessionFuture = new CompletableFuture() + { + @Override + public boolean complete(UntrustedWSSession session) + { + // server disconnect + session.disconnect(); + return super.complete(session); + } + }; + server.registerConnectFuture(wsURI, sessionFuture); + Future future = client.connect(wsocket, wsURI); + // Validate that we are connected - future.get(30,TimeUnit.SECONDS); - wsocket.waitForConnected(30,TimeUnit.SECONDS); - - // Have server disconnect abruptly - ssocket.disconnect(); - + future.get(30, TimeUnit.SECONDS); + wsocket.waitForConnected(30, TimeUnit.SECONDS); + // Wait for close (as response to idle timeout) - wsocket.waitForClose(10,TimeUnit.SECONDS); - + wsocket.waitForClose(10, TimeUnit.SECONDS); + // Client Socket should see a close event, with status NO_CLOSE // This event is automatically supplied by the underlying WebSocketClientConnection // in the situation of a bad network connection. diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java similarity index 60% rename from jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java rename to jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java index 032eddac3f4..ad22f164b47 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java @@ -16,7 +16,7 @@ // ======================================================================== // -package org.eclipse.jetty.websocket.client; +package org.eclipse.jetty.websocket.tests.client; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -29,15 +29,17 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.lang.reflect.Field; import java.net.SocketTimeoutException; +import java.net.URI; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -57,25 +59,27 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.api.ProtocolException; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketAdapter; -import org.eclipse.jetty.websocket.common.CloseInfo; +import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.Parser; import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.WebSocketSession; -import org.eclipse.jetty.websocket.common.frames.TextFrame; import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; -import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection; -import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture; -import org.eclipse.jetty.websocket.common.test.RawFrameBuilder; -import org.eclipse.jetty.websocket.common.test.XBlockheadServer; +import org.eclipse.jetty.websocket.tests.RawFrameBuilder; +import org.eclipse.jetty.websocket.tests.UntrustedWSConnection; +import org.eclipse.jetty.websocket.tests.UntrustedWSEndpoint; +import org.eclipse.jetty.websocket.tests.UntrustedWSServer; +import org.eclipse.jetty.websocket.tests.UntrustedWSSession; import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; public class ClientCloseTest { @@ -83,203 +87,189 @@ public class ClientCloseTest private static class CloseTrackingSocket extends WebSocketAdapter { - private static final Logger LOG = Log.getLogger(CloseTrackingSocket.class); - + private static final Logger LOG = Log.getLogger(CloseTrackingSocket.class); + public int closeCode = -1; public String closeReason = null; public CountDownLatch closeLatch = new CountDownLatch(1); public AtomicInteger closeCount = new AtomicInteger(0); public CountDownLatch openLatch = new CountDownLatch(1); public CountDownLatch errorLatch = new CountDownLatch(1); - + public EventQueue messageQueue = new EventQueue<>(); public AtomicReference error = new AtomicReference<>(); - + public void assertNoCloseEvent() { - assertThat("Client Close Event",closeLatch.getCount(),is(1L)); - assertThat("Client Close Event Status Code ",closeCode,is(-1)); + assertThat("Client Close Event", closeLatch.getCount(), is(1L)); + assertThat("Client Close Event Status Code ", closeCode, is(-1)); } - + public void assertReceivedCloseEvent(int clientTimeoutMs, Matcher statusCodeMatcher, Matcher reasonMatcher) throws InterruptedException { long maxTimeout = clientTimeoutMs * 4; - - assertThat("Client Close Event Occurred",closeLatch.await(maxTimeout,TimeUnit.MILLISECONDS),is(true)); - assertThat("Client Close Event Count",closeCount.get(),is(1)); - assertThat("Client Close Event Status Code",closeCode,statusCodeMatcher); + + assertThat("Client Close Event Occurred", closeLatch.await(maxTimeout, TimeUnit.MILLISECONDS), is(true)); + assertThat("Client Close Event Count", closeCount.get(), is(1)); + assertThat("Client Close Event Status Code", closeCode, statusCodeMatcher); if (reasonMatcher == null) { - assertThat("Client Close Event Reason",closeReason,nullValue()); + assertThat("Client Close Event Reason", closeReason, nullValue()); } else { - assertThat("Client Close Event Reason",closeReason,reasonMatcher); + assertThat("Client Close Event Reason", closeReason, reasonMatcher); } } - + public void assertReceivedErrorEvent(int clientTimeoutMs, Class expectedCause, Matcher messageMatcher) throws InterruptedException { long maxTimeout = clientTimeoutMs * 4; - - assertThat("Client Error Event Occurred",errorLatch.await(maxTimeout,TimeUnit.MILLISECONDS),is(true)); + + assertThat("Client Error Event Occurred", errorLatch.await(maxTimeout, TimeUnit.MILLISECONDS), is(true)); assertThat("Client Error Type", error.get(), instanceOf(expectedCause)); assertThat("Client Error Message", error.get().getMessage(), messageMatcher); } - + public void clearQueues() { messageQueue.clear(); } - + @Override public void onWebSocketClose(int statusCode, String reason) { - LOG.debug("onWebSocketClose({},{})",statusCode,reason); - super.onWebSocketClose(statusCode,reason); + LOG.debug("onWebSocketClose({},{})", statusCode, reason); + super.onWebSocketClose(statusCode, reason); closeCount.incrementAndGet(); closeCode = statusCode; closeReason = reason; closeLatch.countDown(); } - + @Override public void onWebSocketConnect(Session session) { - LOG.debug("onWebSocketConnect({})",session); + LOG.debug("onWebSocketConnect({})", session); super.onWebSocketConnect(session); openLatch.countDown(); } - + @Override public void onWebSocketError(Throwable cause) { - LOG.warn("onWebSocketError",cause); + LOG.warn("onWebSocketError", cause); assertThat("Unique Error Event", error.compareAndSet(null, cause), is(true)); errorLatch.countDown(); } - + @Override public void onWebSocketText(String message) { - LOG.debug("onWebSocketText({})",message); + LOG.debug("onWebSocketText({})", message); messageQueue.offer(message); } - + public EndPoint getEndPoint() throws Exception { Session session = getSession(); - assertThat("Session type",session,instanceOf(WebSocketSession.class)); - - WebSocketSession wssession = (WebSocketSession)session; + assertThat("Session type", session, instanceOf(WebSocketSession.class)); + + WebSocketSession wssession = (WebSocketSession) session; Field fld = wssession.getClass().getDeclaredField("connection"); fld.setAccessible(true); - assertThat("Field: connection",fld,notNullValue()); - + assertThat("Field: connection", fld, notNullValue()); + Object val = fld.get(wssession); - assertThat("Connection type",val,instanceOf(AbstractWebSocketConnection.class)); + assertThat("Connection type", val, instanceOf(AbstractWebSocketConnection.class)); @SuppressWarnings("resource") - AbstractWebSocketConnection wsconn = (AbstractWebSocketConnection)val; + AbstractWebSocketConnection wsconn = (AbstractWebSocketConnection) val; return wsconn.getEndPoint(); } } - + + @Rule + public TestName testname = new TestName(); + @Rule public TestTracker tt = new TestTracker(); - - private XBlockheadServer server; + + private UntrustedWSServer server; private WebSocketClient client; - - private void confirmConnection(CloseTrackingSocket clientSocket, Future clientFuture, IBlockheadServerConnection serverConns) throws Exception + + private void confirmConnection(CloseTrackingSocket clientSocket, Future clientFuture, UntrustedWSSession serverSession) throws Exception { // Wait for client connect on via future - clientFuture.get(30,TimeUnit.SECONDS); - + clientFuture.get(30, TimeUnit.SECONDS); + // Wait for client connect via client websocket - assertThat("Client WebSocket is Open",clientSocket.openLatch.await(30,TimeUnit.SECONDS),is(true)); - + assertThat("Client WebSocket is Open", clientSocket.openLatch.await(30, TimeUnit.SECONDS), is(true)); + + UntrustedWSEndpoint serverEndpoint = serverSession.getUntrustedEndpoint(); + Future> futFrames = serverEndpoint.expectedFrames(1); + try { // Send message from client to server final String echoMsg = "echo-test"; Future testFut = clientSocket.getRemote().sendStringByFuture(echoMsg); - + // Wait for send future - testFut.get(30,TimeUnit.SECONDS); - + testFut.get(30, TimeUnit.SECONDS); + // Read Frame on server side - IncomingFramesCapture serverCapture = serverConns.readFrames(1,30,TimeUnit.SECONDS); - serverCapture.assertFrameCount(1); - WebSocketFrame frame = serverCapture.getFrames().poll(); - assertThat("Server received frame",frame.getOpCode(),is(OpCode.TEXT)); - assertThat("Server received frame payload",frame.getPayloadAsUTF8(),is(echoMsg)); - + List frames = futFrames.get(30, TimeUnit.SECONDS); + WebSocketFrame frame = frames.get(0); + assertThat("Server received frame", frame.getOpCode(), is(OpCode.TEXT)); + assertThat("Server received frame payload", frame.getPayloadAsUTF8(), is(echoMsg)); + // Server send echo reply - serverConns.write(new TextFrame().setPayload(echoMsg)); - + serverEndpoint.getRemote().sendString(echoMsg); + // Wait for received echo - clientSocket.messageQueue.awaitEventCount(1,1,TimeUnit.SECONDS); - + clientSocket.messageQueue.awaitEventCount(1, 1, TimeUnit.SECONDS); + // Verify received message String recvMsg = clientSocket.messageQueue.poll(); - assertThat("Received message",recvMsg,is(echoMsg)); - + assertThat("Received message", recvMsg, is(echoMsg)); + // Verify that there are no errors - assertThat("Error events",clientSocket.error.get(),nullValue()); + assertThat("Error events", clientSocket.error.get(), nullValue()); } finally { clientSocket.clearQueues(); } } - - private void confirmServerReceivedCloseFrame(IBlockheadServerConnection serverConn, int expectedCloseCode, Matcher closeReasonMatcher) throws IOException, - TimeoutException - { - IncomingFramesCapture serverCapture = serverConn.readFrames(1,30,TimeUnit.SECONDS); - serverCapture.assertFrameCount(1); - serverCapture.assertHasFrame(OpCode.CLOSE,1); - WebSocketFrame frame = serverCapture.getFrames().poll(); - assertThat("Server received close frame",frame.getOpCode(),is(OpCode.CLOSE)); - CloseInfo closeInfo = new CloseInfo(frame); - assertThat("Server received close code",closeInfo.getStatusCode(),is(expectedCloseCode)); - if (closeReasonMatcher == null) - { - assertThat("Server received close reason",closeInfo.getReason(),nullValue()); - } - else - { - assertThat("Server received close reason",closeInfo.getReason(),closeReasonMatcher); - } - } - + public static class TestClientTransportOverHTTP extends HttpClientTransportOverHTTP { @Override protected SelectorManager newSelectorManager(HttpClient client) { - return new ClientSelectorManager(client, 1){ + return new ClientSelectorManager(client, 1) + { @Override protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) { - TestEndPoint endPoint = new TestEndPoint(channel,selector,key,getScheduler()); + TestEndPoint endPoint = new TestEndPoint(channel, selector, key, getScheduler()); endPoint.setIdleTimeout(client.getIdleTimeout()); return endPoint; } }; } } - + public static class TestEndPoint extends SocketChannelEndPoint { public AtomicBoolean congestedFlush = new AtomicBoolean(false); - + public TestEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { - super((SocketChannel)channel,selector,key,scheduler); + super((SocketChannel) channel, selector, key, scheduler); } - + @Override public boolean flush(ByteBuffer... buffers) throws IOException { @@ -288,7 +278,7 @@ public class ClientCloseTest return flushed; } } - + @Before public void startClient() throws Exception { @@ -297,101 +287,107 @@ public class ClientCloseTest client.addBean(httpClient); client.start(); } - + @Before public void startServer() throws Exception { - server = new XBlockheadServer(); + server = new UntrustedWSServer(); server.start(); } - + @After public void stopClient() throws Exception { client.stop(); } - + @After public void stopServer() throws Exception { server.stop(); } - + @Test public void testHalfClose() throws Exception { // Set client timeout final int timeout = 1000; client.setMaxIdleTimeout(timeout); - + + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName()); + CompletableFuture serverSessionFut = new CompletableFuture<>(); + server.registerConnectFuture(wsURI, serverSessionFut); + // Client connects CloseTrackingSocket clientSocket = new CloseTrackingSocket(); - Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); - + Future clientConnectFuture = client.connect(clientSocket, wsURI); + // Server accepts connect - IBlockheadServerConnection serverConn = server.accept(); - serverConn.upgrade(); - + UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS); + // client confirms connection via echo - confirmConnection(clientSocket,clientConnectFuture,serverConn); - + confirmConnection(clientSocket, clientConnectFuture, serverSession); + // client sends close frame (code 1000, normal) final String origCloseReason = "Normal Close"; - clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason); - + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + // server receives close frame - confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); - + serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason)); + // server sends 2 messages - serverConn.write(new TextFrame().setPayload("Hello")); - serverConn.write(new TextFrame().setPayload("World")); - + RemoteEndpoint remote = serverSession.getRemote(); + remote.sendString("Hello"); + remote.sendString("World"); + // server sends close frame (code 1000, no reason) - CloseInfo sclose = new CloseInfo(StatusCode.NORMAL,"From Server"); - serverConn.write(sclose.asFrame()); - + serverSession.close(StatusCode.NORMAL, "From Server"); + // client receives 2 messages - clientSocket.messageQueue.awaitEventCount(2,1,TimeUnit.SECONDS); - + clientSocket.messageQueue.awaitEventCount(2, 1, TimeUnit.SECONDS); + // Verify received messages String recvMsg = clientSocket.messageQueue.poll(); - assertThat("Received message 1",recvMsg,is("Hello")); + assertThat("Received message 1", recvMsg, is("Hello")); recvMsg = clientSocket.messageQueue.poll(); - assertThat("Received message 2",recvMsg,is("World")); - + assertThat("Received message 2", recvMsg, is("World")); + // Verify that there are no errors - assertThat("Error events",clientSocket.error.get(),nullValue()); - + assertThat("Error events", clientSocket.error.get(), nullValue()); + // client close event on ws-endpoint - clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.NORMAL),containsString("From Server")); + clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.NORMAL), containsString("From Server")); } - + @Test public void testNetworkCongestion() throws Exception { // Set client timeout final int timeout = 1000; client.setMaxIdleTimeout(timeout); - + + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName()); + CompletableFuture serverSessionFut = new CompletableFuture<>(); + server.registerConnectFuture(wsURI, serverSessionFut); + // Client connects CloseTrackingSocket clientSocket = new CloseTrackingSocket(); - Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); - + Future clientConnectFuture = client.connect(clientSocket, wsURI); + // Server accepts connect - IBlockheadServerConnection serverConn = server.accept(); - serverConn.upgrade(); - + UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS); + // client confirms connection via echo - confirmConnection(clientSocket,clientConnectFuture,serverConn); - + confirmConnection(clientSocket, clientConnectFuture, serverSession); + // client sends BIG frames (until it cannot write anymore) // server must not read (for test purpose, in order to congest connection) // when write is congested, client enqueue close frame // client initiate write, but write never completes EndPoint endp = clientSocket.getEndPoint(); - assertThat("EndPoint is testable",endp,instanceOf(TestEndPoint.class)); - TestEndPoint testendp = (TestEndPoint)endp; - + assertThat("EndPoint is testable", endp, instanceOf(TestEndPoint.class)); + TestEndPoint testendp = (TestEndPoint) endp; + char msg[] = new char[10240]; int writeCount = 0; long writeSize = 0; @@ -399,222 +395,247 @@ public class ClientCloseTest while (!testendp.congestedFlush.get()) { int z = i - ((i / 26) * 26); - char c = (char)('a' + z); - Arrays.fill(msg,c); + char c = (char) ('a' + z); + Arrays.fill(msg, c); clientSocket.getRemote().sendStringByFuture(String.valueOf(msg)); writeCount++; writeSize += msg.length; } - LOG.info("Wrote {} frames totalling {} bytes of payload before congestion kicked in",writeCount,writeSize); - + LOG.info("Wrote {} frames totalling {} bytes of payload before congestion kicked in", writeCount, writeSize); + // Verify timeout error assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true)); assertThat("OnError", clientSocket.error.get(), instanceOf(SocketTimeoutException.class)); } - + @Test public void testProtocolException() throws Exception { // Set client timeout final int timeout = 1000; client.setMaxIdleTimeout(timeout); - + + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName()); + CompletableFuture serverSessionFut = new CompletableFuture<>(); + server.registerConnectFuture(wsURI, serverSessionFut); + // Client connects CloseTrackingSocket clientSocket = new CloseTrackingSocket(); - Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); - + Future clientConnectFuture = client.connect(clientSocket, wsURI); + // Server accepts connect - IBlockheadServerConnection serverConn = server.accept(); - serverConn.upgrade(); - + UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS); + // client confirms connection via echo - confirmConnection(clientSocket,clientConnectFuture,serverConn); - + confirmConnection(clientSocket, clientConnectFuture, serverSession); + // client should not have received close message (yet) clientSocket.assertNoCloseEvent(); - + // server sends bad close frame (too big of a reason message) byte msg[] = new byte[400]; - Arrays.fill(msg,(byte)'x'); + Arrays.fill(msg, (byte) 'x'); ByteBuffer bad = ByteBuffer.allocate(500); - RawFrameBuilder.putOpFin(bad,OpCode.CLOSE,true); - RawFrameBuilder.putLength(bad,msg.length + 2,false); - bad.putShort((short)StatusCode.NORMAL); + RawFrameBuilder.putOpFin(bad, OpCode.CLOSE, true); + RawFrameBuilder.putLength(bad, msg.length + 2, false); + bad.putShort((short) StatusCode.NORMAL); bad.put(msg); - BufferUtil.flipToFlush(bad,0); + BufferUtil.flipToFlush(bad, 0); try (StacklessLogging ignored = new StacklessLogging(Parser.class)) { - serverConn.write(bad); - + serverSession.getUntrustedConnection().writeRaw(bad); + // client should have noticed the error assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true)); assertThat("OnError", clientSocket.error.get(), instanceOf(ProtocolException.class)); assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Invalid control frame")); - + // client parse invalid frame, notifies server of close (protocol error) - confirmServerReceivedCloseFrame(serverConn,StatusCode.PROTOCOL,allOf(containsString("Invalid control frame"),containsString("length"))); + serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.PROTOCOL, allOf(containsString("Invalid control frame"), containsString("length"))); } - + // server disconnects - serverConn.disconnect(); - + serverSession.disconnect(); + // client triggers close event on client ws-endpoint - clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.PROTOCOL),allOf(containsString("Invalid control frame"),containsString("length"))); + clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.PROTOCOL), allOf(containsString("Invalid control frame"), containsString("length"))); } - + @Test public void testReadEOF() throws Exception { // Set client timeout final int timeout = 1000; client.setMaxIdleTimeout(timeout); - + + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName()); + CompletableFuture serverSessionFut = new CompletableFuture<>(); + server.registerConnectFuture(wsURI, serverSessionFut); + // Client connects CloseTrackingSocket clientSocket = new CloseTrackingSocket(); - Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); - + Future clientConnectFuture = client.connect(clientSocket, wsURI); + // Server accepts connect - IBlockheadServerConnection serverConn = server.accept(); - serverConn.upgrade(); - + UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS); + // client confirms connection via echo - confirmConnection(clientSocket,clientConnectFuture,serverConn); - - try(StacklessLogging ignored = new StacklessLogging(CloseTrackingSocket.class)) + confirmConnection(clientSocket, clientConnectFuture, serverSession); + + try (StacklessLogging ignored = new StacklessLogging(CloseTrackingSocket.class)) { // client sends close frame final String origCloseReason = "Normal Close"; - clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason); - + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + // server receives close frame - confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); + serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason)); // client should not have received close message (yet) clientSocket.assertNoCloseEvent(); - + // server shuts down connection (no frame reply) - serverConn.disconnect(); - + serverSession.disconnect(); + // client reads -1 (EOF) clientSocket.assertReceivedErrorEvent(timeout, IOException.class, containsString("EOF")); // client triggers close event on client ws-endpoint clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("Disconnected")); } } - + @Test public void testServerNoCloseHandshake() throws Exception { // Set client timeout final int timeout = 1000; client.setMaxIdleTimeout(timeout); - + + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName()); + CompletableFuture serverSessionFut = new CompletableFuture<>(); + server.registerConnectFuture(wsURI, serverSessionFut); + // Client connects CloseTrackingSocket clientSocket = new CloseTrackingSocket(); - Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); - + Future clientConnectFuture = client.connect(clientSocket, wsURI); + // Server accepts connect - IBlockheadServerConnection serverConn = server.accept(); - serverConn.upgrade(); - + UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS); + UntrustedWSConnection serverConn = serverSession.getUntrustedConnection(); + // client confirms connection via echo - confirmConnection(clientSocket,clientConnectFuture,serverConn); - + confirmConnection(clientSocket, clientConnectFuture, serverSession); + // client sends close frame final String origCloseReason = "Normal Close"; - clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason); - + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + // server receives close frame - confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); - + serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason)); + // client should not have received close message (yet) clientSocket.assertNoCloseEvent(); - + // server never sends close frame handshake // server sits idle - + // client idle timeout triggers close event on client ws-endpoint assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true)); assertThat("OnError", clientSocket.error.get(), instanceOf(SocketTimeoutException.class)); assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Timeout on Read")); } - + @Test(timeout = 5000L) public void testStopLifecycle() throws Exception { // Set client timeout final int timeout = 1000; client.setMaxIdleTimeout(timeout); - + int clientCount = 3; CloseTrackingSocket clientSockets[] = new CloseTrackingSocket[clientCount]; - IBlockheadServerConnection serverConns[] = new IBlockheadServerConnection[clientCount]; - + UntrustedWSSession serverSessions[] = new UntrustedWSSession[clientCount]; + // Connect Multiple Clients for (int i = 0; i < clientCount; i++) { + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName() + "/" + i); + CompletableFuture serverSessionFut = new CompletableFuture<>(); + server.registerConnectFuture(wsURI, serverSessionFut); + // Client Request Upgrade clientSockets[i] = new CloseTrackingSocket(); - Future clientConnectFuture = client.connect(clientSockets[i],server.getWsUri()); - + Future clientConnectFuture = client.connect(clientSockets[i], wsURI); + // Server accepts connection - serverConns[i] = server.accept(); - serverConns[i].upgrade(); - + serverSessions[i] = serverSessionFut.get(10, TimeUnit.SECONDS); + // client confirms connection via echo - confirmConnection(clientSockets[i],clientConnectFuture,serverConns[i]); + confirmConnection(clientSockets[i], clientConnectFuture, serverSessions[i]); } - + // client lifecycle stop client.stop(); - + // clients send close frames (code 1001, shutdown) for (int i = 0; i < clientCount; i++) { // server receives close frame - confirmServerReceivedCloseFrame(serverConns[i],StatusCode.SHUTDOWN,containsString("Shutdown")); + serverSessions[i].getUntrustedEndpoint().assertClose("Server", StatusCode.SHUTDOWN, containsString("Shutdown")); } - + // clients disconnect for (int i = 0; i < clientCount; i++) { - clientSockets[i].assertReceivedCloseEvent(timeout,is(StatusCode.SHUTDOWN),containsString("Shutdown")); + clientSockets[i].assertReceivedCloseEvent(timeout, is(StatusCode.SHUTDOWN), containsString("Shutdown")); } } - + @Test public void testWriteException() throws Exception { // Set client timeout final int timeout = 1000; client.setMaxIdleTimeout(timeout); - + + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName()); + CompletableFuture serverSessionFut = new CompletableFuture() + { + @Override + public boolean complete(UntrustedWSSession session) + { + // echo back text as-well + session.getUntrustedEndpoint().setOnTextFunction((serverSession, text) -> text); + return super.complete(session); + } + }; + server.registerConnectFuture(wsURI, serverSessionFut); + // Client connects CloseTrackingSocket clientSocket = new CloseTrackingSocket(); - Future clientConnectFuture = client.connect(clientSocket,server.getWsUri()); - + Future clientConnectFuture = client.connect(clientSocket, wsURI); + // Server accepts connect - IBlockheadServerConnection serverConn = server.accept(); - serverConn.upgrade(); - + UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS); + // client confirms connection via echo - confirmConnection(clientSocket,clientConnectFuture,serverConn); - + confirmConnection(clientSocket, clientConnectFuture, serverSession); + // setup client endpoint for write failure (test only) EndPoint endp = clientSocket.getEndPoint(); endp.shutdownOutput(); - + // client enqueue close frame // client write failure final String origCloseReason = "Normal Close"; - clientSocket.getSession().close(StatusCode.NORMAL,origCloseReason); - + clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); + assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true)); assertThat("OnError", clientSocket.error.get(), instanceOf(EofException.class)); - + // client triggers close event on client ws-endpoint // assert - close code==1006 (abnormal) // assert - close reason message contains (write failure) - clientSocket.assertReceivedCloseEvent(timeout,is(StatusCode.ABNORMAL),containsString("EOF")); + clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.ABNORMAL), containsString("EOF")); } } diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/EchoTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/EchoTest.java new file mode 100644 index 00000000000..303d8d02e20 --- /dev/null +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/EchoTest.java @@ -0,0 +1,128 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.WebSocketBehavior; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.tests.TrackingEndpoint; +import org.eclipse.jetty.websocket.tests.UntrustedWSServer; +import org.eclipse.jetty.websocket.tests.UntrustedWSSession; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class EchoTest +{ + @Rule + public TestName testname = new TestName(); + + private UntrustedWSServer server; + private WebSocketClient client; + + @Before + public void startClient() throws Exception + { + client = new WebSocketClient(); + client.start(); + } + + @Before + public void startServer() throws Exception + { + server = new UntrustedWSServer(); + server.start(); + } + + @After + public void stopClient() throws Exception + { + client.stop(); + } + + @After + public void stopServer() throws Exception + { + server.stop(); + } + + @Test + public void testBasicEcho() throws IOException, InterruptedException, ExecutionException, TimeoutException + { + // Set client timeout + final int timeout = 1000; + client.setMaxIdleTimeout(timeout); + + URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName()); + CompletableFuture serverSessionFut = new CompletableFuture() + { + @Override + public boolean complete(UntrustedWSSession session) + { + // echo back text as-well + session.getUntrustedEndpoint().setOnTextFunction((serverSession, text) -> text); + return super.complete(session); + } + }; + server.registerConnectFuture(wsURI, serverSessionFut); + + // Client connects + TrackingEndpoint clientSocket = new TrackingEndpoint(WebSocketBehavior.CLIENT.name()); + Future clientConnectFuture = client.connect(clientSocket, wsURI); + + // Server accepts connect + UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS); + + // client confirms connection via echo + assertThat("Client Opened", clientSocket.openLatch.await(5, TimeUnit.SECONDS), is(true)); + + Future> futMessages = clientSocket.expectedMessages(1); + + // client sends message + clientSocket.getRemote().sendString("Hello Echo"); + List messages = futMessages.get(10, TimeUnit.SECONDS); + assertThat("Messages[0]", messages.get(0), is("Hello Echo")); + + // client closes + clientSocket.close(StatusCode.NORMAL, "Normal Close"); + + // client triggers close event on client ws-endpoint + clientSocket.assertClose("Client", StatusCode.NORMAL, containsString("Normal Close")); + + // Server close event + serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, containsString("Normal Close")); + } + +} diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/TrackingSocket.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/TrackingSocket.java new file mode 100644 index 00000000000..374aeb8432a --- /dev/null +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/TrackingSocket.java @@ -0,0 +1,55 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests.client; + +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.hamcrest.Matcher; + +@WebSocket +public class TrackingSocket +{ + private Session session; + + public void assertClose(int expectedStatusCode, Matcher reasonMatcher) + { + } + + public Session getSession() + { + return session; + } + + @OnWebSocketConnect + public void onOpen(Session session) + { + this.session = session; + } + + public void waitForClose(int timeout, TimeUnit unit) + { + } + + public void waitForConnected(int timeout, TimeUnit unit) + { + } +}