Issue #207 - more testing updates

This commit is contained in:
Joakim Erdfelt 2017-04-21 09:00:30 -07:00
parent a8d4c68bdc
commit ce9cd8d168
29 changed files with 1310 additions and 473 deletions

View File

@ -116,12 +116,12 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
* *
* @param executor * @param executor
* the executor to use * the executor to use
* @deprecated use {@link #WebSocketClient(HttpClient)} instead
*/ */
@Deprecated
public WebSocketClient(Executor executor) public WebSocketClient(Executor executor)
{ {
this(null,executor); this(new HttpClient());
this.httpClient.setExecutor(executor);
} }
/** /**
@ -129,12 +129,11 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
* *
* @param bufferPool * @param bufferPool
* byte buffer pool to use * byte buffer pool to use
* @deprecated use {@link #WebSocketClient(HttpClient)} instead
*/ */
@Deprecated
public WebSocketClient(ByteBufferPool bufferPool) public WebSocketClient(ByteBufferPool bufferPool)
{ {
this(null,null,bufferPool); this(new HttpClient());
this.httpClient.setByteBufferPool(bufferPool);
} }
/** /**
@ -142,12 +141,10 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
* *
* @param sslContextFactory * @param sslContextFactory
* ssl context factory to use * ssl context factory to use
* @deprecated use {@link #WebSocketClient(HttpClient)} with its own {@link SslContextFactory}
*/ */
@Deprecated
public WebSocketClient(SslContextFactory sslContextFactory) public WebSocketClient(SslContextFactory sslContextFactory)
{ {
this(sslContextFactory,null); this(new HttpClient(sslContextFactory));
} }
/** /**
@ -157,12 +154,11 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
* ssl context factory to use * ssl context factory to use
* @param executor * @param executor
* the executor to use * the executor to use
* @deprecated use {@link #WebSocketClient(HttpClient)} instead
*/ */
@Deprecated
public WebSocketClient(SslContextFactory sslContextFactory, Executor executor) public WebSocketClient(SslContextFactory sslContextFactory, Executor executor)
{ {
this(sslContextFactory,executor,new MappedByteBufferPool()); this(new HttpClient(sslContextFactory));
this.httpClient.setExecutor(executor);
} }
/** /**

View File

@ -411,7 +411,7 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
} }
this.localEndpoint = localEndpoint; this.localEndpoint = localEndpoint;
this.fut = new CompletableFuture<Session>(); this.fut = new CompletableFuture<>();
} }
private final String genRandomKey() private final String genRandomKey()

View File

@ -20,6 +20,9 @@ package org.eclipse.jetty.websocket.common;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/**
* Atomic Close State
*/
public class AtomicClose public class AtomicClose
{ {
enum State enum State

View File

@ -20,6 +20,9 @@ package org.eclipse.jetty.websocket.common;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/**
* Atomic Connection State
*/
public class AtomicConnectionState public class AtomicConnectionState
{ {
/** /**
@ -59,7 +62,7 @@ public class AtomicConnectionState
* Connection should be disconnected and no further reads or writes should occur. * Connection should be disconnected and no further reads or writes should occur.
* </p> * </p>
*/ */
CLOSED; CLOSED
} }
private AtomicReference<State> state = new AtomicReference<>(); private AtomicReference<State> state = new AtomicReference<>();

View File

@ -41,8 +41,14 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
/** /**
* Terminate the connection (no close frame sent) * Terminate the connection (no close frame sent)
* @param onlyOutput true to only close the output (half-close), false to close fully.
*/ */
void disconnect(); void disconnect(boolean onlyOutput);
/**
* Register Read Interest in Connection.
*/
void fillInterested();
/** /**
* Get the ByteBufferPool in use by the connection * Get the ByteBufferPool in use by the connection

View File

@ -68,7 +68,7 @@ public class Parser
PAYLOAD PAYLOAD
} }
private static final Logger LOG = Log.getLogger(Parser.class); private final Logger LOG;
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final Parser.Handler parserHandler; private final Parser.Handler parserHandler;
@ -102,6 +102,8 @@ public class Parser
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.policy = wspolicy; this.policy = wspolicy;
this.parserHandler = parserHandler; this.parserHandler = parserHandler;
LOG = Log.getLogger(Parser.class.getName() + "_" + wspolicy.getBehavior());
} }
private void assertSanePayloadLength(long len) private void assertSanePayloadLength(long len)
@ -187,7 +189,7 @@ public class Parser
while (parseFrame(buffer)) while (parseFrame(buffer))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} Parsed Frame: {}", policy.getBehavior(), frame); LOG.debug("Parsed Frame: {}", frame);
assertBehavior(); assertBehavior();
@ -268,7 +270,7 @@ public class Parser
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining()); LOG.debug("Parsing {} bytes",buffer.remaining());
} }
while (buffer.hasRemaining()) while (buffer.hasRemaining())
@ -291,8 +293,7 @@ public class Parser
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} OpCode {}, fin={} rsv={}{}{}", LOG.debug("OpCode {}, fin={} rsv={}{}{}",
policy.getBehavior(),
OpCode.name(opcode), OpCode.name(opcode),
fin, fin,
(((b & 0x40) != 0)?'1':'.'), (((b & 0x40) != 0)?'1':'.'),
@ -583,7 +584,7 @@ public class Parser
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug("{} Raw Payload: {}",policy.getBehavior(),BufferUtil.toDetailString(window)); LOG.debug("Raw Payload: {}",BufferUtil.toDetailString(window));
} }
maskProcessor.process(window); maskProcessor.process(window);

View File

@ -58,6 +58,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory; import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
@ -74,8 +75,32 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory, public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory,
WebSocketSessionScope, IncomingFrames, LogicalConnection.Listener, Connection.Listener WebSocketSessionScope, IncomingFrames, LogicalConnection.Listener, Connection.Listener
{ {
private static final Logger LOG = Log.getLogger(WebSocketSession.class); public class OnDisconnectCallback implements WriteCallback
private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN"); {
private final boolean outputOnly;
public OnDisconnectCallback(boolean outputOnly)
{
this.outputOnly = outputOnly;
}
@Override
public void writeFailed(Throwable x)
{
LOG.debug("writeFailed()", x);
disconnect(outputOnly);
}
@Override
public void writeSuccess()
{
LOG.debug("writeSuccess()");
disconnect(outputOnly);
}
}
private final Logger LOG;
private final WebSocketContainerScope containerScope; private final WebSocketContainerScope containerScope;
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
private final URI requestURI; private final URI requestURI;
@ -109,6 +134,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
Objects.requireNonNull(containerScope, "Container Scope cannot be null"); Objects.requireNonNull(containerScope, "Container Scope cannot be null");
Objects.requireNonNull(requestURI, "Request URI cannot be null"); Objects.requireNonNull(requestURI, "Request URI cannot be null");
LOG = Log.getLogger(WebSocketSession.class.getName() + "_" + connection.getPolicy().getBehavior().name());
this.classLoader = Thread.currentThread().getContextClassLoader(); this.classLoader = Thread.currentThread().getContextClassLoader();
this.containerScope = containerScope; this.containerScope = containerScope;
this.requestURI = requestURI; this.requestURI = requestURI;
@ -149,14 +176,29 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
{ {
if (connectionState.onClosing()) if (connectionState.onClosing())
{ {
LOG.debug("ConnectionState: Transition to CLOSING");
// This is the first CLOSE event // This is the first CLOSE event
if (closeState.onLocal()) if (closeState.onLocal())
{ {
LOG.debug("CloseState: Transition to LOCAL");
// this is Local initiated. // this is Local initiated.
CloseInfo closeInfo = new CloseInfo(statusCode, reason); CloseInfo closeInfo = new CloseInfo(statusCode, reason);
Frame closeFrame = closeInfo.asFrame(); Frame closeFrame = closeInfo.asFrame();
outgoingHandler.outgoingFrame(closeFrame, new FrameCallback.Adapter(), BatchMode.AUTO); outgoingHandler.outgoingFrame(closeFrame, new OnDisconnectCallback(true), BatchMode.AUTO);
} }
else
{
LOG.debug("CloseState: Expected LOCAL, but was " + closeState.get());
}
}
else if(connectionState.onClosed())
{
LOG.debug("ConnectionState: Transition to CLOSED");
// This is the reply to the CLOSING entry point
CloseInfo closeInfo = new CloseInfo(statusCode, reason);
Frame closeFrame = closeInfo.asFrame();
outgoingHandler.outgoingFrame(closeFrame, new OnDisconnectCallback(false), BatchMode.AUTO);
} }
} }
@ -166,24 +208,25 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override @Override
public void disconnect() public void disconnect()
{ {
disconnect(true);
}
private void disconnect(boolean outputOnly)
{
if(connectionState.onClosing())
{
// Is this is a harsh disconnect: OPEN -> CLOSING -> CLOSED
if (closeState.onAbnormal())
{
// notify local endpoint of harsh disconnect
notifyClose(StatusCode.SHUTDOWN, "Harsh disconnect");
}
}
if(connectionState.onClosed()) if(connectionState.onClosed())
{ {
connection.disconnect(); // Transition: CLOSING -> CLOSED
connection.disconnect(outputOnly);
// TODO: notify local endpoint onClose() ?
// TODO: notifyClose(close.getStatusCode(), close.getReason());
try
{
if (LOG.isDebugEnabled())
LOG.debug("{}.onSessionClosed()", containerScope.getClass().getSimpleName());
containerScope.onSessionClosed(this);
}
catch (Throwable t)
{
LOG.ignore(t);
}
if (closeState.onLocal()) if (closeState.onLocal())
{ {
// notify local endpoint of harsh disconnect // notify local endpoint of harsh disconnect
@ -381,8 +424,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override @Override
public RemoteEndpoint getRemote() public RemoteEndpoint getRemote()
{ {
if (LOG_OPEN.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.getRemote()", getPolicy().getBehavior(), this.getClass().getSimpleName()); LOG.debug("{}.getRemote()", this.getClass().getSimpleName());
AtomicConnectionState.State state = connectionState.get(); AtomicConnectionState.State state = connectionState.get();
@ -397,7 +440,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override @Override
public InetSocketAddress getRemoteAddress() public InetSocketAddress getRemoteAddress()
{ {
return remote.getInetSocketAddress(); return connection.getRemoteAddress();
} }
public URI getRequestURI() public URI getRequestURI()
@ -460,19 +503,26 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
// process handshake // process handshake
if(connectionState.onClosing()) if(connectionState.onClosing())
{ {
LOG.debug("ConnectionState: Transition to CLOSING");
// we transitioned to CLOSING state // we transitioned to CLOSING state
if(closeState.onRemote()) if(closeState.onRemote())
{ {
LOG.debug("CloseState: Transition to REMOTE");
// Remote initiated. // Remote initiated.
// Send reply to remote // Send reply to remote
close(close.getStatusCode(), close.getReason()); close(close.getStatusCode(), close.getReason());
} }
else else
{ {
LOG.debug("CloseState: Already at LOCAL");
// Local initiated, this was the reply. // Local initiated, this was the reply.
disconnect(); disconnect();
} }
} }
else
{
LOG.debug("ConnectionState: Not CLOSING: was " + connectionState.get());
}
callback.succeed(); callback.succeed();
return; return;
@ -655,6 +705,16 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override @Override
public void onClosed(Connection connection) public void onClosed(Connection connection)
{ {
try
{
if (LOG.isDebugEnabled())
LOG.debug("{}.onSessionClosed()", containerScope.getClass().getSimpleName());
containerScope.onSessionClosed(this);
}
catch (Throwable t)
{
LOG.ignore(t);
}
} }
/** /**
@ -664,8 +724,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
@Override @Override
public void onOpened(Connection connection) public void onOpened(Connection connection)
{ {
if (LOG_OPEN.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.onOpened()", getPolicy().getBehavior(), this.getClass().getSimpleName()); LOG.debug("{}.onOpened()", this.getClass().getSimpleName());
connectionState.onConnecting(); connectionState.onConnecting();
open(); open();
} }
@ -680,8 +740,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
*/ */
public void open() public void open()
{ {
if (LOG_OPEN.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.open()", getPolicy().getBehavior(), this.getClass().getSimpleName()); LOG.debug("{}.open()", this.getClass().getSimpleName());
if (remote != null) if (remote != null)
{ {
@ -696,8 +756,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
{ {
// Connect remote // Connect remote
remote = remoteEndpointFactory.newRemoteEndpoint(connection, outgoingHandler, getBatchMode()); remote = remoteEndpointFactory.newRemoteEndpoint(connection, outgoingHandler, getBatchMode());
if (LOG_OPEN.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.open() remote={}", getPolicy().getBehavior(), this.getClass().getSimpleName(), remote); LOG.debug("{}.open() remote={}", this.getClass().getSimpleName(), remote);
// Open WebSocket // Open WebSocket
endpointFunctions.onOpen(this); endpointFunctions.onOpen(this);
@ -726,6 +786,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
{ {
openFuture.complete(this); openFuture.complete(this);
} }
connection.fillInterested();
} }
} }
else else

View File

@ -42,7 +42,6 @@ import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback; import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
@ -93,21 +92,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
} }
} }
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + ".OPEN");
private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + ".CLOSE");
/** /**
* Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload) * Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
*/ */
private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH; private static final int MIN_BUFFER_SIZE = Generator.MAX_HEADER_LENGTH;
private final Logger LOG;
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final Scheduler scheduler; private final Scheduler scheduler;
private final Generator generator; private final Generator generator;
private final Parser parser; private final Parser parser;
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
private final WebSocketBehavior behavior;
private final AtomicBoolean suspendToken; private final AtomicBoolean suspendToken;
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final FrameFlusher flusher; private final FrameFlusher flusher;
@ -121,13 +117,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack) public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack)
{ {
super(endp,executor); super(endp,executor);
LOG = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_" + policy.getBehavior());
this.id = String.format("%s:%d->%s:%d", this.id = String.format("%s:%d->%s:%d",
endp.getLocalAddress().getAddress().getHostAddress(), endp.getLocalAddress().getAddress().getHostAddress(),
endp.getLocalAddress().getPort(), endp.getLocalAddress().getPort(),
endp.getRemoteAddress().getAddress().getHostAddress(), endp.getRemoteAddress().getAddress().getHostAddress(),
endp.getRemoteAddress().getPort()); endp.getRemoteAddress().getPort());
this.policy = policy; this.policy = policy;
this.behavior = policy.getBehavior();
this.bufferPool = bufferPool; this.bufferPool = bufferPool;
this.extensionStack = extensionStack; this.extensionStack = extensionStack;
@ -152,31 +150,32 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
} }
@Override @Override
public void disconnect() public void disconnect(boolean onlyOutput)
{ {
if (LOG_CLOSE.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG_CLOSE.debug("{} disconnect()",behavior); LOG.debug("disconnect({})", onlyOutput ? "OUTPUT_ONLY" : "BOTH");
disconnect(false);
}
private void disconnect(boolean onlyOutput)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} disconnect({})",behavior,onlyOutput?"outputOnly":"both");
// close FrameFlusher, we cannot write anymore at this point. // close FrameFlusher, we cannot write anymore at this point.
flusher.close(); flusher.close();
EndPoint endPoint = getEndPoint(); EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow // We need to gently close first, to allow
// SSL close alerts to be sent by Jetty // SSL close alerts to be sent by Jetty
if (LOG_CLOSE.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG_CLOSE.debug("Shutting down output {}",endPoint); LOG.debug("Shutting down output {}",endPoint);
endPoint.shutdownOutput(); endPoint.shutdownOutput();
if (!onlyOutput) if (!onlyOutput)
{ {
if (LOG_CLOSE.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG_CLOSE.debug("Closing {}",endPoint); LOG.debug("Closing {}",endPoint);
endPoint.close(); endPoint.close();
} }
else
{
closed.set(true);
}
} }
protected void execute(Runnable task) protected void execute(Runnable task)
@ -269,9 +268,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onClose() public void onClose()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} onClose()",behavior); LOG.debug("onClose()");
super.onClose();
closed.set(true);
flusher.close(); flusher.close();
super.onClose();
} }
@Override @Override
@ -322,7 +324,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{ {
try try
{ {
while (true) while (isOpen())
{ {
if (suspendToken.get()) if (suspendToken.get())
{ {
@ -411,8 +413,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override @Override
public void onOpen() public void onOpen()
{ {
if(LOG_OPEN.isDebugEnabled()) if(LOG.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.onOpened()",behavior,this.getClass().getSimpleName()); LOG.debug("{}.onOpened()",this.getClass().getSimpleName());
super.onOpen(); super.onOpen();
} }

View File

@ -79,12 +79,12 @@ public class FrameFlusher
ByteBuffer payload = entry.frame.getPayload(); ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload)) if (BufferUtil.hasContent(payload))
{ {
BufferUtil.append(aggregate,payload); BufferUtil.put(payload, aggregate);
} }
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries); LOG.debug("{} aggregated {} frames in {}: {}", FrameFlusher.this, entries.size(), aggregate, entries);
} }
succeeded(); succeeded();
return Action.SCHEDULED; return Action.SCHEDULED;
@ -113,6 +113,7 @@ public class FrameFlusher
{ {
if (!BufferUtil.isEmpty(aggregate)) if (!BufferUtil.isEmpty(aggregate))
{ {
aggregate.flip();
buffers.add(aggregate); buffers.add(aggregate);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {

View File

@ -35,16 +35,12 @@ public class FutureWriteCallback extends FutureCallback implements WriteCallback
@Override @Override
public void writeFailed(Throwable cause) public void writeFailed(Throwable cause)
{ {
if (LOG.isDebugEnabled())
LOG.debug(".writeFailed",cause);
failed(cause); failed(cause);
} }
@Override @Override
public void writeSuccess() public void writeSuccess()
{ {
if (LOG.isDebugEnabled())
LOG.debug(".writeSuccess");
succeeded(); succeeded();
} }
} }

View File

@ -20,10 +20,12 @@
* Jetty WebSocket Common : Implementation [<em>Internal Use Only</em>] * Jetty WebSocket Common : Implementation [<em>Internal Use Only</em>]
* <p> * <p>
* A core set of internal implementation classes for the Jetty WebSocket API. * A core set of internal implementation classes for the Jetty WebSocket API.
* </p>
* <p> * <p>
* Note: do not reference or use classes present in this package space in your code. <br> * Note: do not reference or use classes present in this package space in your code. <br>
* Restrict your usage to the Jetty WebSocket API classes, the Jetty WebSocket Client API, * Restrict your usage to the Jetty WebSocket API classes, the Jetty WebSocket Client API,
* or the Jetty WebSocket Servlet API. * or the Jetty WebSocket Servlet API.
* </p>
*/ */
package org.eclipse.jetty.websocket.common; package org.eclipse.jetty.websocket.common;

View File

@ -74,10 +74,15 @@ public class LocalWebSocketConnection implements LogicalConnection
} }
@Override @Override
public void disconnect() public void disconnect(boolean outputOnly)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("disconnect()"); LOG.debug("disconnect({})", outputOnly);
}
@Override
public void fillInterested()
{
} }
@Override @Override

View File

@ -45,7 +45,12 @@ public class DummyConnection implements LogicalConnection
} }
@Override @Override
public void disconnect() public void disconnect(boolean outputOnly)
{
}
@Override
public void fillInterested()
{ {
} }

View File

@ -0,0 +1,74 @@
Documenting the States of a WebSocket.
NEW:
A new WebSocket session.
It has had no connection attempted yet.
CONNECTING:
The connection is being attempted, along with the Upgrade handshake.
CONNECTED:
The connection is established.
The User WebSocket Endpoint has not been notified yet.
OPEN:
User WebSocket Endpoint has been Opened (the onOpen method has called)
CLOSING:
The close handshake has begun.
Either the local initiated the close, waiting for the remote to reply.
Or the remote initiated the close, and the local hasn't replied yet.
This can be considered a logical half-closed state.
CLOSED:
The connection and session is closed.
This means either the close handshake completed, or the connection was
disconnected for other reasons.
----
Normal Client Initiated Close State Transition
WSEndpoint created
Http GET w/Upgrade initiated
State: CONNECTING
Upgrade Handshake negotiated (HTTP 101 response)
State: CONNECTED
WSEndpoint.onOpen() called
State: OPEN
WSEndpoint.onMessage()
Session.close(local close details)
Connection disconnect output
State: CLOSING
Remote: Received CLOSE Frame
Connection disconnect completely
WSEndpoint.onClose(remote close details)
State: CLOSED
----
Normal Remote Initiated Close State Transition
WSEndpoint created
Http GET w/Upgrade initiated
State: CONNECTING
Upgrade Handshake negotiated (HTTP 101 response)
State: CONNECTED
WSEndpoint.onOpen() called
State: OPEN
WSEndpoint.onMessage()
Remote: Receive CLOSE frame
State: CLOSING
Session.close(remote close details)
Connection disconnect completely
WSEndpoint.onClose(local close details)
State: CLOSED

View File

@ -32,5 +32,5 @@ public interface WebSocketHandshake
* @param response the response * @param response the response
* @throws IOException if unable to handshake * @throws IOException if unable to handshake
*/ */
public void doHandshakeResponse(ServletUpgradeRequest request, ServletUpgradeResponse response) throws IOException; void doHandshakeResponse(ServletUpgradeRequest request, ServletUpgradeResponse response) throws IOException;
} }

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -244,6 +245,16 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
this.sessionFactories.add(sessionFactory); this.sessionFactories.add(sessionFactory);
} }
public void setSessionFactories(SessionFactory... factories)
{
if (factories == null || factories.length < 1)
{
throw new IllegalStateException("Must declare SessionFactory implementations");
}
this.sessionFactories.clear();
this.sessionFactories.addAll(Arrays.asList(factories));
}
private WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection) private WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection)
{ {
if (websocket == null) if (websocket == null)
@ -565,6 +576,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// Setup Session // Setup Session
WebSocketSession session = createSession(request.getRequestURI(), websocket, wsConnection); WebSocketSession session = createSession(request.getRequestURI(), websocket, wsConnection);
session.setUpgradeRequest(request); session.setUpgradeRequest(request);
// set true negotiated extension list back to response // set true negotiated extension list back to response
response.setExtensions(extensionStack.getNegotiatedExtensions()); response.setExtensions(extensionStack.getNegotiatedExtensions());
session.setUpgradeResponse(response); session.setUpgradeResponse(response);
@ -589,9 +601,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// Tell jetty about the new upgraded connection // Tell jetty about the new upgraded connection
request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE, wsConnection); request.setServletAttribute(HttpConnection.UPGRADE_CONNECTION_ATTRIBUTE, wsConnection);
if (LOG.isDebugEnabled())
LOG.debug("Handshake Response: {}", handshaker);
if (getSendServerVersion(connector)) if (getSendServerVersion(connector))
response.setHeader("Server", HttpConfiguration.SERVER_VERSION); response.setHeader("Server", HttpConfiguration.SERVER_VERSION);
@ -599,7 +608,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
handshaker.doHandshakeResponse(request, response); handshaker.doHandshakeResponse(request, response);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Websocket upgrade {} {} {} {}", request.getRequestURI(), version, response.getAcceptedSubProtocol(), wsConnection); LOG.debug("Websocket upgrade {} v={} subprotocol={} connection={}", request.getRequestURI(), version, response.getAcceptedSubProtocol(), wsConnection);
return true; return true;
} }

View File

@ -0,0 +1,46 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.FrameCallback;
public class BlockerFrameCallback implements FrameCallback
{
private CompletableFuture<Void> future = new CompletableFuture<>();
@Override
public void fail(Throwable cause)
{
future.completeExceptionally(cause);
}
@Override
public void succeed()
{
future.complete(null);
}
public void block() throws Exception
{
future.get(1, TimeUnit.MINUTES);
}
}

View File

@ -32,7 +32,6 @@ import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -42,7 +41,6 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.FrameCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame; import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.CloseInfo;
@ -55,28 +53,6 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
*/ */
public class Fuzzer extends ContainerLifeCycle public class Fuzzer extends ContainerLifeCycle
{ {
public static class BlockerCallback implements FrameCallback
{
private CompletableFuture<Void> future = new CompletableFuture<>();
@Override
public void fail(Throwable cause)
{
future.completeExceptionally(cause);
}
@Override
public void succeed()
{
future.complete(null);
}
public void block() throws Exception
{
future.get(1, TimeUnit.MINUTES);
}
}
public static class Session implements AutoCloseable public static class Session implements AutoCloseable
{ {
// Client side framing mask // Client side framing mask
@ -249,7 +225,7 @@ public class Fuzzer extends ContainerLifeCycle
{ {
for (WebSocketFrame f : send) for (WebSocketFrame f : send)
{ {
BlockerCallback blocker = new BlockerCallback(); BlockerFrameCallback blocker = new BlockerFrameCallback();
session.getOutgoingHandler().outgoingFrame(f, blocker, BatchMode.OFF); session.getOutgoingHandler().outgoingFrame(f, blocker, BatchMode.OFF);
blocker.block(); blocker.block();
} }

View File

@ -0,0 +1,217 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketFrameListener;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
public class TrackingEndpoint implements WebSocketListener, WebSocketFrameListener
{
private final Logger LOG;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
private CompletableFuture<List<String>> expectedMessagesFuture = new CompletableFuture<>();
private AtomicReference<Integer> expectedMessageCount = new AtomicReference<>();
private List<String> messages = new ArrayList<>();
private CompletableFuture<List<WebSocketFrame>> expectedFramesFuture = new CompletableFuture<>();
private AtomicReference<Integer> expectedFramesCount = new AtomicReference<>();
private List<WebSocketFrame> frames = new ArrayList<>();
private WebSocketSession session;
public TrackingEndpoint(String id)
{
LOG = Log.getLogger(this.getClass().getName() + "_" + id);
}
public void assertClose(String prefix, int expectedCloseStatusCode, Matcher<String> reasonMatcher) throws InterruptedException
{
assertThat(prefix + " endpoint close event received", closeLatch.await(10, TimeUnit.SECONDS), Matchers.is(true));
CloseInfo close = closeInfo.get();
assertThat(prefix + " close info", close, Matchers.notNullValue());
assertThat(prefix + " received close code", close.getStatusCode(), Matchers.is(expectedCloseStatusCode));
assertThat(prefix + " received close reason", close.getReason(), reasonMatcher);
}
public void close(int statusCode, String reason)
{
this.session.close(statusCode, reason);
}
public Future<List<WebSocketFrame>> expectedFrames(int expectedCount)
{
synchronized (expectedFramesCount)
{
if (!expectedFramesCount.compareAndSet(null, expectedCount))
{
throw new IllegalStateException("Can only have 1 registered frame count future");
}
else
{
checkFrameCount();
}
}
return expectedFramesFuture;
}
public Future<List<String>> expectedMessages(int expectedCount)
{
synchronized (expectedMessagesFuture)
{
if (!expectedMessageCount.compareAndSet(null, expectedCount))
{
throw new IllegalStateException("Can only have 1 registered message count future");
}
else
{
checkMessageCount();
}
}
return expectedMessagesFuture;
}
public RemoteEndpoint getRemote()
{
return session.getRemote();
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
{
if (LOG.isDebugEnabled())
{
LOG.info("onWSBinary({})", BufferUtil.toDetailString(ByteBuffer.wrap(payload, offset, len)));
}
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
this.closeLatch.countDown();
CloseInfo close = new CloseInfo(statusCode, reason);
assertThat("Close only happened once", closeInfo.compareAndSet(null, close), is(true));
}
@Override
public void onWebSocketConnect(Session session)
{
assertThat("Session type", session, instanceOf(WebSocketSession.class));
this.session = (WebSocketSession) session;
if (LOG.isDebugEnabled())
{
LOG.debug("onWebSocketConnect()");
}
this.openLatch.countDown();
}
@Override
public void onWebSocketError(Throwable cause)
{
assertThat("Error must have value", cause, notNullValue());
if (error.compareAndSet(null, cause) == false)
{
LOG.warn("Original Cause", error.get());
LOG.warn("Extra/Excess Cause", cause);
fail("onError should only happen once!");
}
this.expectedMessagesFuture.completeExceptionally(cause);
this.expectedFramesFuture.completeExceptionally(cause);
}
@Override
public void onWebSocketFrame(Frame frame)
{
if (LOG.isDebugEnabled())
{
LOG.debug("onWSFrame({})", frame);
}
synchronized (expectedFramesFuture)
{
frames.add(WebSocketFrame.copy(frame));
checkFrameCount();
}
}
@Override
public void onWebSocketText(String text)
{
if (LOG.isDebugEnabled())
{
LOG.debug("onWSText(\"{}\")", text);
}
synchronized (expectedMessagesFuture)
{
messages.add(text);
checkMessageCount();
}
}
private void checkMessageCount()
{
Integer expected = expectedMessageCount.get();
if (expected != null && messages.size() >= expected.intValue())
{
expectedMessagesFuture.complete(messages);
}
}
private void checkFrameCount()
{
Integer expected = expectedFramesCount.get();
if (expected != null && frames.size() >= expected.intValue())
{
expectedFramesFuture.complete(frames);
}
}
}

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.client.WebSocketClient;
@ -51,7 +52,7 @@ public class UntrustedWSClient extends WebSocketClient
public Future<UntrustedWSSession> connect(URI toUri, ClientUpgradeRequest req) throws IOException public Future<UntrustedWSSession> connect(URI toUri, ClientUpgradeRequest req) throws IOException
{ {
final Future<Session> connectFut = super.connect(new UntrustedWSEndpoint(), toUri, req); final Future<Session> connectFut = super.connect(new UntrustedWSEndpoint(WebSocketBehavior.CLIENT.name()), toUri, req);
return new CompletableFuture<UntrustedWSSession>() { return new CompletableFuture<UntrustedWSSession>() {
@Override @Override
public UntrustedWSSession get() throws InterruptedException, ExecutionException public UntrustedWSSession get() throws InterruptedException, ExecutionException

View File

@ -27,6 +27,8 @@ import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
/** /**
@ -53,6 +55,17 @@ public class UntrustedWSConnection
internalConnection.getEndPoint().flush(); internalConnection.getEndPoint().flush();
} }
/**
* Forward a frame to the {@Link OutgoingFrames} handler
* @param frame
*/
public void write(Frame frame) throws Exception
{
BlockerFrameCallback blocker = new BlockerFrameCallback();
this.internalConnection.outgoingFrame(frame, blocker, BatchMode.OFF);
blocker.block();
}
/** /**
* Write arbitrary bytes out the active connection. * Write arbitrary bytes out the active connection.
* *

View File

@ -18,136 +18,121 @@
package org.eclipse.jetty.websocket.tests; package org.eclipse.jetty.websocket.tests;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.util.ArrayList; import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.function.BiFunction;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketFrameListener; import org.eclipse.jetty.websocket.api.WebSocketFrameListener;
import org.eclipse.jetty.websocket.api.WebSocketListener; import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
public class UntrustedWSEndpoint implements WebSocketListener, WebSocketFrameListener public class UntrustedWSEndpoint extends TrackingEndpoint implements WebSocketListener, WebSocketFrameListener
{ {
private static final Logger LOG = Log.getLogger(UntrustedWSEndpoint.class); private static final Logger LOG = Log.getLogger(UntrustedWSEndpoint.class);
@SuppressWarnings("unused") private UntrustedWSSession untrustedSession;
private Session session; private CompletableFuture<UntrustedWSSession> connectFuture;
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
public AtomicReference<Throwable> error = new AtomicReference<>();
private CompletableFuture<List<String>> expectedMessagesFuture = new CompletableFuture<>(); private BiFunction<UntrustedWSSession, String, String> onTextFunction;
private AtomicReference<Integer> expectedMessageCount = new AtomicReference<>(); private BiFunction<UntrustedWSSession, ByteBuffer, ByteBuffer> onBinaryFunction;
private List<String> messages = new ArrayList<>();
private CompletableFuture<List<WebSocketFrame>> expectedFramesFuture = new CompletableFuture<>(); public CompletableFuture<UntrustedWSSession> getConnectFuture()
private AtomicReference<Integer> expectedFramesCount = new AtomicReference<>();
private List<WebSocketFrame> frames = new ArrayList<>();
public Future<List<WebSocketFrame>> expectedFrames(int expectedCount)
{ {
if (!expectedFramesCount.compareAndSet(null, expectedCount)) return connectFuture;
{
throw new IllegalStateException("Can only have 1 registered frame count future");
}
return expectedFramesFuture;
} }
public Future<List<String>> expectedMessages(int expectedCount) public UntrustedWSEndpoint(String id)
{ {
if (!expectedMessageCount.compareAndSet(null, expectedCount)) super(id);
{
throw new IllegalStateException("Can only have 1 registered message count future");
}
return expectedMessagesFuture;
} }
@Override @Override
public void onWebSocketConnect(Session session) public void onWebSocketConnect(Session session)
{ {
this.session = session; assertThat("Session type", session, instanceOf(UntrustedWSSession.class));
this.openLatch.countDown(); this.untrustedSession = (UntrustedWSSession) session;
if (this.connectFuture != null)
{
this.connectFuture.complete(this.untrustedSession);
} }
@Override super.onWebSocketConnect(session);
public void onWebSocketClose(int statusCode, String reason)
{
this.closeLatch.countDown();
CloseInfo close = new CloseInfo(statusCode, reason);
assertThat("Close only happened once", closeInfo.compareAndSet(null, close), is(true));
} }
@Override @Override
public void onWebSocketError(Throwable cause) public void onWebSocketError(Throwable cause)
{ {
assertThat("Error must have value", cause, notNullValue()); if (this.connectFuture != null)
if (error.compareAndSet(null, cause) == false)
{ {
LOG.warn("Original Cause", error.get()); // Always trip this, doesn't matter if if completed normally first.
LOG.warn("Extra/Excess Cause", cause); this.connectFuture.completeExceptionally(cause);
fail("onError should only happen once!");
} }
synchronized (expectedMessagesFuture) super.onWebSocketError(cause);
{
if (expectedMessagesFuture != null)
expectedMessagesFuture.completeExceptionally(cause);
}
synchronized (expectedFramesFuture)
{
if (expectedFramesFuture != null)
expectedFramesFuture.completeExceptionally(cause);
}
} }
@Override @Override
public void onWebSocketBinary(byte[] payload, int offset, int len) public void onWebSocketBinary(byte[] payload, int offset, int len)
{ {
// TODO super.onWebSocketBinary(payload, offset, len);
if (onBinaryFunction != null)
{
try
{
ByteBuffer msg = ByteBuffer.wrap(payload, offset, len);
ByteBuffer responseBuffer = onBinaryFunction.apply(this.untrustedSession, msg);
if (responseBuffer != null)
{
this.getRemote().sendBytes(responseBuffer);
}
}
catch (Throwable t)
{
LOG.warn("Unable to send binary", t);
}
}
} }
@Override @Override
public void onWebSocketText(String text) public void onWebSocketText(String text)
{ {
messages.add(text); super.onWebSocketText(text);
synchronized (expectedMessagesFuture)
{
Integer expected = expectedMessageCount.get();
if (expected != null && messages.size() >= expected.intValue()) if (onTextFunction != null)
{ {
expectedMessagesFuture.complete(messages); try
{
String responseText = onTextFunction.apply(this.untrustedSession, text);
if (responseText != null)
{
this.getRemote().sendString(responseText);
}
}
catch (Throwable t)
{
LOG.warn("Unable to send text", t);
} }
} }
} }
@Override public void setConnectFuture(CompletableFuture<UntrustedWSSession> future)
public void onWebSocketFrame(Frame frame)
{ {
frames.add(WebSocketFrame.copy(frame)); this.connectFuture = future;
synchronized (expectedFramesFuture) }
{
Integer expected = expectedFramesCount.get();
if (expected != null && frames.size() >= expected.intValue()) public void setOnBinaryFunction(BiFunction<UntrustedWSSession, ByteBuffer, ByteBuffer> onBinaryFunction)
{ {
expectedFramesFuture.complete(frames); this.onBinaryFunction = onBinaryFunction;
} }
}
public void setOnTextFunction(BiFunction<UntrustedWSSession, String, String> onTextFunction)
{
this.onTextFunction = onTextFunction;
} }
} }

View File

@ -0,0 +1,145 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.util.WSURI;
public class UntrustedWSServer extends ContainerLifeCycle implements UntrustedWSSessionFactory.Listener
{
private static final Logger LOG = Log.getLogger(SimpleServletServer.class);
private Server server;
private ServerConnector connector;
private URI wsUri;
private boolean ssl = false;
private SslContextFactory sslContextFactory;
private Map<URI, CompletableFuture<UntrustedWSSession>> connectionFutures = new ConcurrentHashMap<>();
@Override
protected void doStart() throws Exception
{
// Configure Server
server = new Server();
if (ssl)
{
// HTTP Configuration
HttpConfiguration http_config = new HttpConfiguration();
http_config.setSecureScheme("https");
http_config.setSecurePort(0);
http_config.setOutputBufferSize(32768);
http_config.setRequestHeaderSize(8192);
http_config.setResponseHeaderSize(8192);
http_config.setSendServerVersion(true);
http_config.setSendDateHeader(false);
sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath(MavenTestingUtils.getTestResourceFile("keystore").getAbsolutePath());
sslContextFactory.setKeyStorePassword("storepwd");
sslContextFactory.setKeyManagerPassword("keypwd");
sslContextFactory.setExcludeCipherSuites("SSL_RSA_WITH_DES_CBC_SHA","SSL_DHE_RSA_WITH_DES_CBC_SHA","SSL_DHE_DSS_WITH_DES_CBC_SHA",
"SSL_RSA_EXPORT_WITH_RC4_40_MD5","SSL_RSA_EXPORT_WITH_DES40_CBC_SHA","SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA",
"SSL_DHE_DSS_EXPORT_WITH_DES40_CBC_SHA");
// SSL HTTP Configuration
HttpConfiguration https_config = new HttpConfiguration(http_config);
https_config.addCustomizer(new SecureRequestCustomizer());
// SSL Connector
connector = new ServerConnector(server,new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()),new HttpConnectionFactory(https_config));
connector.setPort(0);
}
else
{
// Basic HTTP connector
connector = new ServerConnector(server);
connector.setPort(0);
}
server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
server.setHandler(context);
// Serve untrusted endpoint
context.addServlet(UntrustedWSServlet.class, "/untrusted/*").setInitOrder(1);
// Start Server
addBean(server);
super.doStart();
// Wireup Context related things
UntrustedWSSessionFactory sessionFactory = (UntrustedWSSessionFactory) context.getServletContext().getAttribute(UntrustedWSSessionFactory.class.getName());
sessionFactory.addListener(this);
// Establish the Server URI
URI serverUri = server.getURI();
wsUri = WSURI.toWebsocket(serverUri).resolve("/");
// Some debugging
if (LOG.isDebugEnabled())
{
LOG.debug("WebSocket Server URI: " + wsUri.toASCIIString());
LOG.debug(server.dump());
}
super.doStart();
}
public URI getWsUri()
{
return wsUri;
}
@Override
public void onSessionCreate(UntrustedWSSession session, URI requestURI)
{
// A new session was created (but not connected, yet)
CompletableFuture<UntrustedWSSession> sessionFuture = this.connectionFutures.get(requestURI);
if(sessionFuture != null)
{
session.getUntrustedEndpoint().setConnectFuture(sessionFuture);
}
this.connectionFutures.put(requestURI, session.getUntrustedEndpoint().getConnectFuture());
}
public void registerConnectFuture(URI uri, CompletableFuture<UntrustedWSSession> sessionFuture)
{
this.connectionFutures.put(uri, sessionFuture);
}
}

View File

@ -0,0 +1,46 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
public class UntrustedWSServlet extends WebSocketServlet implements WebSocketCreator
{
@Override
public void configure(WebSocketServletFactory factory)
{
WebSocketServerFactory serverFactory = (WebSocketServerFactory) factory;
serverFactory.setCreator(this);
UntrustedWSSessionFactory sessionFactory = new UntrustedWSSessionFactory(serverFactory);
this.getServletContext().setAttribute(UntrustedWSSessionFactory.class.getName(), sessionFactory);
serverFactory.setSessionFactories(sessionFactory);
}
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp)
{
return new UntrustedWSEndpoint(WebSocketBehavior.SERVER.name());
}
}

View File

@ -19,7 +19,11 @@
package org.eclipse.jetty.websocket.tests; package org.eclipse.jetty.websocket.tests;
import java.net.URI; import java.net.URI;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.common.LogicalConnection; import org.eclipse.jetty.websocket.common.LogicalConnection;
@ -29,13 +33,31 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
public class UntrustedWSSessionFactory implements SessionFactory public class UntrustedWSSessionFactory implements SessionFactory
{ {
interface Listener
{
void onSessionCreate(UntrustedWSSession session, URI requestURI);
}
private final static Logger LOG = Log.getLogger(UntrustedWSSessionFactory.class);
private final WebSocketContainerScope containerScope; private final WebSocketContainerScope containerScope;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
public UntrustedWSSessionFactory(WebSocketContainerScope containerScope) public UntrustedWSSessionFactory(WebSocketContainerScope containerScope)
{ {
this.containerScope = containerScope; this.containerScope = containerScope;
} }
public boolean addListener(Listener listener)
{
return this.listeners.add(listener);
}
public boolean removeListener(Listener listener)
{
return this.listeners.remove(listener);
}
@Override @Override
public boolean supports(Object websocket) public boolean supports(Object websocket)
{ {
@ -45,6 +67,17 @@ public class UntrustedWSSessionFactory implements SessionFactory
@Override @Override
public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection) public WebSocketSession createSession(URI requestURI, Object websocket, LogicalConnection connection)
{ {
return new UntrustedWSSession(containerScope, requestURI, websocket, connection); final UntrustedWSSession session = new UntrustedWSSession(containerScope, requestURI, websocket, connection);
listeners.forEach((listener) -> {
try
{
listener.onSessionCreate(session, requestURI);
}
catch (Throwable t)
{
LOG.warn("Unable to notify listener " + listener, t);
}
});
return session;
} }
} }

View File

@ -16,24 +16,26 @@
// ======================================================================== // ========================================================================
// //
package org.eclipse.jetty.websocket.client; package org.eclipse.jetty.websocket.tests.client;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import java.net.URI; import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule; import org.eclipse.jetty.websocket.tests.LeakTrackingBufferPoolRule;
import org.eclipse.jetty.websocket.common.test.XBlockheadServer; import org.eclipse.jetty.websocket.tests.UntrustedWSServer;
import org.eclipse.jetty.websocket.tests.UntrustedWSSession;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName;
/** /**
* Tests for conditions due to bad networking. * Tests for conditions due to bad networking.
@ -41,12 +43,12 @@ import org.junit.Test;
public class BadNetworkTest public class BadNetworkTest
{ {
@Rule @Rule
public TestTracker tt = new TestTracker(); public TestName testname = new TestName();
@Rule @Rule
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test"); public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("Test");
private XBlockheadServer server; private UntrustedWSServer server;
private WebSocketClient client; private WebSocketClient client;
@Before @Before
@ -60,7 +62,7 @@ public class BadNetworkTest
@Before @Before
public void startServer() throws Exception public void startServer() throws Exception
{ {
server = new XBlockheadServer(); server = new UntrustedWSServer();
server.start(); server.start();
} }
@ -79,13 +81,11 @@ public class BadNetworkTest
@Test @Test
public void testAbruptClientClose() throws Exception public void testAbruptClientClose() throws Exception
{ {
JettyTrackingSocket wsocket = new JettyTrackingSocket(); TrackingSocket wsocket = new TrackingSocket();
URI wsUri = server.getWsUri(); URI wsUri = server.getWsUri();
Future<Session> future = client.connect(wsocket,wsUri);
IBlockheadServerConnection ssocket = server.accept(); Future<Session> future = client.connect(wsocket, wsUri);
ssocket.upgrade();
// Validate that we are connected // Validate that we are connected
future.get(30, TimeUnit.SECONDS); future.get(30, TimeUnit.SECONDS);
@ -107,21 +107,27 @@ public class BadNetworkTest
@Test @Test
public void testAbruptServerClose() throws Exception public void testAbruptServerClose() throws Exception
{ {
JettyTrackingSocket wsocket = new JettyTrackingSocket(); TrackingSocket wsocket = new TrackingSocket();
URI wsUri = server.getWsUri(); URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName());
Future<Session> future = client.connect(wsocket,wsUri);
IBlockheadServerConnection ssocket = server.accept(); CompletableFuture<UntrustedWSSession> sessionFuture = new CompletableFuture<UntrustedWSSession>()
ssocket.upgrade(); {
@Override
public boolean complete(UntrustedWSSession session)
{
// server disconnect
session.disconnect();
return super.complete(session);
}
};
server.registerConnectFuture(wsURI, sessionFuture);
Future<Session> future = client.connect(wsocket, wsURI);
// Validate that we are connected // Validate that we are connected
future.get(30, TimeUnit.SECONDS); future.get(30, TimeUnit.SECONDS);
wsocket.waitForConnected(30, TimeUnit.SECONDS); wsocket.waitForConnected(30, TimeUnit.SECONDS);
// Have server disconnect abruptly
ssocket.disconnect();
// Wait for close (as response to idle timeout) // Wait for close (as response to idle timeout)
wsocket.waitForClose(10, TimeUnit.SECONDS); wsocket.waitForClose(10, TimeUnit.SECONDS);

View File

@ -16,7 +16,7 @@
// ======================================================================== // ========================================================================
// //
package org.eclipse.jetty.websocket.client; package org.eclipse.jetty.websocket.tests.client;
import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -29,15 +29,17 @@ import static org.junit.Assert.assertThat;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -57,25 +59,27 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging; import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.ProtocolException; import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.Parser; import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketFrame; import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession; import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.frames.TextFrame;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.common.test.IBlockheadServerConnection; import org.eclipse.jetty.websocket.tests.RawFrameBuilder;
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture; import org.eclipse.jetty.websocket.tests.UntrustedWSConnection;
import org.eclipse.jetty.websocket.common.test.RawFrameBuilder; import org.eclipse.jetty.websocket.tests.UntrustedWSEndpoint;
import org.eclipse.jetty.websocket.common.test.XBlockheadServer; import org.eclipse.jetty.websocket.tests.UntrustedWSServer;
import org.eclipse.jetty.websocket.tests.UntrustedWSSession;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName;
public class ClientCloseTest public class ClientCloseTest
{ {
@ -185,13 +189,16 @@ public class ClientCloseTest
} }
} }
@Rule
public TestName testname = new TestName();
@Rule @Rule
public TestTracker tt = new TestTracker(); public TestTracker tt = new TestTracker();
private XBlockheadServer server; private UntrustedWSServer server;
private WebSocketClient client; private WebSocketClient client;
private void confirmConnection(CloseTrackingSocket clientSocket, Future<Session> clientFuture, IBlockheadServerConnection serverConns) throws Exception private void confirmConnection(CloseTrackingSocket clientSocket, Future<Session> clientFuture, UntrustedWSSession serverSession) throws Exception
{ {
// Wait for client connect on via future // Wait for client connect on via future
clientFuture.get(30, TimeUnit.SECONDS); clientFuture.get(30, TimeUnit.SECONDS);
@ -199,6 +206,9 @@ public class ClientCloseTest
// Wait for client connect via client websocket // Wait for client connect via client websocket
assertThat("Client WebSocket is Open", clientSocket.openLatch.await(30, TimeUnit.SECONDS), is(true)); assertThat("Client WebSocket is Open", clientSocket.openLatch.await(30, TimeUnit.SECONDS), is(true));
UntrustedWSEndpoint serverEndpoint = serverSession.getUntrustedEndpoint();
Future<List<WebSocketFrame>> futFrames = serverEndpoint.expectedFrames(1);
try try
{ {
// Send message from client to server // Send message from client to server
@ -209,14 +219,13 @@ public class ClientCloseTest
testFut.get(30, TimeUnit.SECONDS); testFut.get(30, TimeUnit.SECONDS);
// Read Frame on server side // Read Frame on server side
IncomingFramesCapture serverCapture = serverConns.readFrames(1,30,TimeUnit.SECONDS); List<WebSocketFrame> frames = futFrames.get(30, TimeUnit.SECONDS);
serverCapture.assertFrameCount(1); WebSocketFrame frame = frames.get(0);
WebSocketFrame frame = serverCapture.getFrames().poll();
assertThat("Server received frame", frame.getOpCode(), is(OpCode.TEXT)); assertThat("Server received frame", frame.getOpCode(), is(OpCode.TEXT));
assertThat("Server received frame payload", frame.getPayloadAsUTF8(), is(echoMsg)); assertThat("Server received frame payload", frame.getPayloadAsUTF8(), is(echoMsg));
// Server send echo reply // Server send echo reply
serverConns.write(new TextFrame().setPayload(echoMsg)); serverEndpoint.getRemote().sendString(echoMsg);
// Wait for received echo // Wait for received echo
clientSocket.messageQueue.awaitEventCount(1, 1, TimeUnit.SECONDS); clientSocket.messageQueue.awaitEventCount(1, 1, TimeUnit.SECONDS);
@ -234,32 +243,13 @@ public class ClientCloseTest
} }
} }
private void confirmServerReceivedCloseFrame(IBlockheadServerConnection serverConn, int expectedCloseCode, Matcher<String> closeReasonMatcher) throws IOException,
TimeoutException
{
IncomingFramesCapture serverCapture = serverConn.readFrames(1,30,TimeUnit.SECONDS);
serverCapture.assertFrameCount(1);
serverCapture.assertHasFrame(OpCode.CLOSE,1);
WebSocketFrame frame = serverCapture.getFrames().poll();
assertThat("Server received close frame",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo closeInfo = new CloseInfo(frame);
assertThat("Server received close code",closeInfo.getStatusCode(),is(expectedCloseCode));
if (closeReasonMatcher == null)
{
assertThat("Server received close reason",closeInfo.getReason(),nullValue());
}
else
{
assertThat("Server received close reason",closeInfo.getReason(),closeReasonMatcher);
}
}
public static class TestClientTransportOverHTTP extends HttpClientTransportOverHTTP public static class TestClientTransportOverHTTP extends HttpClientTransportOverHTTP
{ {
@Override @Override
protected SelectorManager newSelectorManager(HttpClient client) protected SelectorManager newSelectorManager(HttpClient client)
{ {
return new ClientSelectorManager(client, 1){ return new ClientSelectorManager(client, 1)
{
@Override @Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{ {
@ -301,7 +291,7 @@ public class ClientCloseTest
@Before @Before
public void startServer() throws Exception public void startServer() throws Exception
{ {
server = new XBlockheadServer(); server = new UntrustedWSServer();
server.start(); server.start();
} }
@ -324,31 +314,34 @@ public class ClientCloseTest
final int timeout = 1000; final int timeout = 1000;
client.setMaxIdleTimeout(timeout); client.setMaxIdleTimeout(timeout);
URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName());
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<>();
server.registerConnectFuture(wsURI, serverSessionFut);
// Client connects // Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket(); CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri()); Future<Session> clientConnectFuture = client.connect(clientSocket, wsURI);
// Server accepts connect // Server accepts connect
IBlockheadServerConnection serverConn = server.accept(); UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
serverConn.upgrade();
// client confirms connection via echo // client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn); confirmConnection(clientSocket, clientConnectFuture, serverSession);
// client sends close frame (code 1000, normal) // client sends close frame (code 1000, normal)
final String origCloseReason = "Normal Close"; final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame // server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason));
// server sends 2 messages // server sends 2 messages
serverConn.write(new TextFrame().setPayload("Hello")); RemoteEndpoint remote = serverSession.getRemote();
serverConn.write(new TextFrame().setPayload("World")); remote.sendString("Hello");
remote.sendString("World");
// server sends close frame (code 1000, no reason) // server sends close frame (code 1000, no reason)
CloseInfo sclose = new CloseInfo(StatusCode.NORMAL,"From Server"); serverSession.close(StatusCode.NORMAL, "From Server");
serverConn.write(sclose.asFrame());
// client receives 2 messages // client receives 2 messages
clientSocket.messageQueue.awaitEventCount(2, 1, TimeUnit.SECONDS); clientSocket.messageQueue.awaitEventCount(2, 1, TimeUnit.SECONDS);
@ -373,16 +366,19 @@ public class ClientCloseTest
final int timeout = 1000; final int timeout = 1000;
client.setMaxIdleTimeout(timeout); client.setMaxIdleTimeout(timeout);
URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName());
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<>();
server.registerConnectFuture(wsURI, serverSessionFut);
// Client connects // Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket(); CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri()); Future<Session> clientConnectFuture = client.connect(clientSocket, wsURI);
// Server accepts connect // Server accepts connect
IBlockheadServerConnection serverConn = server.accept(); UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
serverConn.upgrade();
// client confirms connection via echo // client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn); confirmConnection(clientSocket, clientConnectFuture, serverSession);
// client sends BIG frames (until it cannot write anymore) // client sends BIG frames (until it cannot write anymore)
// server must not read (for test purpose, in order to congest connection) // server must not read (for test purpose, in order to congest connection)
@ -419,16 +415,19 @@ public class ClientCloseTest
final int timeout = 1000; final int timeout = 1000;
client.setMaxIdleTimeout(timeout); client.setMaxIdleTimeout(timeout);
URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName());
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<>();
server.registerConnectFuture(wsURI, serverSessionFut);
// Client connects // Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket(); CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri()); Future<Session> clientConnectFuture = client.connect(clientSocket, wsURI);
// Server accepts connect // Server accepts connect
IBlockheadServerConnection serverConn = server.accept(); UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
serverConn.upgrade();
// client confirms connection via echo // client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn); confirmConnection(clientSocket, clientConnectFuture, serverSession);
// client should not have received close message (yet) // client should not have received close message (yet)
clientSocket.assertNoCloseEvent(); clientSocket.assertNoCloseEvent();
@ -444,7 +443,7 @@ public class ClientCloseTest
BufferUtil.flipToFlush(bad, 0); BufferUtil.flipToFlush(bad, 0);
try (StacklessLogging ignored = new StacklessLogging(Parser.class)) try (StacklessLogging ignored = new StacklessLogging(Parser.class))
{ {
serverConn.write(bad); serverSession.getUntrustedConnection().writeRaw(bad);
// client should have noticed the error // client should have noticed the error
assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true)); assertThat("OnError Latch", clientSocket.errorLatch.await(2, TimeUnit.SECONDS), is(true));
@ -452,11 +451,11 @@ public class ClientCloseTest
assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Invalid control frame")); assertThat("OnError", clientSocket.error.get().getMessage(), containsString("Invalid control frame"));
// client parse invalid frame, notifies server of close (protocol error) // client parse invalid frame, notifies server of close (protocol error)
confirmServerReceivedCloseFrame(serverConn,StatusCode.PROTOCOL,allOf(containsString("Invalid control frame"),containsString("length"))); serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.PROTOCOL, allOf(containsString("Invalid control frame"), containsString("length")));
} }
// server disconnects // server disconnects
serverConn.disconnect(); serverSession.disconnect();
// client triggers close event on client ws-endpoint // client triggers close event on client ws-endpoint
clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.PROTOCOL), allOf(containsString("Invalid control frame"), containsString("length"))); clientSocket.assertReceivedCloseEvent(timeout, is(StatusCode.PROTOCOL), allOf(containsString("Invalid control frame"), containsString("length")));
@ -469,16 +468,19 @@ public class ClientCloseTest
final int timeout = 1000; final int timeout = 1000;
client.setMaxIdleTimeout(timeout); client.setMaxIdleTimeout(timeout);
URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName());
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<>();
server.registerConnectFuture(wsURI, serverSessionFut);
// Client connects // Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket(); CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri()); Future<Session> clientConnectFuture = client.connect(clientSocket, wsURI);
// Server accepts connect // Server accepts connect
IBlockheadServerConnection serverConn = server.accept(); UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
serverConn.upgrade();
// client confirms connection via echo // client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn); confirmConnection(clientSocket, clientConnectFuture, serverSession);
try (StacklessLogging ignored = new StacklessLogging(CloseTrackingSocket.class)) try (StacklessLogging ignored = new StacklessLogging(CloseTrackingSocket.class))
{ {
@ -487,13 +489,13 @@ public class ClientCloseTest
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame // server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason));
// client should not have received close message (yet) // client should not have received close message (yet)
clientSocket.assertNoCloseEvent(); clientSocket.assertNoCloseEvent();
// server shuts down connection (no frame reply) // server shuts down connection (no frame reply)
serverConn.disconnect(); serverSession.disconnect();
// client reads -1 (EOF) // client reads -1 (EOF)
clientSocket.assertReceivedErrorEvent(timeout, IOException.class, containsString("EOF")); clientSocket.assertReceivedErrorEvent(timeout, IOException.class, containsString("EOF"));
@ -509,23 +511,27 @@ public class ClientCloseTest
final int timeout = 1000; final int timeout = 1000;
client.setMaxIdleTimeout(timeout); client.setMaxIdleTimeout(timeout);
URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName());
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<>();
server.registerConnectFuture(wsURI, serverSessionFut);
// Client connects // Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket(); CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri()); Future<Session> clientConnectFuture = client.connect(clientSocket, wsURI);
// Server accepts connect // Server accepts connect
IBlockheadServerConnection serverConn = server.accept(); UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
serverConn.upgrade(); UntrustedWSConnection serverConn = serverSession.getUntrustedConnection();
// client confirms connection via echo // client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn); confirmConnection(clientSocket, clientConnectFuture, serverSession);
// client sends close frame // client sends close frame
final String origCloseReason = "Normal Close"; final String origCloseReason = "Normal Close";
clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason); clientSocket.getSession().close(StatusCode.NORMAL, origCloseReason);
// server receives close frame // server receives close frame
confirmServerReceivedCloseFrame(serverConn,StatusCode.NORMAL,is(origCloseReason)); serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, is(origCloseReason));
// client should not have received close message (yet) // client should not have received close message (yet)
clientSocket.assertNoCloseEvent(); clientSocket.assertNoCloseEvent();
@ -548,21 +554,24 @@ public class ClientCloseTest
int clientCount = 3; int clientCount = 3;
CloseTrackingSocket clientSockets[] = new CloseTrackingSocket[clientCount]; CloseTrackingSocket clientSockets[] = new CloseTrackingSocket[clientCount];
IBlockheadServerConnection serverConns[] = new IBlockheadServerConnection[clientCount]; UntrustedWSSession serverSessions[] = new UntrustedWSSession[clientCount];
// Connect Multiple Clients // Connect Multiple Clients
for (int i = 0; i < clientCount; i++) for (int i = 0; i < clientCount; i++)
{ {
URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName() + "/" + i);
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<>();
server.registerConnectFuture(wsURI, serverSessionFut);
// Client Request Upgrade // Client Request Upgrade
clientSockets[i] = new CloseTrackingSocket(); clientSockets[i] = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSockets[i],server.getWsUri()); Future<Session> clientConnectFuture = client.connect(clientSockets[i], wsURI);
// Server accepts connection // Server accepts connection
serverConns[i] = server.accept(); serverSessions[i] = serverSessionFut.get(10, TimeUnit.SECONDS);
serverConns[i].upgrade();
// client confirms connection via echo // client confirms connection via echo
confirmConnection(clientSockets[i],clientConnectFuture,serverConns[i]); confirmConnection(clientSockets[i], clientConnectFuture, serverSessions[i]);
} }
// client lifecycle stop // client lifecycle stop
@ -572,7 +581,7 @@ public class ClientCloseTest
for (int i = 0; i < clientCount; i++) for (int i = 0; i < clientCount; i++)
{ {
// server receives close frame // server receives close frame
confirmServerReceivedCloseFrame(serverConns[i],StatusCode.SHUTDOWN,containsString("Shutdown")); serverSessions[i].getUntrustedEndpoint().assertClose("Server", StatusCode.SHUTDOWN, containsString("Shutdown"));
} }
// clients disconnect // clients disconnect
@ -589,16 +598,28 @@ public class ClientCloseTest
final int timeout = 1000; final int timeout = 1000;
client.setMaxIdleTimeout(timeout); client.setMaxIdleTimeout(timeout);
URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName());
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<UntrustedWSSession>()
{
@Override
public boolean complete(UntrustedWSSession session)
{
// echo back text as-well
session.getUntrustedEndpoint().setOnTextFunction((serverSession, text) -> text);
return super.complete(session);
}
};
server.registerConnectFuture(wsURI, serverSessionFut);
// Client connects // Client connects
CloseTrackingSocket clientSocket = new CloseTrackingSocket(); CloseTrackingSocket clientSocket = new CloseTrackingSocket();
Future<Session> clientConnectFuture = client.connect(clientSocket,server.getWsUri()); Future<Session> clientConnectFuture = client.connect(clientSocket, wsURI);
// Server accepts connect // Server accepts connect
IBlockheadServerConnection serverConn = server.accept(); UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
serverConn.upgrade();
// client confirms connection via echo // client confirms connection via echo
confirmConnection(clientSocket,clientConnectFuture,serverConn); confirmConnection(clientSocket, clientConnectFuture, serverSession);
// setup client endpoint for write failure (test only) // setup client endpoint for write failure (test only)
EndPoint endp = clientSocket.getEndPoint(); EndPoint endp = clientSocket.getEndPoint();

View File

@ -0,0 +1,128 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.client;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.tests.TrackingEndpoint;
import org.eclipse.jetty.websocket.tests.UntrustedWSServer;
import org.eclipse.jetty.websocket.tests.UntrustedWSSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
public class EchoTest
{
@Rule
public TestName testname = new TestName();
private UntrustedWSServer server;
private WebSocketClient client;
@Before
public void startClient() throws Exception
{
client = new WebSocketClient();
client.start();
}
@Before
public void startServer() throws Exception
{
server = new UntrustedWSServer();
server.start();
}
@After
public void stopClient() throws Exception
{
client.stop();
}
@After
public void stopServer() throws Exception
{
server.stop();
}
@Test
public void testBasicEcho() throws IOException, InterruptedException, ExecutionException, TimeoutException
{
// Set client timeout
final int timeout = 1000;
client.setMaxIdleTimeout(timeout);
URI wsURI = server.getWsUri().resolve("/untrusted/" + testname.getMethodName());
CompletableFuture<UntrustedWSSession> serverSessionFut = new CompletableFuture<UntrustedWSSession>()
{
@Override
public boolean complete(UntrustedWSSession session)
{
// echo back text as-well
session.getUntrustedEndpoint().setOnTextFunction((serverSession, text) -> text);
return super.complete(session);
}
};
server.registerConnectFuture(wsURI, serverSessionFut);
// Client connects
TrackingEndpoint clientSocket = new TrackingEndpoint(WebSocketBehavior.CLIENT.name());
Future<Session> clientConnectFuture = client.connect(clientSocket, wsURI);
// Server accepts connect
UntrustedWSSession serverSession = serverSessionFut.get(10, TimeUnit.SECONDS);
// client confirms connection via echo
assertThat("Client Opened", clientSocket.openLatch.await(5, TimeUnit.SECONDS), is(true));
Future<List<String>> futMessages = clientSocket.expectedMessages(1);
// client sends message
clientSocket.getRemote().sendString("Hello Echo");
List<String> messages = futMessages.get(10, TimeUnit.SECONDS);
assertThat("Messages[0]", messages.get(0), is("Hello Echo"));
// client closes
clientSocket.close(StatusCode.NORMAL, "Normal Close");
// client triggers close event on client ws-endpoint
clientSocket.assertClose("Client", StatusCode.NORMAL, containsString("Normal Close"));
// Server close event
serverSession.getUntrustedEndpoint().assertClose("Server", StatusCode.NORMAL, containsString("Normal Close"));
}
}

View File

@ -0,0 +1,55 @@
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests.client;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.hamcrest.Matcher;
@WebSocket
public class TrackingSocket
{
private Session session;
public void assertClose(int expectedStatusCode, Matcher<String> reasonMatcher)
{
}
public Session getSession()
{
return session;
}
@OnWebSocketConnect
public void onOpen(Session session)
{
this.session = session;
}
public void waitForClose(int timeout, TimeUnit unit)
{
}
public void waitForConnected(int timeout, TimeUnit unit)
{
}
}