Issue #207 - more review rework

This commit is contained in:
Joakim Erdfelt 2017-04-21 16:21:07 -07:00
parent f9e2f5f96e
commit 683509be60
21 changed files with 299 additions and 484 deletions

View File

@ -1,61 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import java.util.concurrent.atomic.AtomicReference;
/**
* Atomic Close State
*/
public class AtomicClose
{
enum State
{
/** No close handshake initiated (yet) */
NONE,
/** Local side initiated the close handshake */
LOCAL,
/** Remote side initiated the close handshake */
REMOTE,
/** An abnormal close situation (disconnect, timeout, etc...) */
ABNORMAL
}
private final AtomicReference<State> state = new AtomicReference<>(State.NONE);
public State get()
{
return state.get();
}
public boolean onAbnormal()
{
return state.compareAndSet(State.NONE, State.ABNORMAL);
}
public boolean onLocal()
{
return state.compareAndSet(State.NONE, State.LOCAL);
}
public boolean onRemote()
{
return state.compareAndSet(State.NONE, State.REMOTE);
}
}

View File

@ -22,8 +22,8 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.websocket.api.BadPayloadException;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.ProtocolException;
@ -34,7 +34,7 @@ import org.eclipse.jetty.websocket.common.frames.CloseFrame;
public class CloseInfo
{
private int statusCode;
private byte[] reasonBytes;
private String reason;
public CloseInfo()
{
@ -82,7 +82,7 @@ public class CloseInfo
{
// Reason (trimmed to max reason size)
int len = Math.min(data.remaining(), CloseStatus.MAX_REASON_PHRASE);
reasonBytes = new byte[len];
byte reasonBytes[] = new byte[len];
data.get(reasonBytes,0,len);
// Spec Requirement : throw BadPayloadException on invalid UTF8
@ -93,6 +93,7 @@ public class CloseInfo
Utf8StringBuilder utf = new Utf8StringBuilder();
// if this throws, we know we have bad UTF8
utf.append(reasonBytes,0,reasonBytes.length);
this.reason = utf.toString();
}
catch (NotUtf8Exception e)
{
@ -127,22 +128,17 @@ public class CloseInfo
public CloseInfo(int statusCode, String reason)
{
this.statusCode = statusCode;
if (reason != null)
{
byte[] utf8Bytes = reason.getBytes(StandardCharsets.UTF_8);
if (utf8Bytes.length > CloseStatus.MAX_REASON_PHRASE)
{
this.reasonBytes = new byte[CloseStatus.MAX_REASON_PHRASE];
System.arraycopy(utf8Bytes,0,this.reasonBytes,0,CloseStatus.MAX_REASON_PHRASE);
}
else
{
this.reasonBytes = utf8Bytes;
}
}
this.reason = reason;
}
private ByteBuffer asByteBuffer()
/**
* Convert a raw status code and reason into a WebSocket Close frame payload buffer.
*
* @param statusCode the status code
* @param reason the optional reason string
* @return the payload buffer if valid. null if invalid status code for payload buffer.
*/
public static ByteBuffer asPayloadBuffer(int statusCode, String reason)
{
if ((statusCode == StatusCode.NO_CLOSE) || (statusCode == StatusCode.NO_CODE) || (statusCode == (-1)))
{
@ -151,26 +147,44 @@ public class CloseInfo
}
int len = 2; // status code
boolean hasReason = (this.reasonBytes != null) && (this.reasonBytes.length > 0);
byte reasonBytes[];
byte[] utf8Bytes = reason.getBytes(StandardCharsets.UTF_8);
if (utf8Bytes.length > CloseStatus.MAX_REASON_PHRASE)
{
reasonBytes = new byte[CloseStatus.MAX_REASON_PHRASE];
System.arraycopy(utf8Bytes, 0, reasonBytes, 0, CloseStatus.MAX_REASON_PHRASE);
}
else
{
reasonBytes = utf8Bytes;
}
boolean hasReason = (reasonBytes != null) && (reasonBytes.length > 0);
if (hasReason)
{
len += this.reasonBytes.length;
len += reasonBytes.length;
}
ByteBuffer buf = BufferUtil.allocate(len);
BufferUtil.flipToFill(buf);
buf.put((byte)((statusCode >>> 8) & 0xFF));
buf.put((byte)((statusCode >>> 0) & 0xFF));
buf.put((byte) ((statusCode >>> 8) & 0xFF));
buf.put((byte) ((statusCode >>> 0) & 0xFF));
if (hasReason)
{
buf.put(this.reasonBytes,0,this.reasonBytes.length);
buf.put(reasonBytes, 0, reasonBytes.length);
}
BufferUtil.flipToFlush(buf,0);
BufferUtil.flipToFlush(buf, 0);
return buf;
}
private ByteBuffer asByteBuffer()
{
return asPayloadBuffer(statusCode, reason);
}
public CloseFrame asFrame()
{
CloseFrame frame = new CloseFrame();
@ -188,11 +202,7 @@ public class CloseInfo
public String getReason()
{
if (this.reasonBytes == null)
{
return null;
}
return new String(this.reasonBytes,StandardCharsets.UTF_8);
return this.reason;
}
public int getStatusCode()
@ -200,11 +210,6 @@ public class CloseInfo
return statusCode;
}
public boolean isHarsh()
{
return !((statusCode == StatusCode.NORMAL) || (statusCode == StatusCode.NO_CODE));
}
public boolean isAbnormal()
{
return (statusCode != StatusCode.NORMAL);

View File

@ -0,0 +1,46 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.FrameCallback;
/**
* A callback which will trigger complete regardless of success/failure.
*/
public abstract class CompletionCallback implements FrameCallback
{
private static final Logger LOG = Log.getLogger(CompletionCallback.class);
@Override
public void fail(Throwable cause)
{
LOG.ignore(cause);
complete();
}
@Override
public void succeed()
{
complete();
}
public abstract void complete();
}

View File

@ -41,9 +41,8 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
/**
* Terminate the connection (no close frame sent)
* @param onlyOutput true to only close the output (half-close), false to close fully.
*/
void disconnect(boolean onlyOutput);
void disconnect();
/**
* Register Read Interest in Connection.

View File

@ -103,7 +103,7 @@ public class Parser
this.policy = wspolicy;
this.parserHandler = parserHandler;
LOG = Log.getLogger(Parser.class.getName() + "_" + wspolicy.getBehavior());
LOG = Log.getLogger(Parser.class.getName() + "." + wspolicy.getBehavior());
}
private void assertSanePayloadLength(long len)

View File

@ -32,6 +32,7 @@ import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool;
@ -58,7 +59,6 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -75,29 +75,7 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory,
WebSocketSessionScope, IncomingFrames, LogicalConnection.Listener, Connection.Listener
{
public class OnDisconnectCallback implements WriteCallback
{
private final boolean outputOnly;
public OnDisconnectCallback(boolean outputOnly)
{
this.outputOnly = outputOnly;
}
@Override
public void writeFailed(Throwable x)
{
LOG.debug("writeFailed()", x);
disconnect(outputOnly);
}
@Override
public void writeSuccess()
{
LOG.debug("writeSuccess()");
disconnect(outputOnly);
}
}
private static final FrameCallback EMPTY = new FrameCallback.Adapter();
private final Logger LOG;
@ -107,7 +85,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private final LogicalConnection connection;
private final Executor executor;
private final AtomicConnectionState connectionState = new AtomicConnectionState();
private final AtomicClose closeState = new AtomicClose();
private final AtomicBoolean closeSent = new AtomicBoolean();
// The websocket endpoint object itself
private final Object endpoint;
@ -134,7 +112,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
Objects.requireNonNull(containerScope, "Container Scope cannot be null");
Objects.requireNonNull(requestURI, "Request URI cannot be null");
LOG = Log.getLogger(WebSocketSession.class.getName() + "_" + connection.getPolicy().getBehavior().name());
LOG = Log.getLogger(WebSocketSession.class.getName() + "." + connection.getPolicy().getBehavior().name());
this.classLoader = Thread.currentThread().getContextClassLoader();
this.containerScope = containerScope;
@ -174,31 +152,23 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override
public void close(int statusCode, String reason)
{
if (connectionState.onClosing())
close(new CloseInfo(statusCode, reason), EMPTY);
}
private void close(CloseInfo closeInfo, FrameCallback callback)
{
LOG.debug("ConnectionState: Transition to CLOSING");
// This is the first CLOSE event
if (closeState.onLocal())
// TODO: review close from onOpen
if(closeSent.compareAndSet(false,true))
{
LOG.debug("CloseState: Transition to LOCAL");
// this is Local initiated.
CloseInfo closeInfo = new CloseInfo(statusCode, reason);
Frame closeFrame = closeInfo.asFrame();
outgoingHandler.outgoingFrame(closeFrame, new OnDisconnectCallback(true), BatchMode.AUTO);
LOG.debug("Sending Close Frame");
CloseFrame closeFrame = closeInfo.asFrame();
outgoingHandler.outgoingFrame(closeFrame, callback, BatchMode.OFF);
}
else
{
LOG.debug("CloseState: Expected LOCAL, but was " + closeState.get());
}
}
else if(connectionState.onClosed())
{
LOG.debug("ConnectionState: Transition to CLOSED");
// This is the reply to the CLOSING entry point
CloseInfo closeInfo = new CloseInfo(statusCode, reason);
Frame closeFrame = closeInfo.asFrame();
outgoingHandler.outgoingFrame(closeFrame, new OnDisconnectCallback(false), BatchMode.AUTO);
LOG.debug("Close Frame Previously Sent: ignoring: {} [{}]", closeInfo, callback);
callback.succeed();
}
}
@ -208,31 +178,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override
public void disconnect()
{
disconnect(true);
}
private void disconnect(boolean outputOnly)
{
if(connectionState.onClosing())
{
// Is this is a harsh disconnect: OPEN -> CLOSING -> CLOSED
if (closeState.onAbnormal())
{
// notify local endpoint of harsh disconnect
notifyClose(StatusCode.SHUTDOWN, "Harsh disconnect");
}
}
if(connectionState.onClosed())
{
// Transition: CLOSING -> CLOSED
connection.disconnect(outputOnly);
if (closeState.onLocal())
{
// notify local endpoint of harsh disconnect
notifyClose(StatusCode.SHUTDOWN, "Harsh disconnect");
}
}
connection.disconnect();
}
public void dispatch(Runnable runnable)
@ -355,11 +301,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
return connectionState;
}
public AtomicClose getCloseState()
{
return closeState;
}
@Override
public WebSocketContainerScope getContainerScope()
{
@ -481,10 +422,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override
public void incomingFrame(Frame frame, FrameCallback callback)
{
ClassLoader old = Thread.currentThread().getContextClassLoader();
try
try(ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader))
{
Thread.currentThread().setContextClassLoader(classLoader);
if (connectionState.get() == AtomicConnectionState.State.OPEN)
{
// For endpoints that want to see raw frames.
@ -496,33 +435,37 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
{
case OpCode.CLOSE:
{
boolean validate = true;
CloseFrame closeframe = (CloseFrame) frame;
CloseInfo close = new CloseInfo(closeframe, validate);
CloseInfo closeInfo = null;
// process handshake
if(connectionState.onClosing())
if (connectionState.onClosing())
{
LOG.debug("ConnectionState: Transition to CLOSING");
// we transitioned to CLOSING state
if(closeState.onRemote())
{
LOG.debug("CloseState: Transition to REMOTE");
// Remote initiated.
// Send reply to remote
close(close.getStatusCode(), close.getReason());
CloseFrame closeframe = (CloseFrame) frame;
closeInfo = new CloseInfo(closeframe, true);
}
else
{
LOG.debug("CloseState: Already at LOCAL");
// Local initiated, this was the reply.
disconnect();
LOG.debug("ConnectionState: {} - Close Frame Received", connectionState);
}
}
else
if (closeInfo != null)
{
LOG.debug("ConnectionState: Not CLOSING: was " + connectionState.get());
notifyClose(closeInfo.getStatusCode(), closeInfo.getReason());
close(closeInfo, new CompletionCallback()
{
@Override
public void complete()
{
if (connectionState.onClosed())
{
LOG.debug("ConnectionState: Transition to CLOSED");
connection.disconnect();
}
}
});
}
// let fill/parse continue
callback.succeed();
return;
@ -547,7 +490,14 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
endpointFunctions.onPing(frame.getPayload());
callback.succeed();
try
{
getRemote().sendPong(pongBuf);
}
catch (Throwable t)
{
LOG.debug("Unable to send pong", t);
}
break;
}
case OpCode.PONG:
@ -590,19 +540,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
LOG.debug("Discarding post EOF frame - {}", frame);
}
}
catch (Throwable cause)
{
callback.fail(cause);
onError(cause);
}
finally
{
// Unset active MessageSink if this was a fin frame
if (frame.getType().isData() && frame.isFin() && activeMessageSink != null)
activeMessageSink = null;
Thread.currentThread().setContextClassLoader(old);
}
}
@Override
@ -630,7 +571,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
{
LOG.debug("notifyClose({},{}) [{}]", statusCode, reason, getState());
}
endpointFunctions.onClose(new CloseInfo(statusCode, reason));
CloseInfo closeInfo = new CloseInfo(statusCode, reason);
endpointFunctions.onClose(closeInfo);
}
/**

View File

@ -677,6 +677,7 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
this.session = session;
// Call (optional) on open method
// TODO: catch end user throwables
if (onOpenFunction != null)
onOpenFunction.apply(this.session);
}
@ -686,6 +687,7 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
{
assertIsStarted();
// TODO: catch end user throwables
if (onCloseFunction != null)
onCloseFunction.apply(close);
}
@ -695,6 +697,7 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
{
assertIsStarted();
// TODO: catch end user throwables
if (onFrameFunction != null)
onFrameFunction.apply(frame);
}
@ -735,6 +738,7 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
@Override
public void onContinuation(Frame frame, FrameCallback callback)
{
// TODO: catch end user throwables
acceptMessage(frame, callback);
}
@ -745,6 +749,7 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
return;
// Accept the payload into the message sink
// TODO: catch end user throwables
activeMessageSink.accept(frame, callback);
if (frame.isFin())
activeMessageSink = null;
@ -755,6 +760,7 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
{
assertIsStarted();
// TODO: catch end user throwables
if (onPingFunction != null)
onPingFunction.apply(payload);
}
@ -764,6 +770,7 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
{
assertIsStarted();
// TODO: catch end user throwables
if (onPongFunction != null)
onPongFunction.apply(payload);
}

View File

@ -43,7 +43,6 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Generator;
@ -70,29 +69,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
public class OnDisconnectCallback implements WriteCallback
{
private final boolean outputOnly;
public OnDisconnectCallback(boolean outputOnly)
{
this.outputOnly = outputOnly;
}
@Override
public void writeFailed(Throwable x)
{
disconnect(outputOnly);
}
@Override
public void writeSuccess()
{
disconnect(outputOnly);
}
}
/**
* Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
*/
@ -118,7 +94,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
super(endp,executor);
LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_" + policy.getBehavior());
LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "." + policy.getBehavior());
this.id = String.format("%s:%d->%s:%d",
endp.getLocalAddress().getAddress().getHostAddress(),
@ -150,10 +126,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
@Override
public void disconnect(boolean onlyOutput)
public void disconnect()
{
if (LOG.isDebugEnabled())
LOG.debug("disconnect({})", onlyOutput ? "OUTPUT_ONLY" : "BOTH");
LOG.debug("disconnect()");
// close FrameFlusher, we cannot write anymore at this point.
flusher.close();
@ -164,19 +140,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if (LOG.isDebugEnabled())
LOG.debug("Shutting down output {}",endPoint);
endPoint.shutdownOutput();
if (!onlyOutput)
{
if (LOG.isDebugEnabled())
LOG.debug("Closing {}",endPoint);
endPoint.close();
}
else
{
closed.set(true);
}
}
protected void execute(Runnable task)
{

View File

@ -238,6 +238,9 @@ public class FrameFlusher
private void succeedEntries()
{
if(LOG.isDebugEnabled())
LOG.debug("succeedEntries()");
// Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i)
{
@ -290,7 +293,7 @@ public class FrameFlusher
}
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
private final Logger LOG;
private final EndPoint endpoint;
private final int bufferSize;
private final Generator generator;
@ -303,6 +306,7 @@ public class FrameFlusher
public FrameFlusher(Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
{
this.LOG = Log.getLogger(FrameFlusher.class.getName() + "." + generator.getBehavior().name());
this.endpoint = endpoint;
this.bufferSize = bufferSize;
this.generator = Objects.requireNonNull(generator);
@ -319,7 +323,7 @@ public class FrameFlusher
if (!closed)
{
closed = true;
LOG.debug("{} closing {}",this);
LOG.debug("{} closing",this);
entries = new ArrayList<>();
entries.addAll(queue);
@ -441,7 +445,7 @@ public class FrameFlusher
public String toString()
{
ByteBuffer aggregate = flusher.aggregate;
return String.format("%s[%s,queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),generator.getBehavior(),queue.size(),aggregate == null?0:aggregate.position(),
return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
failure);
}
}

View File

@ -1,55 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import org.junit.Test;
public class AtomicCloseTest
{
@Test
public void testCloseLocalNormal()
{
AtomicClose state = new AtomicClose();
assertThat("State", state.get(), is(AtomicClose.State.NONE));
assertThat("Local 1st", state.onLocal(), is(true));
assertThat("State", state.get(), is(AtomicClose.State.LOCAL));
assertThat("Local 2nd", state.onLocal(), is(false));
assertThat("State", state.get(), is(AtomicClose.State.LOCAL));
assertThat("Remote 1st", state.onRemote(), is(false));
assertThat("State", state.get(), is(AtomicClose.State.LOCAL));
}
@Test
public void testCloseRemoteNormal()
{
AtomicClose state = new AtomicClose();
assertThat("State", state.get(), is(AtomicClose.State.NONE));
assertThat("Remote 1st", state.onRemote(), is(true));
assertThat("State", state.get(), is(AtomicClose.State.REMOTE));
assertThat("Local 1st", state.onRemote(), is(false));
assertThat("State", state.get(), is(AtomicClose.State.REMOTE));
assertThat("Remote 2nd", state.onRemote(), is(false));
assertThat("State", state.get(), is(AtomicClose.State.REMOTE));
}
}

View File

@ -74,10 +74,10 @@ public class LocalWebSocketConnection implements LogicalConnection
}
@Override
public void disconnect(boolean outputOnly)
public void disconnect()
{
if (LOG.isDebugEnabled())
LOG.debug("disconnect({})", outputOnly);
LOG.debug("disconnect()");
}
@Override

View File

@ -45,7 +45,7 @@ public class DummyConnection implements LogicalConnection
}
@Override
public void disconnect(boolean outputOnly)
public void disconnect()
{
}

View File

@ -1,32 +1,33 @@
Documenting the States of a WebSocket.
Documenting the States of a WebSocket Session.
NEW:
NEW: (Internal)
A new WebSocket session.
It has had no connection attempted yet.
CONNECTING:
CONNECTING: (RFC6455) // synonyms: HANDSHAKING
The connection is being attempted, along with the Upgrade handshake.
CONNECTED:
CONNECTED: (Internal) // synonyms: HANDSHAKED, HANDSHAKEN, OPENING
The connection is established.
The User WebSocket Endpoint has not been notified yet.
OPEN:
OPEN: (RFC6455) // synonyms: OPENED
User WebSocket Endpoint has been Opened (the onOpen method has called)
User WebSocket Endpoint is now Opened (the onOpen method has been called, no close by onOpen)
CLOSING:
CLOSING: (RFC6455)
The close handshake has begun.
Either the local initiated the close, waiting for the remote to reply.
Or the remote initiated the close, and the local hasn't replied yet.
This can be considered a logical half-closed state.
CLOSED:
+ (sub state) CLOSE_FRAME_SENT (bool)
CLOSED: (RFC6455)
The connection and session is closed.
This means either the close handshake completed, or the connection was
@ -36,39 +37,71 @@ CLOSED:
Normal Client Initiated Close State Transition
WSEndpoint created
Client: State: NEW
Client: WSEndpoint created
Client: WSSession created
Http GET w/Upgrade initiated
State: CONNECTING
Client: State: CONNECTING
Upgrade Handshake negotiated (HTTP 101 response)
State: CONNECTED
WSEndpoint.onOpen() called
State: OPEN
WSEndpoint.onMessage()
Session.close(local close details)
Connection disconnect output
State: CLOSING
Remote: Received CLOSE Frame
Connection disconnect completely
WSEndpoint.onClose(remote close details)
State: CLOSED
Client: State: CONNECTED
Client: WSEndpoint.onOpen() called
Client: State: OPEN
Client: WSEndpoint.onMessage() - received msg
Client: WSSession.close(local close details)
Client: State: CLOSING
Client: Send CLOSE Frame to remote
Remote: Received CLOSE Frame from client
Remote: Sends CLOSE Frame reply (remote close details)
Client: WSEndpoint.onClose(remote close details)
Client: State: CLOSED
Client: TCP Connection disconnect completely
----
Normal Remote Initiated Close State Transition
WSEndpoint created
Client: State: NEW
Client: WSEndpoint created
Client: WSSession created
Http GET w/Upgrade initiated
State: CONNECTING
Client: State: CONNECTING
Upgrade Handshake negotiated (HTTP 101 response)
State: CONNECTED
WSEndpoint.onOpen() called
State: OPEN
WSEndpoint.onMessage()
Remote: Receive CLOSE frame
State: CLOSING
Session.close(remote close details)
Connection disconnect completely
WSEndpoint.onClose(local close details)
State: CLOSED
Client: State: CONNECTED
Client: WSEndpoint.onOpen() called
Client: State: OPEN
Client: WSEndpoint.onMessage() - received msg
Remote: Sends CLOSE Frame (remote close details)
Client: Received CLOSE Frame from remote
Client: State: CLOSING
Client: WSEndpoint.onClose(remote close details)
Client: after onClose()
if can transition to CLOSED
Client: Sends CLOSE Frame
Client: TCP Connection disconnect completely
---
Overlapping Close State (Remote first) Transition
Client: WSEndpoint created
Client: WSSession created
Http GET w/Upgrade initiated
Client: State: CONNECTING
Upgrade Handshake negotiated (HTTP 101 response)
Client: State: CONNECTED
Client: WSEndpoint.onOpen() called
Client: State: OPEN
Client: WSEndpoint.onMessage() - received msg
Remote[t1]: Sends CLOSE Frame (remote close details)
Client[t2]: WSSession.close(local close details)
Client[t2]: State: CLOSING
Client[t1]: Received CLOSE Frame from remote
Client[t1]: Cannot transition to State: CLOSING (no-op)
Client[t1]: WSEndpoint.onClose(remote close details)
Client[t1]: after onClose()
if can transition to CLOSED
Client: Sends CLOSE Frame
Client: TCP Connection disconnect completely

View File

@ -3,7 +3,7 @@
<parent>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-parent</artifactId>
<version>9.4.3-SNAPSHOT</version>
<version>9.4.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -32,7 +32,6 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.ByteBufferAssert;
@ -154,15 +153,13 @@ public class Fuzzer extends ContainerLifeCycle
LOG.debug("expect() {} frame(s)", expect.size());
// Read frames
Future<List<WebSocketFrame>> futFrames = session.getUntrustedEndpoint().expectedFrames(expectedCount);
List<WebSocketFrame> frames = futFrames.get(duration, unit);
UntrustedWSEndpoint endpoint = session.getUntrustedEndpoint();
String prefix = "";
for (int i = 0; i < expectedCount; i++)
{
WebSocketFrame expected = expect.get(i);
WebSocketFrame actual = frames.get(i);
WebSocketFrame actual = endpoint.framesQueue.poll(5, TimeUnit.SECONDS);
prefix = "Frame[" + i + "]";

View File

@ -25,12 +25,9 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
@ -56,23 +53,19 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
private CompletableFuture<List<String>> expectedMessagesFuture = new CompletableFuture<>();
private AtomicReference<Integer> expectedMessageCount = new AtomicReference<>();
private List<String> messages = new ArrayList<>();
public BlockingQueue<String> messageQueue = new LinkedBlockingDeque<>();
public BlockingQueue<ByteBuffer> bufferQueue = new LinkedBlockingDeque<>();
public BlockingQueue<WebSocketFrame> framesQueue = new LinkedBlockingDeque<>();
private CompletableFuture<List<WebSocketFrame>> expectedFramesFuture = new CompletableFuture<>();
private AtomicReference<Integer> expectedFramesCount = new AtomicReference<>();
private List<WebSocketFrame> frames = new ArrayList<>();
private WebSocketSession session;
public TrackingEndpoint(String id)
{
LOG = Log.getLogger(this.getClass().getName() + "_" + id);
LOG = Log.getLogger(this.getClass().getName() + "." + id);
}
public void assertClose(String prefix, int expectedCloseStatusCode, Matcher<String> reasonMatcher) throws InterruptedException
public void assertCloseInfo(String prefix, int expectedCloseStatusCode, Matcher<String> reasonMatcher) throws InterruptedException
{
assertThat(prefix + " endpoint close event received", closeLatch.await(10, TimeUnit.SECONDS), Matchers.is(true));
CloseInfo close = closeInfo.get();
assertThat(prefix + " close info", close, Matchers.notNullValue());
assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode));
@ -84,38 +77,6 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen
this.session.close(statusCode, reason);
}
public Future<List<WebSocketFrame>> expectedFrames(int expectedCount)
{
synchronized (expectedFramesCount)
{
if (!expectedFramesCount.compareAndSet(null, expectedCount))
{
throw new IllegalStateException("Can only have 1 registered frame count future");
}
else
{
checkFrameCount();
}
}
return expectedFramesFuture;
}
public Future<List<String>> expectedMessages(int expectedCount)
{
synchronized (expectedMessagesFuture)
{
if (!expectedMessageCount.compareAndSet(null, expectedCount))
{
throw new IllegalStateException("Can only have 1 registered message count future");
}
else
{
checkMessageCount();
}
}
return expectedMessagesFuture;
}
public RemoteEndpoint getRemote()
{
return session.getRemote();
@ -128,6 +89,8 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen
{
LOG.info("onWSBinary({})", BufferUtil.toDetailString(ByteBuffer.wrap(payload, offset, len)));
}
bufferQueue.offer(ByteBuffer.wrap(payload, offset, len));
}
@Override
@ -156,13 +119,10 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen
assertThat("Error must have value", cause, notNullValue());
if (error.compareAndSet(null, cause) == false)
{
LOG.warn("Original Cause", error.get());
LOG.warn("Extra/Excess Cause", cause);
LOG.warn("onError should only happen once - Original Cause", error.get());
LOG.warn("onError should only happen once - Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
this.expectedMessagesFuture.completeExceptionally(cause);
this.expectedFramesFuture.completeExceptionally(cause);
}
@Override
@ -173,11 +133,7 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen
LOG.debug("onWSFrame({})", frame);
}
synchronized (expectedFramesFuture)
{
frames.add(WebSocketFrame.copy(frame));
checkFrameCount();
}
framesQueue.offer(WebSocketFrame.copy(frame));
}
@Override
@ -188,30 +144,6 @@ public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListen
LOG.debug("onWSText(\"{}\")", text);
}
synchronized (expectedMessagesFuture)
{
messages.add(text);
checkMessageCount();
}
}
private void checkMessageCount()
{
Integer expected = expectedMessageCount.get();
if (expected != null && messages.size() >= expected.intValue())
{
expectedMessagesFuture.complete(messages);
}
}
private void checkFrameCount()
{
Integer expected = expectedFramesCount.get();
if (expected != null && frames.size() >= expected.intValue())
{
expectedFramesFuture.complete(frames);
}
messageQueue.offer(text);
}
}

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -109,13 +108,11 @@ public class UntrustedWSClientTest
{
UntrustedWSEndpoint endpoint = session.getUntrustedEndpoint();
Future<List<String>> futMessages = endpoint.expectedMessages(1);
session.getRemote().sendString("hello");
List<String> messages = futMessages.get();
assertThat("Messages.size", messages.size(), is(1));
assertThat("Messages[0]", messages.get(0), is("hello"));
String message = endpoint.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("message", message, is("hello"));
// TODO: test close
}
}
finally

View File

@ -35,7 +35,6 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
@ -207,7 +206,7 @@ public class ClientCloseTest
assertThat("Client WebSocket is Open", clientSocket.openLatch.await(30, TimeUnit.SECONDS), is(true));
UntrustedWSEndpoint serverEndpoint = serverSession.getUntrustedEndpoint();
Future<List<WebSocketFrame>> futFrames = serverEndpoint.expectedFrames(1);
// Future<List<WebSocketFrame>> futFrames = serverEndpoint.expectedFrames(1);
try
{
@ -219,8 +218,7 @@ public class ClientCloseTest
testFut.get(30, TimeUnit.SECONDS);
// Read Frame on server side
List<WebSocketFrame> frames = futFrames.get(30, TimeUnit.SECONDS);
WebSocketFrame frame = frames.get(0);
WebSocketFrame frame = serverEndpoint.framesQueue.poll(10, TimeUnit.SECONDS);
assertThat("Server received frame", frame.getOpCode(), is(OpCode.TEXT));
assertThat("Server received frame payload", frame.getPayloadAsUTF8(), is(echoMsg));
@ -333,7 +331,7 @@ public class ClientCloseTest
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame
serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason));
serverSession.getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.NORMAL, is(origCloseReason));
// server sends 2 messages
RemoteEndpoint remote = serverSession.getRemote();
@ -451,7 +449,7 @@ public class ClientCloseTest
assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Invalid control frame"));
// client parse invalid frame, notifies server of close (protocol error)
serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.PROTOCOL, allOf(containsString("Invalid control frame"), containsString("length")));
serverSession.getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.PROTOCOL, allOf(containsString("Invalid control frame"), containsString("length")));
}
// server disconnects
@ -489,7 +487,7 @@ public class ClientCloseTest
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame
serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason));
serverSession.getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.NORMAL, is(origCloseReason));
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
@ -531,7 +529,7 @@ public class ClientCloseTest
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame
serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason));
serverSession.getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.NORMAL, is(origCloseReason));
// client should not have received close message (yet)
clientSocket.assertNoCloseEvent();
@ -581,7 +579,7 @@ public class ClientCloseTest
for (int i = 0; i < clientCount; i++)
{
// server receives close frame
serverSessions[i].getUntrustedEndpoint().assertClose("Server", StatusCode.SHUTDOWN, containsString("Shutdown"));
serverSessions[i].getUntrustedEndpoint().assertCloseInfo("Server", StatusCode.SHUTDOWN, containsString("Shutdown"));
}
// clients disconnect

View File

@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -36,6 +35,7 @@ import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.tests.TrackingEndpoint;
import org.eclipse.jetty.websocket.tests.UntrustedWSEndpoint;
import org.eclipse.jetty.websocket.tests.UntrustedWSServer;
import org.eclipse.jetty.websocket.tests.UntrustedWSSession;
import org.junit.After;
@ -99,30 +99,33 @@ public class EchoTest
server.registerConnectFuture(wsURI, serverSessionFut);
// Client connects
TrackingEndpoint clientSocket = new TrackingEndpoint(WebSocketBehavior.CLIENT.name());
Future<Session> clientConnectFuture = client.connect(clientSocket, wsURI);
TrackingEndpoint clientEndpoint = new TrackingEndpoint(WebSocketBehavior.CLIENT.name());
Future<Session> clientConnectFuture = client.connect(clientEndpoint, wsURI);
// Server accepts connect
UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
UntrustedWSEndpoint serverEndpoint = serverSession.getUntrustedEndpoint();
// client confirms connection via echo
assertThat("Client Opened", clientSocket.openLatch.await(5, TimeUnit.SECONDS), is(true));
Future<List<String>> futMessages = clientSocket.expectedMessages(1);
assertThat("Client onOpen event", clientEndpoint.openLatch.await(5, TimeUnit.SECONDS), is(true));
// client sends message
clientSocket.getRemote().sendString("Hello Echo");
List<String> messages = futMessages.get(10, TimeUnit.SECONDS);
assertThat("Messages[0]", messages.get(0), is("Hello Echo"));
clientEndpoint.getRemote().sendString("Hello Echo");
// Wait for response to echo
String message = clientEndpoint.messageQueue.poll(5, TimeUnit.SECONDS);
assertThat("message", message, is("Hello Echo"));
// client closes
clientSocket.close(StatusCode.NORMAL, "Normal Close");
// client triggers close event on client ws-endpoint
clientSocket.assertClose("Client", StatusCode.NORMAL, containsString("Normal Close"));
clientEndpoint.close(StatusCode.NORMAL, "Normal Close");
// Server close event
serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, containsString("Normal Close"));
assertThat("Server onClose event", serverSession.getUntrustedEndpoint().closeLatch.await(5, TimeUnit.SECONDS), is(true));
serverEndpoint.assertCloseInfo("Server", StatusCode.NORMAL, containsString("Normal Close"));
// client triggers close event on client ws-endpoint
assertThat("Client onClose event", clientEndpoint.closeLatch.await(5, TimeUnit.SECONDS), is(true));
clientEndpoint.assertCloseInfo("Client", StatusCode.NORMAL, containsString("Normal Close"));
}
}

View File

@ -20,23 +20,24 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.util.log.stderr.LONG=true
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=INFO
org.eclipse.jetty.websocket.tests.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.tests.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.Fuzzer.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.blockhead.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.helper.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.CompletionCallback.LEVEL=ALL
# org.eclipse.jetty.websocket.client.io.ConnectPromise.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.WebSocketSession_OPEN.LEVEL=DEBUG
org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection_OPEN.LEVEL=DEBUG
### Show state changes on BrowserDebugTool