From 683509be601e269828b8c3a58b4bd56793a9bd89 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Fri, 21 Apr 2017 16:21:07 -0700 Subject: [PATCH] Issue #207 - more review rework --- .../jetty/websocket/common/AtomicClose.java | 61 ------ .../jetty/websocket/common/CloseInfo.java | 83 ++++---- .../websocket/common/CompletionCallback.java | 46 +++++ .../websocket/common/LogicalConnection.java | 3 +- .../jetty/websocket/common/Parser.java | 2 +- .../websocket/common/WebSocketSession.java | 179 ++++++------------ .../websocket/common/frames/CloseFrame.java | 2 +- .../function/CommonEndpointFunctions.java | 19 +- .../io/AbstractWebSocketConnection.java | 44 +---- .../websocket/common/io/FrameFlusher.java | 10 +- .../websocket/common/AtomicCloseTest.java | 55 ------ .../common/io/LocalWebSocketConnection.java | 4 +- .../common/test/DummyConnection.java | 2 +- .../websocket-common/websocket-states.txt | 101 ++++++---- jetty-websocket/websocket-tests/pom.xml | 2 +- .../eclipse/jetty/websocket/tests/Fuzzer.java | 7 +- .../websocket/tests/TrackingEndpoint.java | 96 ++-------- .../tests/UntrustedWSClientTest.java | 9 +- .../tests/client/ClientCloseTest.java | 16 +- .../websocket/tests/client/EchoTest.java | 31 +-- .../test/resources/jetty-logging.properties | 11 +- 21 files changed, 299 insertions(+), 484 deletions(-) delete mode 100644 jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicClose.java create mode 100644 jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CompletionCallback.java delete mode 100644 jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/AtomicCloseTest.java diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicClose.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicClose.java deleted file mode 100644 index 8f9876ae939..00000000000 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/AtomicClose.java +++ /dev/null @@ -1,61 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.websocket.common; - -import java.util.concurrent.atomic.AtomicReference; - -/** - * Atomic Close State - */ -public class AtomicClose -{ - enum State - { - /** No close handshake initiated (yet) */ - NONE, - /** Local side initiated the close handshake */ - LOCAL, - /** Remote side initiated the close handshake */ - REMOTE, - /** An abnormal close situation (disconnect, timeout, etc...) */ - ABNORMAL - } - - private final AtomicReference state = new AtomicReference<>(State.NONE); - - public State get() - { - return state.get(); - } - - public boolean onAbnormal() - { - return state.compareAndSet(State.NONE, State.ABNORMAL); - } - - public boolean onLocal() - { - return state.compareAndSet(State.NONE, State.LOCAL); - } - - public boolean onRemote() - { - return state.compareAndSet(State.NONE, State.REMOTE); - } -} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java index a411716c38d..a4b31f28187 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CloseInfo.java @@ -22,8 +22,8 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception; +import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.websocket.api.BadPayloadException; import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.ProtocolException; @@ -34,7 +34,7 @@ import org.eclipse.jetty.websocket.common.frames.CloseFrame; public class CloseInfo { private int statusCode; - private byte[] reasonBytes; + private String reason; public CloseInfo() { @@ -82,7 +82,7 @@ public class CloseInfo { // Reason (trimmed to max reason size) int len = Math.min(data.remaining(), CloseStatus.MAX_REASON_PHRASE); - reasonBytes = new byte[len]; + byte reasonBytes[] = new byte[len]; data.get(reasonBytes,0,len); // Spec Requirement : throw BadPayloadException on invalid UTF8 @@ -93,6 +93,7 @@ public class CloseInfo Utf8StringBuilder utf = new Utf8StringBuilder(); // if this throws, we know we have bad UTF8 utf.append(reasonBytes,0,reasonBytes.length); + this.reason = utf.toString(); } catch (NotUtf8Exception e) { @@ -127,50 +128,63 @@ public class CloseInfo public CloseInfo(int statusCode, String reason) { this.statusCode = statusCode; - if (reason != null) - { - byte[] utf8Bytes = reason.getBytes(StandardCharsets.UTF_8); - if (utf8Bytes.length > CloseStatus.MAX_REASON_PHRASE) - { - this.reasonBytes = new byte[CloseStatus.MAX_REASON_PHRASE]; - System.arraycopy(utf8Bytes,0,this.reasonBytes,0,CloseStatus.MAX_REASON_PHRASE); - } - else - { - this.reasonBytes = utf8Bytes; - } - } + this.reason = reason; } - - private ByteBuffer asByteBuffer() + + /** + * Convert a raw status code and reason into a WebSocket Close frame payload buffer. + * + * @param statusCode the status code + * @param reason the optional reason string + * @return the payload buffer if valid. null if invalid status code for payload buffer. + */ + public static ByteBuffer asPayloadBuffer(int statusCode, String reason) { if ((statusCode == StatusCode.NO_CLOSE) || (statusCode == StatusCode.NO_CODE) || (statusCode == (-1))) { // codes that are not allowed to be used in endpoint. return null; } - + int len = 2; // status code - boolean hasReason = (this.reasonBytes != null) && (this.reasonBytes.length > 0); + byte reasonBytes[]; + + byte[] utf8Bytes = reason.getBytes(StandardCharsets.UTF_8); + if (utf8Bytes.length > CloseStatus.MAX_REASON_PHRASE) + { + reasonBytes = new byte[CloseStatus.MAX_REASON_PHRASE]; + System.arraycopy(utf8Bytes, 0, reasonBytes, 0, CloseStatus.MAX_REASON_PHRASE); + } + else + { + reasonBytes = utf8Bytes; + } + + boolean hasReason = (reasonBytes != null) && (reasonBytes.length > 0); if (hasReason) { - len += this.reasonBytes.length; + len += reasonBytes.length; } - + ByteBuffer buf = BufferUtil.allocate(len); BufferUtil.flipToFill(buf); - buf.put((byte)((statusCode >>> 8) & 0xFF)); - buf.put((byte)((statusCode >>> 0) & 0xFF)); - + buf.put((byte) ((statusCode >>> 8) & 0xFF)); + buf.put((byte) ((statusCode >>> 0) & 0xFF)); + if (hasReason) { - buf.put(this.reasonBytes,0,this.reasonBytes.length); + buf.put(reasonBytes, 0, reasonBytes.length); } - BufferUtil.flipToFlush(buf,0); - + BufferUtil.flipToFlush(buf, 0); + return buf; } + private ByteBuffer asByteBuffer() + { + return asPayloadBuffer(statusCode, reason); + } + public CloseFrame asFrame() { CloseFrame frame = new CloseFrame(); @@ -185,14 +199,10 @@ public class CloseInfo } return frame; } - + public String getReason() { - if (this.reasonBytes == null) - { - return null; - } - return new String(this.reasonBytes,StandardCharsets.UTF_8); + return this.reason; } public int getStatusCode() @@ -200,11 +210,6 @@ public class CloseInfo return statusCode; } - public boolean isHarsh() - { - return !((statusCode == StatusCode.NORMAL) || (statusCode == StatusCode.NO_CODE)); - } - public boolean isAbnormal() { return (statusCode != StatusCode.NORMAL); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CompletionCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CompletionCallback.java new file mode 100644 index 00000000000..42e6fd5b8ce --- /dev/null +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/CompletionCallback.java @@ -0,0 +1,46 @@ +// +// ======================================================================== +// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.common; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.FrameCallback; + +/** + * A callback which will trigger complete regardless of success/failure. + */ +public abstract class CompletionCallback implements FrameCallback +{ + private static final Logger LOG = Log.getLogger(CompletionCallback.class); + + @Override + public void fail(Throwable cause) + { + LOG.ignore(cause); + complete(); + } + + @Override + public void succeed() + { + complete(); + } + + public abstract void complete(); +} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/LogicalConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/LogicalConnection.java index 39e971c2249..5193a902cc5 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/LogicalConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/LogicalConnection.java @@ -41,9 +41,8 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken /** * Terminate the connection (no close frame sent) - * @param onlyOutput true to only close the output (half-close), false to close fully. */ - void disconnect(boolean onlyOutput); + void disconnect(); /** * Register Read Interest in Connection. diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java index 91afbe1bfa5..ab37781bb0a 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java @@ -103,7 +103,7 @@ public class Parser this.policy = wspolicy; this.parserHandler = parserHandler; - LOG = Log.getLogger(Parser.class.getName() + "_" + wspolicy.getBehavior()); + LOG = Log.getLogger(Parser.class.getName() + "." + wspolicy.getBehavior()); } private void assertSanePayloadLength(long len) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 47ea9e835c0..a0183b1b7be 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -32,6 +32,7 @@ import java.util.Objects; import java.util.ServiceLoader; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.io.ByteBufferPool; @@ -58,7 +59,6 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketPolicy; -import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; @@ -75,29 +75,7 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope; public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory, WebSocketSessionScope, IncomingFrames, LogicalConnection.Listener, Connection.Listener { - public class OnDisconnectCallback implements WriteCallback - { - private final boolean outputOnly; - - public OnDisconnectCallback(boolean outputOnly) - { - this.outputOnly = outputOnly; - } - - @Override - public void writeFailed(Throwable x) - { - LOG.debug("writeFailed()", x); - disconnect(outputOnly); - } - - @Override - public void writeSuccess() - { - LOG.debug("writeSuccess()"); - disconnect(outputOnly); - } - } + private static final FrameCallback EMPTY = new FrameCallback.Adapter(); private final Logger LOG; @@ -107,7 +85,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem private final LogicalConnection connection; private final Executor executor; private final AtomicConnectionState connectionState = new AtomicConnectionState(); - private final AtomicClose closeState = new AtomicClose(); + private final AtomicBoolean closeSent = new AtomicBoolean(); // The websocket endpoint object itself private final Object endpoint; @@ -134,7 +112,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem Objects.requireNonNull(containerScope, "Container Scope cannot be null"); Objects.requireNonNull(requestURI, "Request URI cannot be null"); - LOG = Log.getLogger(WebSocketSession.class.getName() + "_" + connection.getPolicy().getBehavior().name()); + LOG = Log.getLogger(WebSocketSession.class.getName() + "." + connection.getPolicy().getBehavior().name()); this.classLoader = Thread.currentThread().getContextClassLoader(); this.containerScope = containerScope; @@ -174,31 +152,23 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public void close(int statusCode, String reason) { - if (connectionState.onClosing()) + close(new CloseInfo(statusCode, reason), EMPTY); + } + + private void close(CloseInfo closeInfo, FrameCallback callback) + { + // TODO: review close from onOpen + + if(closeSent.compareAndSet(false,true)) { - LOG.debug("ConnectionState: Transition to CLOSING"); - // This is the first CLOSE event - if (closeState.onLocal()) - { - LOG.debug("CloseState: Transition to LOCAL"); - // this is Local initiated. - CloseInfo closeInfo = new CloseInfo(statusCode, reason); - Frame closeFrame = closeInfo.asFrame(); - outgoingHandler.outgoingFrame(closeFrame, new OnDisconnectCallback(true), BatchMode.AUTO); - } - else - { - LOG.debug("CloseState: Expected LOCAL, but was " + closeState.get()); - } + LOG.debug("Sending Close Frame"); + CloseFrame closeFrame = closeInfo.asFrame(); + outgoingHandler.outgoingFrame(closeFrame, callback, BatchMode.OFF); } - else if(connectionState.onClosed()) + else { - LOG.debug("ConnectionState: Transition to CLOSED"); - - // This is the reply to the CLOSING entry point - CloseInfo closeInfo = new CloseInfo(statusCode, reason); - Frame closeFrame = closeInfo.asFrame(); - outgoingHandler.outgoingFrame(closeFrame, new OnDisconnectCallback(false), BatchMode.AUTO); + LOG.debug("Close Frame Previously Sent: ignoring: {} [{}]", closeInfo, callback); + callback.succeed(); } } @@ -208,31 +178,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public void disconnect() { - disconnect(true); - } - - private void disconnect(boolean outputOnly) - { - if(connectionState.onClosing()) - { - // Is this is a harsh disconnect: OPEN -> CLOSING -> CLOSED - if (closeState.onAbnormal()) - { - // notify local endpoint of harsh disconnect - notifyClose(StatusCode.SHUTDOWN, "Harsh disconnect"); - } - } - - if(connectionState.onClosed()) - { - // Transition: CLOSING -> CLOSED - connection.disconnect(outputOnly); - if (closeState.onLocal()) - { - // notify local endpoint of harsh disconnect - notifyClose(StatusCode.SHUTDOWN, "Harsh disconnect"); - } - } + connection.disconnect(); } public void dispatch(Runnable runnable) @@ -355,11 +301,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem return connectionState; } - public AtomicClose getCloseState() - { - return closeState; - } - @Override public WebSocketContainerScope getContainerScope() { @@ -481,10 +422,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public void incomingFrame(Frame frame, FrameCallback callback) { - ClassLoader old = Thread.currentThread().getContextClassLoader(); - try + try(ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader)) { - Thread.currentThread().setContextClassLoader(classLoader); if (connectionState.get() == AtomicConnectionState.State.OPEN) { // For endpoints that want to see raw frames. @@ -496,33 +435,37 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem { case OpCode.CLOSE: { - boolean validate = true; - CloseFrame closeframe = (CloseFrame) frame; - CloseInfo close = new CloseInfo(closeframe, validate); - - // process handshake - if(connectionState.onClosing()) + CloseInfo closeInfo = null; + + if (connectionState.onClosing()) { LOG.debug("ConnectionState: Transition to CLOSING"); - // we transitioned to CLOSING state - if(closeState.onRemote()) - { - LOG.debug("CloseState: Transition to REMOTE"); - // Remote initiated. - // Send reply to remote - close(close.getStatusCode(), close.getReason()); - } - else - { - LOG.debug("CloseState: Already at LOCAL"); - // Local initiated, this was the reply. - disconnect(); - } + CloseFrame closeframe = (CloseFrame) frame; + closeInfo = new CloseInfo(closeframe, true); } else { - LOG.debug("ConnectionState: Not CLOSING: was " + connectionState.get()); + LOG.debug("ConnectionState: {} - Close Frame Received", connectionState); } + + if (closeInfo != null) + { + notifyClose(closeInfo.getStatusCode(), closeInfo.getReason()); + close(closeInfo, new CompletionCallback() + { + @Override + public void complete() + { + if (connectionState.onClosed()) + { + LOG.debug("ConnectionState: Transition to CLOSED"); + connection.disconnect(); + } + } + }); + } + + // let fill/parse continue callback.succeed(); return; @@ -546,8 +489,15 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem endpointFunctions.onPing(frame.getPayload()); callback.succeed(); - - getRemote().sendPong(pongBuf); + + try + { + getRemote().sendPong(pongBuf); + } + catch (Throwable t) + { + LOG.debug("Unable to send pong", t); + } break; } case OpCode.PONG: @@ -590,19 +540,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem LOG.debug("Discarding post EOF frame - {}", frame); } } - catch (Throwable cause) - { - callback.fail(cause); - onError(cause); - } - finally - { - // Unset active MessageSink if this was a fin frame - if (frame.getType().isData() && frame.isFin() && activeMessageSink != null) - activeMessageSink = null; - - Thread.currentThread().setContextClassLoader(old); - } + + // Unset active MessageSink if this was a fin frame + if (frame.getType().isData() && frame.isFin() && activeMessageSink != null) + activeMessageSink = null; } @Override @@ -630,7 +571,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem { LOG.debug("notifyClose({},{}) [{}]", statusCode, reason, getState()); } - endpointFunctions.onClose(new CloseInfo(statusCode, reason)); + + CloseInfo closeInfo = new CloseInfo(statusCode, reason); + endpointFunctions.onClose(closeInfo); } /** diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/frames/CloseFrame.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/frames/CloseFrame.java index 26d168c80f5..dbee87ebc86 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/frames/CloseFrame.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/frames/CloseFrame.java @@ -27,7 +27,7 @@ public class CloseFrame extends ControlFrame { super(OpCode.CLOSE); } - + @Override public Type getType() { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/CommonEndpointFunctions.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/CommonEndpointFunctions.java index d263e52093a..2a4744f46a7 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/CommonEndpointFunctions.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/function/CommonEndpointFunctions.java @@ -677,6 +677,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl this.session = session; // Call (optional) on open method + // TODO: catch end user throwables if (onOpenFunction != null) onOpenFunction.apply(this.session); } @@ -686,6 +687,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl { assertIsStarted(); + // TODO: catch end user throwables if (onCloseFunction != null) onCloseFunction.apply(close); } @@ -694,7 +696,8 @@ public class CommonEndpointFunctions extends AbstractLifeCycl public void onFrame(Frame frame) { assertIsStarted(); - + + // TODO: catch end user throwables if (onFrameFunction != null) onFrameFunction.apply(frame); } @@ -703,7 +706,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl public void onError(Throwable cause) { assertIsStarted(); - + if (onErrorFunction != null) onErrorFunction.apply(cause); else @@ -717,7 +720,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl if (activeMessageSink == null) activeMessageSink = onTextSink; - + acceptMessage(frame, callback); } @@ -728,13 +731,14 @@ public class CommonEndpointFunctions extends AbstractLifeCycl if (activeMessageSink == null) activeMessageSink = onBinarySink; - + acceptMessage(frame, callback); } @Override public void onContinuation(Frame frame, FrameCallback callback) { + // TODO: catch end user throwables acceptMessage(frame, callback); } @@ -745,6 +749,7 @@ public class CommonEndpointFunctions extends AbstractLifeCycl return; // Accept the payload into the message sink + // TODO: catch end user throwables activeMessageSink.accept(frame, callback); if (frame.isFin()) activeMessageSink = null; @@ -754,7 +759,8 @@ public class CommonEndpointFunctions extends AbstractLifeCycl public void onPing(ByteBuffer payload) { assertIsStarted(); - + + // TODO: catch end user throwables if (onPingFunction != null) onPingFunction.apply(payload); } @@ -763,7 +769,8 @@ public class CommonEndpointFunctions extends AbstractLifeCycl public void onPong(ByteBuffer payload) { assertIsStarted(); - + + // TODO: catch end user throwables if (onPongFunction != null) onPongFunction.apply(payload); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index cf6349ead92..fb5492a8997 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -43,7 +43,6 @@ import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.FrameCallback; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WebSocketPolicy; -import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.common.Generator; @@ -70,29 +69,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } - public class OnDisconnectCallback implements WriteCallback - { - private final boolean outputOnly; - - public OnDisconnectCallback(boolean outputOnly) - { - this.outputOnly = outputOnly; - } - - @Override - public void writeFailed(Throwable x) - { - disconnect(outputOnly); - } - - @Override - public void writeSuccess() - { - disconnect(outputOnly); - } - } - - /** * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload) */ @@ -118,7 +94,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { super(endp,executor); - LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_" + policy.getBehavior()); + LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "." + policy.getBehavior()); this.id = String.format("%s:%d->%s:%d", endp.getLocalAddress().getAddress().getHostAddress(), @@ -150,10 +126,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } @Override - public void disconnect(boolean onlyOutput) + public void disconnect() { if (LOG.isDebugEnabled()) - LOG.debug("disconnect({})", onlyOutput ? "OUTPUT_ONLY" : "BOTH"); + LOG.debug("disconnect()"); // close FrameFlusher, we cannot write anymore at this point. flusher.close(); @@ -164,18 +140,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp if (LOG.isDebugEnabled()) LOG.debug("Shutting down output {}",endPoint); - endPoint.shutdownOutput(); - - if (!onlyOutput) - { - if (LOG.isDebugEnabled()) - LOG.debug("Closing {}",endPoint); - endPoint.close(); - } - else - { - closed.set(true); - } + endPoint.close(); + closed.set(true); } protected void execute(Runnable task) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index b58f012496b..4396c2c02a8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -238,6 +238,9 @@ public class FrameFlusher private void succeedEntries() { + if(LOG.isDebugEnabled()) + LOG.debug("succeedEntries()"); + // Do not allocate the iterator here. for (int i = 0; i < entries.size(); ++i) { @@ -290,7 +293,7 @@ public class FrameFlusher } public static final BinaryFrame FLUSH_FRAME = new BinaryFrame(); - private static final Logger LOG = Log.getLogger(FrameFlusher.class); + private final Logger LOG; private final EndPoint endpoint; private final int bufferSize; private final Generator generator; @@ -303,6 +306,7 @@ public class FrameFlusher public FrameFlusher(Generator generator, EndPoint endpoint, int bufferSize, int maxGather) { + this.LOG = Log.getLogger(FrameFlusher.class.getName() + "." + generator.getBehavior().name()); this.endpoint = endpoint; this.bufferSize = bufferSize; this.generator = Objects.requireNonNull(generator); @@ -319,7 +323,7 @@ public class FrameFlusher if (!closed) { closed = true; - LOG.debug("{} closing {}",this); + LOG.debug("{} closing",this); entries = new ArrayList<>(); entries.addAll(queue); @@ -441,7 +445,7 @@ public class FrameFlusher public String toString() { ByteBuffer aggregate = flusher.aggregate; - return String.format("%s[%s,queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),generator.getBehavior(),queue.size(),aggregate == null?0:aggregate.position(), + return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(), failure); } } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/AtomicCloseTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/AtomicCloseTest.java deleted file mode 100644 index 5a716ee06ec..00000000000 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/AtomicCloseTest.java +++ /dev/null @@ -1,55 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.websocket.common; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - -import org.junit.Test; - -public class AtomicCloseTest -{ - @Test - public void testCloseLocalNormal() - { - AtomicClose state = new AtomicClose(); - - assertThat("State", state.get(), is(AtomicClose.State.NONE)); - assertThat("Local 1st", state.onLocal(), is(true)); - assertThat("State", state.get(), is(AtomicClose.State.LOCAL)); - assertThat("Local 2nd", state.onLocal(), is(false)); - assertThat("State", state.get(), is(AtomicClose.State.LOCAL)); - assertThat("Remote 1st", state.onRemote(), is(false)); - assertThat("State", state.get(), is(AtomicClose.State.LOCAL)); - } - - @Test - public void testCloseRemoteNormal() - { - AtomicClose state = new AtomicClose(); - - assertThat("State", state.get(), is(AtomicClose.State.NONE)); - assertThat("Remote 1st", state.onRemote(), is(true)); - assertThat("State", state.get(), is(AtomicClose.State.REMOTE)); - assertThat("Local 1st", state.onRemote(), is(false)); - assertThat("State", state.get(), is(AtomicClose.State.REMOTE)); - assertThat("Remote 2nd", state.onRemote(), is(false)); - assertThat("State", state.get(), is(AtomicClose.State.REMOTE)); - } -} diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java index 2e8553ad453..e79eb62b548 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/LocalWebSocketConnection.java @@ -74,10 +74,10 @@ public class LocalWebSocketConnection implements LogicalConnection } @Override - public void disconnect(boolean outputOnly) + public void disconnect() { if (LOG.isDebugEnabled()) - LOG.debug("disconnect({})", outputOnly); + LOG.debug("disconnect()"); } @Override diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/DummyConnection.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/DummyConnection.java index f628c7d8251..ee411bdc8f8 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/DummyConnection.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/test/DummyConnection.java @@ -45,7 +45,7 @@ public class DummyConnection implements LogicalConnection } @Override - public void disconnect(boolean outputOnly) + public void disconnect() { } diff --git a/jetty-websocket/websocket-common/websocket-states.txt b/jetty-websocket/websocket-common/websocket-states.txt index 60f992b5911..3d8ea4f5c7b 100644 --- a/jetty-websocket/websocket-common/websocket-states.txt +++ b/jetty-websocket/websocket-common/websocket-states.txt @@ -1,32 +1,33 @@ -Documenting the States of a WebSocket. +Documenting the States of a WebSocket Session. - -NEW: +NEW: (Internal) A new WebSocket session. It has had no connection attempted yet. -CONNECTING: +CONNECTING: (RFC6455) // synonyms: HANDSHAKING The connection is being attempted, along with the Upgrade handshake. -CONNECTED: +CONNECTED: (Internal) // synonyms: HANDSHAKED, HANDSHAKEN, OPENING The connection is established. The User WebSocket Endpoint has not been notified yet. -OPEN: +OPEN: (RFC6455) // synonyms: OPENED - User WebSocket Endpoint has been Opened (the onOpen method has called) + User WebSocket Endpoint is now Opened (the onOpen method has been called, no close by onOpen) -CLOSING: +CLOSING: (RFC6455) The close handshake has begun. Either the local initiated the close, waiting for the remote to reply. Or the remote initiated the close, and the local hasn't replied yet. This can be considered a logical half-closed state. -CLOSED: + + (sub state) CLOSE_FRAME_SENT (bool) + +CLOSED: (RFC6455) The connection and session is closed. This means either the close handshake completed, or the connection was @@ -36,39 +37,71 @@ CLOSED: Normal Client Initiated Close State Transition - WSEndpoint created + Client: State: NEW + Client: WSEndpoint created + Client: WSSession created Http GET w/Upgrade initiated - State: CONNECTING + Client: State: CONNECTING Upgrade Handshake negotiated (HTTP 101 response) - State: CONNECTED - WSEndpoint.onOpen() called - State: OPEN - WSEndpoint.onMessage() - Session.close(local close details) - Connection disconnect output - State: CLOSING - Remote: Received CLOSE Frame - Connection disconnect completely - WSEndpoint.onClose(remote close details) - State: CLOSED + Client: State: CONNECTED + Client: WSEndpoint.onOpen() called + Client: State: OPEN + Client: WSEndpoint.onMessage() - received msg + Client: WSSession.close(local close details) + Client: State: CLOSING + Client: Send CLOSE Frame to remote + Remote: Received CLOSE Frame from client + Remote: Sends CLOSE Frame reply (remote close details) + Client: WSEndpoint.onClose(remote close details) + Client: State: CLOSED + Client: TCP Connection disconnect completely ---- Normal Remote Initiated Close State Transition - WSEndpoint created + Client: State: NEW + Client: WSEndpoint created + Client: WSSession created Http GET w/Upgrade initiated - State: CONNECTING + Client: State: CONNECTING Upgrade Handshake negotiated (HTTP 101 response) - State: CONNECTED - WSEndpoint.onOpen() called - State: OPEN - WSEndpoint.onMessage() - Remote: Receive CLOSE frame - State: CLOSING - Session.close(remote close details) - Connection disconnect completely - WSEndpoint.onClose(local close details) - State: CLOSED + Client: State: CONNECTED + Client: WSEndpoint.onOpen() called + Client: State: OPEN + Client: WSEndpoint.onMessage() - received msg + Remote: Sends CLOSE Frame (remote close details) + Client: Received CLOSE Frame from remote + Client: State: CLOSING + Client: WSEndpoint.onClose(remote close details) + Client: after onClose() + if can transition to CLOSED + Client: Sends CLOSE Frame + Client: TCP Connection disconnect completely + +--- + +Overlapping Close State (Remote first) Transition + + Client: WSEndpoint created + Client: WSSession created + Http GET w/Upgrade initiated + Client: State: CONNECTING + Upgrade Handshake negotiated (HTTP 101 response) + Client: State: CONNECTED + Client: WSEndpoint.onOpen() called + Client: State: OPEN + Client: WSEndpoint.onMessage() - received msg + Remote[t1]: Sends CLOSE Frame (remote close details) + Client[t2]: WSSession.close(local close details) + Client[t2]: State: CLOSING + Client[t1]: Received CLOSE Frame from remote + Client[t1]: Cannot transition to State: CLOSING (no-op) + Client[t1]: WSEndpoint.onClose(remote close details) + Client[t1]: after onClose() + if can transition to CLOSED + Client: Sends CLOSE Frame + Client: TCP Connection disconnect completely + diff --git a/jetty-websocket/websocket-tests/pom.xml b/jetty-websocket/websocket-tests/pom.xml index d03ef57d360..3a6c868793d 100644 --- a/jetty-websocket/websocket-tests/pom.xml +++ b/jetty-websocket/websocket-tests/pom.xml @@ -3,7 +3,7 @@ org.eclipse.jetty.websocket websocket-parent - 9.4.3-SNAPSHOT + 9.4.5-SNAPSHOT 4.0.0 diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/Fuzzer.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/Fuzzer.java index aa54bdccd12..a4da85cfa00 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/Fuzzer.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/Fuzzer.java @@ -32,7 +32,6 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.toolchain.test.ByteBufferAssert; @@ -154,15 +153,13 @@ public class Fuzzer extends ContainerLifeCycle LOG.debug("expect() {} frame(s)", expect.size()); // Read frames - Future> futFrames = session.getUntrustedEndpoint().expectedFrames(expectedCount); - - List frames = futFrames.get(duration, unit); + UntrustedWSEndpoint endpoint = session.getUntrustedEndpoint(); String prefix = ""; for (int i = 0; i < expectedCount; i++) { WebSocketFrame expected = expect.get(i); - WebSocketFrame actual = frames.get(i); + WebSocketFrame actual = endpoint.framesQueue.poll(5, TimeUnit.SECONDS); prefix = "Frame[" + i + "]"; diff --git a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/TrackingEndpoint.java b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/TrackingEndpoint.java index f1dcc17c821..23093449474 100644 --- a/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/TrackingEndpoint.java +++ b/jetty-websocket/websocket-tests/src/main/java/org/eclipse/jetty/websocket/tests/TrackingEndpoint.java @@ -25,12 +25,9 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.BufferUtil; @@ -56,23 +53,19 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen public AtomicReference closeInfo = new AtomicReference<>(); public AtomicReference error = new AtomicReference<>(); - private CompletableFuture> expectedMessagesFuture = new CompletableFuture<>(); - private AtomicReference expectedMessageCount = new AtomicReference<>(); - private List messages = new ArrayList<>(); + public BlockingQueue messageQueue = new LinkedBlockingDeque<>(); + public BlockingQueue bufferQueue = new LinkedBlockingDeque<>(); + public BlockingQueue framesQueue = new LinkedBlockingDeque<>(); - private CompletableFuture> expectedFramesFuture = new CompletableFuture<>(); - private AtomicReference expectedFramesCount = new AtomicReference<>(); - private List frames = new ArrayList<>(); private WebSocketSession session; public TrackingEndpoint(String id) { - LOG = Log.getLogger(this.getClass().getName() + "_" + id); + LOG = Log.getLogger(this.getClass().getName() + "." + id); } - public void assertClose(String prefix, int expectedCloseStatusCode, Matcher reasonMatcher) throws InterruptedException + public void assertCloseInfo(String prefix, int expectedCloseStatusCode, Matcher reasonMatcher) throws InterruptedException { - assertThat(prefix + " endpoint close event received", closeLatch.await(10, TimeUnit.SECONDS), Matchers.is(true)); CloseInfo close = closeInfo.get(); assertThat(prefix + " close info", close, Matchers.notNullValue()); assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode)); @@ -84,38 +77,6 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen this.session.close(statusCode, reason); } - public Future> expectedFrames(int expectedCount) - { - synchronized (expectedFramesCount) - { - if (!expectedFramesCount.compareAndSet(null, expectedCount)) - { - throw new IllegalStateException("Can only have 1 registered frame count future"); - } - else - { - checkFrameCount(); - } - } - return expectedFramesFuture; - } - - public Future> expectedMessages(int expectedCount) - { - synchronized (expectedMessagesFuture) - { - if (!expectedMessageCount.compareAndSet(null, expectedCount)) - { - throw new IllegalStateException("Can only have 1 registered message count future"); - } - else - { - checkMessageCount(); - } - } - return expectedMessagesFuture; - } - public RemoteEndpoint getRemote() { return session.getRemote(); @@ -128,6 +89,8 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen { LOG.info("onWSBinary({})", BufferUtil.toDetailString(ByteBuffer.wrap(payload, offset, len))); } + + bufferQueue.offer(ByteBuffer.wrap(payload, offset, len)); } @Override @@ -156,13 +119,10 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen assertThat("Error must have value", cause, notNullValue()); if (error.compareAndSet(null, cause) == false) { - LOG.warn("Original Cause", error.get()); - LOG.warn("Extra/Excess Cause", cause); + LOG.warn("onError should only happen once - Original Cause", error.get()); + LOG.warn("onError should only happen once - Extra/Excess Cause", cause); fail("onError should only happen once!"); } - - this.expectedMessagesFuture.completeExceptionally(cause); - this.expectedFramesFuture.completeExceptionally(cause); } @Override @@ -173,11 +133,7 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen LOG.debug("onWSFrame({})", frame); } - synchronized (expectedFramesFuture) - { - frames.add(WebSocketFrame.copy(frame)); - checkFrameCount(); - } + framesQueue.offer(WebSocketFrame.copy(frame)); } @Override @@ -187,31 +143,7 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen { LOG.debug("onWSText(\"{}\")", text); } - - synchronized (expectedMessagesFuture) - { - messages.add(text); - checkMessageCount(); - } - } - - private void checkMessageCount() - { - Integer expected = expectedMessageCount.get(); - - if (expected != null && messages.size() >= expected.intValue()) - { - expectedMessagesFuture.complete(messages); - } - } - - private void checkFrameCount() - { - Integer expected = expectedFramesCount.get(); - - if (expected != null && frames.size() >= expected.intValue()) - { - expectedFramesFuture.complete(frames); - } + + messageQueue.offer(text); } } diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UntrustedWSClientTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UntrustedWSClientTest.java index cee3bd95bcc..a0046799fad 100644 --- a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UntrustedWSClientTest.java +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/UntrustedWSClientTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.net.URI; -import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -109,13 +108,11 @@ public class UntrustedWSClientTest { UntrustedWSEndpoint endpoint = session.getUntrustedEndpoint(); - Future> futMessages = endpoint.expectedMessages(1); - session.getRemote().sendString("hello"); - List messages = futMessages.get(); - assertThat("Messages.size", messages.size(), is(1)); - assertThat("Messages[0]", messages.get(0), is("hello")); + String message = endpoint.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("message", message, is("hello")); + // TODO: test close } } finally diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java index ad22f164b47..555d8eab7b1 100644 --- a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java @@ -35,7 +35,6 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Arrays; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -207,7 +206,7 @@ public class ClientCloseTest assertThat("Client WebSocket is Open", clientSocket.openLatch.await(30, TimeUnit.SECONDS), is(true)); UntrustedWSEndpoint serverEndpoint = serverSession.getUntrustedEndpoint(); - Future> futFrames = serverEndpoint.expectedFrames(1); + // Future> futFrames = serverEndpoint.expectedFrames(1); try { @@ -219,8 +218,7 @@ public class ClientCloseTest testFut.get(30, TimeUnit.SECONDS); // Read Frame on server side - List frames = futFrames.get(30, TimeUnit.SECONDS); - WebSocketFrame frame = frames.get(0); + WebSocketFrame frame = serverEndpoint.framesQueue.poll(10, TimeUnit.SECONDS); assertThat("Server received frame", frame.getOpCode(), is(OpCode.TEXT)); assertThat("Server received frame payload", frame.getPayloadAsUTF8(), is(echoMsg)); @@ -333,7 +331,7 @@ public class ClientCloseTest clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); // server receives close frame - serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason)); + serverSession.getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.NORMAL, is(origCloseReason)); // server sends 2 messages RemoteEndpoint remote = serverSession.getRemote(); @@ -451,7 +449,7 @@ public class ClientCloseTest assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Invalid control frame")); // client parse invalid frame, notifies server of close (protocol error) - serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.PROTOCOL, allOf(containsString("Invalid control frame"), containsString("length"))); + serverSession.getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.PROTOCOL, allOf(containsString("Invalid control frame"), containsString("length"))); } // server disconnects @@ -489,7 +487,7 @@ public class ClientCloseTest clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); // server receives close frame - serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason)); + serverSession.getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.NORMAL, is(origCloseReason)); // client should not have received close message (yet) clientSocket.assertNoCloseEvent(); @@ -531,7 +529,7 @@ public class ClientCloseTest clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); // server receives close frame - serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason)); + serverSession.getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.NORMAL, is(origCloseReason)); // client should not have received close message (yet) clientSocket.assertNoCloseEvent(); @@ -581,7 +579,7 @@ public class ClientCloseTest for (int i = 0; i < clientCount; i++) { // server receives close frame - serverSessions[i].getUntrustedEndpoint().assertClose("Server", StatusCode.SHUTDOWN, containsString("Shutdown")); + serverSessions[i].getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.SHUTDOWN, containsString("Shutdown")); } // clients disconnect diff --git a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/EchoTest.java b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/EchoTest.java index 303d8d02e20..8185f34ab5c 100644 --- a/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/EchoTest.java +++ b/jetty-websocket/websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/EchoTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat; import java.io.IOException; import java.net.URI; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -36,6 +35,7 @@ import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.tests.TrackingEndpoint; +import org.eclipse.jetty.websocket.tests.UntrustedWSEndpoint; import org.eclipse.jetty.websocket.tests.UntrustedWSServer; import org.eclipse.jetty.websocket.tests.UntrustedWSSession; import org.junit.After; @@ -99,30 +99,33 @@ public class EchoTest server.registerConnectFuture(wsURI, serverSessionFut); // Client connects - TrackingEndpoint clientSocket = new TrackingEndpoint(WebSocketBehavior.CLIENT.name()); - Future clientConnectFuture = client.connect(clientSocket, wsURI); + TrackingEndpoint clientEndpoint = new TrackingEndpoint(WebSocketBehavior.CLIENT.name()); + Future clientConnectFuture = client.connect(clientEndpoint, wsURI); // Server accepts connect UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS); + UntrustedWSEndpoint serverEndpoint = serverSession.getUntrustedEndpoint(); // client confirms connection via echo - assertThat("Client Opened", clientSocket.openLatch.await(5, TimeUnit.SECONDS), is(true)); + assertThat("Client onOpen event", clientEndpoint.openLatch.await(5, TimeUnit.SECONDS), is(true)); - Future> futMessages = clientSocket.expectedMessages(1); - // client sends message - clientSocket.getRemote().sendString("Hello Echo"); - List messages = futMessages.get(10, TimeUnit.SECONDS); - assertThat("Messages[0]", messages.get(0), is("Hello Echo")); + clientEndpoint.getRemote().sendString("Hello Echo"); + + // Wait for response to echo + String message = clientEndpoint.messageQueue.poll(5, TimeUnit.SECONDS); + assertThat("message", message, is("Hello Echo")); // client closes - clientSocket.close(StatusCode.NORMAL, "Normal Close"); + clientEndpoint.close(StatusCode.NORMAL, "Normal Close"); - // client triggers close event on client ws-endpoint - clientSocket.assertClose("Client", StatusCode.NORMAL, containsString("Normal Close")); - // Server close event - serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, containsString("Normal Close")); + assertThat("Server onClose event", serverSession.getUntrustedEndpoint().closeLatch.await(5, TimeUnit.SECONDS), is(true)); + serverEndpoint.assertCloseInfo("Server", StatusCode.NORMAL, containsString("Normal Close")); + + // client triggers close event on client ws-endpoint + assertThat("Client onClose event", clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS), is(true)); + clientEndpoint.assertCloseInfo("Client", StatusCode.NORMAL, containsString("Normal Close")); } } diff --git a/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties index d24cbb3dab8..893b7d808d2 100644 --- a/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-tests/src/test/resources/jetty-logging.properties @@ -20,23 +20,24 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.LEVEL=WARN +# org.eclipse.jetty.util.log.stderr.LONG=true # org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG # org.eclipse.jetty.websocket.LEVEL=DEBUG # org.eclipse.jetty.websocket.LEVEL=INFO -org.eclipse.jetty.websocket.tests.LEVEL=DEBUG -org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG +# org.eclipse.jetty.websocket.tests.LEVEL=DEBUG +# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG # org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG -org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG -org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG +# org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG +# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG # org.eclipse.jetty.websocket.server.ab.Fuzzer.LEVEL=DEBUG # org.eclipse.jetty.websocket.server.blockhead.LEVEL=DEBUG # org.eclipse.jetty.websocket.server.helper.LEVEL=DEBUG +org.eclipse.jetty.websocket.common.CompletionCallback.LEVEL=ALL # org.eclipse.jetty.websocket.client.io.ConnectPromise.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.WebSocketSession_OPEN.LEVEL=DEBUG -org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection_OPEN.LEVEL=DEBUG ### Show state changes on BrowserDebugTool