Issue #207 - Simplifying Connection and Close states
+ Removing IOState + Adding AtomicConnectionState + Adding AtomicClose + Reworking AbstractWebSocketConnection to be focused solely on low level Connection handling, with little to none websocket specifics. + WebSocketSession simplified
This commit is contained in:
parent
7e2e64e856
commit
a8d4c68bdc
|
@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
|
|||
*/
|
||||
public interface Frame
|
||||
{
|
||||
public static enum Type
|
||||
enum Type
|
||||
{
|
||||
CONTINUATION((byte)0x00),
|
||||
TEXT((byte)0x01),
|
||||
|
@ -48,7 +48,7 @@ public interface Frame
|
|||
|
||||
private byte opcode;
|
||||
|
||||
private Type(byte code)
|
||||
Type(byte code)
|
||||
{
|
||||
this.opcode = code;
|
||||
}
|
||||
|
@ -80,24 +80,24 @@ public interface Frame
|
|||
}
|
||||
}
|
||||
|
||||
public byte[] getMask();
|
||||
byte[] getMask();
|
||||
|
||||
public byte getOpCode();
|
||||
byte getOpCode();
|
||||
|
||||
public ByteBuffer getPayload();
|
||||
ByteBuffer getPayload();
|
||||
|
||||
/**
|
||||
* The original payload length ({@link ByteBuffer#remaining()})
|
||||
*
|
||||
* @return the original payload length ({@link ByteBuffer#remaining()})
|
||||
*/
|
||||
public int getPayloadLength();
|
||||
int getPayloadLength();
|
||||
|
||||
public Type getType();
|
||||
Type getType();
|
||||
|
||||
public boolean hasPayload();
|
||||
boolean hasPayload();
|
||||
|
||||
public boolean isFin();
|
||||
boolean isFin();
|
||||
|
||||
/**
|
||||
* Same as {@link #isFin()}
|
||||
|
@ -106,14 +106,13 @@ public interface Frame
|
|||
* @deprecated use {@link #isFin()} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean isLast();
|
||||
boolean isLast();
|
||||
|
||||
public boolean isMasked();
|
||||
boolean isMasked();
|
||||
|
||||
public boolean isRsv1();
|
||||
boolean isRsv1();
|
||||
|
||||
public boolean isRsv2();
|
||||
|
||||
public boolean isRsv3();
|
||||
boolean isRsv2();
|
||||
|
||||
boolean isRsv3();
|
||||
}
|
||||
|
|
|
@ -49,25 +49,21 @@ public final class WSURI
|
|||
{
|
||||
// leave alone
|
||||
httpScheme = wsScheme;
|
||||
port = 80;
|
||||
}
|
||||
else if ("https".equalsIgnoreCase(wsScheme))
|
||||
{
|
||||
// leave alone
|
||||
httpScheme = wsScheme;
|
||||
port = 443;
|
||||
}
|
||||
else if ("ws".equalsIgnoreCase(wsScheme))
|
||||
{
|
||||
// convert to http
|
||||
httpScheme = "http";
|
||||
port = 80;
|
||||
}
|
||||
else if ("wss".equalsIgnoreCase(wsScheme))
|
||||
{
|
||||
// convert to https
|
||||
httpScheme = "https";
|
||||
port = 443;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -350,7 +350,6 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketCont
|
|||
init();
|
||||
|
||||
WebSocketUpgradeRequest wsReq = new WebSocketUpgradeRequest(this,httpClient,request);
|
||||
|
||||
wsReq.setUpgradeListener(upgradeListener);
|
||||
return wsReq.sendAsync();
|
||||
}
|
||||
|
|
|
@ -587,6 +587,8 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
|
|||
|
||||
// We can upgrade
|
||||
EndPoint endp = oldConn.getEndPoint();
|
||||
|
||||
endp = configure(endp);
|
||||
|
||||
WebSocketClientConnection connection = new WebSocketClientConnection(endp,wsClient.getExecutor(),wsClient.getScheduler(),wsClient.getPolicy(),
|
||||
wsClient.getBufferPool(), extensionStack);
|
||||
|
@ -597,7 +599,6 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
|
|||
session.setUpgradeRequest(new ClientUpgradeRequest(this));
|
||||
session.setUpgradeResponse(new ClientUpgradeResponse(response));
|
||||
connection.addListener(session);
|
||||
connection.setErrorListener(session);
|
||||
|
||||
// Setup Incoming Routing
|
||||
extensionStack.setNextIncoming(session);
|
||||
|
@ -618,6 +619,11 @@ public class WebSocketUpgradeRequest extends HttpRequest implements CompleteList
|
|||
// Now swap out the connection
|
||||
endp.upgrade(connection);
|
||||
}
|
||||
|
||||
public EndPoint configure(EndPoint endp)
|
||||
{
|
||||
return endp;
|
||||
}
|
||||
|
||||
public void setUpgradeListener(UpgradeListener upgradeListener)
|
||||
{
|
||||
|
|
|
@ -8,11 +8,11 @@ org.eclipse.jetty.LEVEL=WARN
|
|||
# org.eclipse.jetty.websocket.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.client.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.client.ClientCloseTest.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
|
||||
|
||||
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.test.BlockheadServerConnection.LEVEL=DEBUG
|
||||
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class AtomicClose
|
||||
{
|
||||
enum State
|
||||
{
|
||||
/** No close handshake initiated (yet) */
|
||||
NONE,
|
||||
/** Local side initiated the close handshake */
|
||||
LOCAL,
|
||||
/** Remote side initiated the close handshake */
|
||||
REMOTE,
|
||||
/** An abnormal close situation (disconnect, timeout, etc...) */
|
||||
ABNORMAL
|
||||
}
|
||||
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.NONE);
|
||||
|
||||
public State get()
|
||||
{
|
||||
return state.get();
|
||||
}
|
||||
|
||||
public boolean onAbnormal()
|
||||
{
|
||||
return state.compareAndSet(State.NONE, State.ABNORMAL);
|
||||
}
|
||||
|
||||
public boolean onLocal()
|
||||
{
|
||||
return state.compareAndSet(State.NONE, State.LOCAL);
|
||||
}
|
||||
|
||||
public boolean onRemote()
|
||||
{
|
||||
return state.compareAndSet(State.NONE, State.REMOTE);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class AtomicConnectionState
|
||||
{
|
||||
/**
|
||||
* Connection states as outlined in <a href="https://tools.ietf.org/html/rfc6455">RFC6455</a>.
|
||||
*/
|
||||
public enum State
|
||||
{
|
||||
/** [RFC] Initial state of a connection, the upgrade request / response is in progress */
|
||||
CONNECTING,
|
||||
/**
|
||||
* [Impl] Intermediate state between CONNECTING and OPEN, used to indicate that a upgrade request/response is successful, but the end-user provided socket's
|
||||
* onOpen code has yet to run.
|
||||
* <p>
|
||||
* This state is to allow the local socket to initiate messages and frames, but to NOT start reading yet.
|
||||
* </p>
|
||||
*/
|
||||
CONNECTED,
|
||||
/**
|
||||
* [RFC] The websocket connection is established and open.
|
||||
* <p>
|
||||
* This indicates that the Upgrade has succeed, and the end-user provided socket's onOpen code has completed.
|
||||
* <p>
|
||||
* It is now time to start reading from the remote endpoint.
|
||||
* </p>
|
||||
*/
|
||||
OPEN,
|
||||
/**
|
||||
* [RFC] The websocket closing handshake is started.
|
||||
* <p>
|
||||
* This can be considered a half-closed state.
|
||||
* </p>
|
||||
*/
|
||||
CLOSING,
|
||||
/**
|
||||
* [RFC] The websocket connection is closed.
|
||||
* <p>
|
||||
* Connection should be disconnected and no further reads or writes should occur.
|
||||
* </p>
|
||||
*/
|
||||
CLOSED;
|
||||
}
|
||||
|
||||
private AtomicReference<State> state = new AtomicReference<>();
|
||||
|
||||
public State get()
|
||||
{
|
||||
return state.get();
|
||||
}
|
||||
|
||||
public boolean onClosed()
|
||||
{
|
||||
return state.compareAndSet(State.CLOSING, State.CLOSED);
|
||||
}
|
||||
|
||||
public boolean onClosing()
|
||||
{
|
||||
return state.compareAndSet(State.OPEN, State.CLOSING);
|
||||
}
|
||||
|
||||
public boolean onConnected()
|
||||
{
|
||||
return state.compareAndSet(State.CONNECTING, State.CONNECTED);
|
||||
}
|
||||
|
||||
public boolean onConnecting()
|
||||
{
|
||||
return state.compareAndSet(null, State.CONNECTING);
|
||||
}
|
||||
|
||||
public boolean onOpen()
|
||||
{
|
||||
return state.compareAndSet(State.CONNECTED, State.OPEN);
|
||||
}
|
||||
}
|
|
@ -1,61 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
|
||||
/**
|
||||
* Connection states as outlined in <a href="https://tools.ietf.org/html/rfc6455">RFC6455</a>.
|
||||
*/
|
||||
public enum ConnectionState
|
||||
{
|
||||
/** [RFC] Initial state of a connection, the upgrade request / response is in progress */
|
||||
CONNECTING,
|
||||
/**
|
||||
* [Impl] Intermediate state between CONNECTING and OPEN, used to indicate that a upgrade request/response is successful, but the end-user provided socket's
|
||||
* onOpen code has yet to run.
|
||||
* <p>
|
||||
* This state is to allow the local socket to initiate messages and frames, but to NOT start reading yet.
|
||||
*/
|
||||
CONNECTED,
|
||||
/**
|
||||
* [RFC] The websocket connection is established and open.
|
||||
* <p>
|
||||
* This indicates that the Upgrade has succeed, and the end-user provided socket's onOpen code has completed.
|
||||
* <p>
|
||||
* It is now time to start reading from the remote endpoint.
|
||||
*/
|
||||
OPEN,
|
||||
/**
|
||||
* [RFC] The websocket closing handshake is started.
|
||||
* <p>
|
||||
* This can be considered a half-closed state.
|
||||
* <p>
|
||||
* When receiving this as an event on {@link ConnectionStateListener#onConnectionStateChange(ConnectionState)} a close frame should be sent using
|
||||
* the {@link CloseInfo} available from {@link IOState#getCloseInfo()}
|
||||
*/
|
||||
CLOSING,
|
||||
/**
|
||||
* [RFC] The websocket connection is closed.
|
||||
* <p>
|
||||
* Connection should be disconnected and no further reads or writes should occur.
|
||||
*/
|
||||
CLOSED;
|
||||
}
|
|
@ -213,9 +213,9 @@ public class Generator
|
|||
public ByteBuffer generateHeaderBytes(Frame frame)
|
||||
{
|
||||
ByteBuffer buffer = bufferPool.acquire(MAX_HEADER_LENGTH,false);
|
||||
int p = BufferUtil.flipToFill(buffer);
|
||||
BufferUtil.clearToFill(buffer);
|
||||
generateHeaderBytes(frame,buffer);
|
||||
BufferUtil.flipToFlush(buffer,p);
|
||||
BufferUtil.flipToFlush(buffer,0);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,14 +22,14 @@ import java.net.InetSocketAddress;
|
|||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.websocket.api.SuspendToken;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
|
||||
public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
||||
{
|
||||
interface ErrorListener
|
||||
interface Listener extends Connection.Listener
|
||||
{
|
||||
/**
|
||||
* Notification of an error condition at the Connection level
|
||||
|
@ -39,29 +39,6 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
|||
void onError(Throwable cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a websocket Close frame, without a status code or reason.
|
||||
* <p>
|
||||
* Basic usage: results in an non-blocking async write, then connection close.
|
||||
*
|
||||
* @see org.eclipse.jetty.websocket.api.StatusCode
|
||||
* @see #close(int, String)
|
||||
*/
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Send a websocket Close frame, with status code.
|
||||
* <p>
|
||||
* Advanced usage: results in an non-blocking async write, then connection close.
|
||||
*
|
||||
* @param statusCode
|
||||
* the status code
|
||||
* @param reason
|
||||
* the (optional) reason. (can be null for no reason)
|
||||
* @see org.eclipse.jetty.websocket.api.StatusCode
|
||||
*/
|
||||
void close(int statusCode, String reason);
|
||||
|
||||
/**
|
||||
* Terminate the connection (no close frame sent)
|
||||
*/
|
||||
|
@ -86,13 +63,6 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
|||
*/
|
||||
long getIdleTimeout();
|
||||
|
||||
/**
|
||||
* Get the IOState of the connection.
|
||||
*
|
||||
* @return the IOState of the connection.
|
||||
*/
|
||||
IOState getIOState();
|
||||
|
||||
/**
|
||||
* Get the local {@link InetSocketAddress} in use for this connection.
|
||||
* <p>
|
||||
|
|
|
@ -166,6 +166,13 @@ public class Parser
|
|||
return (flagsInUse & 0x10) != 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the buffer.
|
||||
*
|
||||
* @param buffer the buffer to consume.
|
||||
* @return true if buffer is fully consumed, false if still has remaining bytes
|
||||
* @throws WebSocketException if unable to parse properly
|
||||
*/
|
||||
public boolean parse(ByteBuffer buffer) throws WebSocketException
|
||||
{
|
||||
// quick fail, nothing left to parse
|
||||
|
@ -263,6 +270,7 @@ public class Parser
|
|||
{
|
||||
LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining());
|
||||
}
|
||||
|
||||
while (buffer.hasRemaining())
|
||||
{
|
||||
switch (state)
|
||||
|
@ -568,7 +576,6 @@ public class Parser
|
|||
int bytesAvailable = buffer.remaining();
|
||||
int windowBytes = Math.min(bytesAvailable, bytesExpected);
|
||||
int limit = buffer.limit();
|
||||
assert(buffer.position() + windowBytes < buffer.capacity());
|
||||
buffer.limit(buffer.position() + windowBytes);
|
||||
ByteBuffer window = buffer.slice();
|
||||
buffer.limit(limit);
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -31,6 +32,7 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.FrameCallback;
|
||||
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.BlockingWriteCallback.WriteBlocker;
|
||||
|
@ -78,24 +80,21 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
private final static int PARTIAL_TEXT_MASK = 0x00040000;
|
||||
private final static int PARTIAL_BINARY_MASK = 0x00080000;
|
||||
|
||||
private final LogicalConnection connection;
|
||||
private final WebSocketSession session;
|
||||
private final OutgoingFrames outgoing;
|
||||
private final AtomicInteger msgState = new AtomicInteger();
|
||||
private final BlockingWriteCallback blocker = new BlockingWriteCallback();
|
||||
private volatile BatchMode batchMode;
|
||||
|
||||
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing)
|
||||
public WebSocketRemoteEndpoint(WebSocketSession session, OutgoingFrames outgoing)
|
||||
{
|
||||
this(connection, outgoing, BatchMode.AUTO);
|
||||
this(session, outgoing, BatchMode.AUTO);
|
||||
}
|
||||
|
||||
public WebSocketRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoing, BatchMode batchMode)
|
||||
public WebSocketRemoteEndpoint(WebSocketSession session, OutgoingFrames outgoing, BatchMode batchMode)
|
||||
{
|
||||
if (connection == null)
|
||||
{
|
||||
throw new IllegalArgumentException("LogicalConnection cannot be null");
|
||||
}
|
||||
this.connection = connection;
|
||||
Objects.requireNonNull(session, "Session cannot be null");
|
||||
this.session = session;
|
||||
this.outgoing = outgoing;
|
||||
this.batchMode = batchMode;
|
||||
}
|
||||
|
@ -224,9 +223,18 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
*/
|
||||
public InetSocketAddress getInetSocketAddress()
|
||||
{
|
||||
if(connection == null)
|
||||
return null;
|
||||
return connection.getRemoteAddress();
|
||||
if(session.isOpen())
|
||||
return session.getRemoteAddress();
|
||||
return null;
|
||||
}
|
||||
|
||||
private void assertIsOpen()
|
||||
{
|
||||
AtomicConnectionState.State state = session.getConnectionState().get();
|
||||
if(state != AtomicConnectionState.State.OPEN && state != AtomicConnectionState.State.CONNECTED)
|
||||
{
|
||||
throw new WebSocketException("RemoteEndpoint unavailable, current state [" + state + "], expecting [OPEN or CONNECTED]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -237,6 +245,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
*/
|
||||
private Future<Void> sendAsyncFrame(WebSocketFrame frame)
|
||||
{
|
||||
assertIsOpen();
|
||||
FutureWriteCallback future = new FutureWriteCallback();
|
||||
uncheckedSendFrame(frame, future);
|
||||
return future;
|
||||
|
@ -248,10 +257,10 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public void sendBytes(ByteBuffer data) throws IOException
|
||||
{
|
||||
assertIsOpen();
|
||||
lockMsg(MsgType.BLOCKING);
|
||||
try
|
||||
{
|
||||
connection.getIOState().assertOutputOpen();
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendBytes with {}", BufferUtil.toDetailString(data));
|
||||
|
@ -267,6 +276,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public Future<Void> sendBytesByFuture(ByteBuffer data)
|
||||
{
|
||||
assertIsOpen();
|
||||
lockMsg(MsgType.ASYNC);
|
||||
try
|
||||
{
|
||||
|
@ -285,6 +295,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public void sendBytes(ByteBuffer data, WriteCallback callback)
|
||||
{
|
||||
assertIsOpen();
|
||||
lockMsg(MsgType.ASYNC);
|
||||
try
|
||||
{
|
||||
|
@ -302,23 +313,24 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
|
||||
public void uncheckedSendFrame(WebSocketFrame frame, FrameCallback callback)
|
||||
{
|
||||
assertIsOpen();
|
||||
try
|
||||
{
|
||||
BatchMode batchMode = BatchMode.OFF;
|
||||
if (frame.isDataFrame())
|
||||
batchMode = getBatchMode();
|
||||
connection.getIOState().assertOutputOpen();
|
||||
outgoing.outgoingFrame(frame, callback, batchMode);
|
||||
}
|
||||
catch (IOException e)
|
||||
catch (Throwable t)
|
||||
{
|
||||
callback.fail(e);
|
||||
callback.fail(t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException
|
||||
{
|
||||
assertIsOpen();
|
||||
boolean first = lockMsg(MsgType.PARTIAL_BINARY);
|
||||
try
|
||||
{
|
||||
|
@ -341,6 +353,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public void sendPartialString(String fragment, boolean isLast) throws IOException
|
||||
{
|
||||
assertIsOpen();
|
||||
boolean first = lockMsg(MsgType.PARTIAL_TEXT);
|
||||
try
|
||||
{
|
||||
|
@ -363,6 +376,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public void sendPing(ByteBuffer applicationData) throws IOException
|
||||
{
|
||||
assertIsOpen();
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendPing with {}", BufferUtil.toDetailString(applicationData));
|
||||
|
@ -373,6 +387,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public void sendPong(ByteBuffer applicationData) throws IOException
|
||||
{
|
||||
assertIsOpen();
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("sendPong with {}", BufferUtil.toDetailString(applicationData));
|
||||
|
@ -383,6 +398,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public void sendString(String text) throws IOException
|
||||
{
|
||||
assertIsOpen();
|
||||
lockMsg(MsgType.BLOCKING);
|
||||
try
|
||||
{
|
||||
|
@ -402,6 +418,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public Future<Void> sendStringByFuture(String text)
|
||||
{
|
||||
assertIsOpen();
|
||||
lockMsg(MsgType.ASYNC);
|
||||
try
|
||||
{
|
||||
|
@ -421,6 +438,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public void sendString(String text, WriteCallback callback)
|
||||
{
|
||||
assertIsOpen();
|
||||
lockMsg(MsgType.ASYNC);
|
||||
try
|
||||
{
|
||||
|
@ -452,6 +470,7 @@ public class WebSocketRemoteEndpoint implements RemoteEndpoint
|
|||
@Override
|
||||
public void flush() throws IOException
|
||||
{
|
||||
assertIsOpen();
|
||||
lockMsg(MsgType.ASYNC);
|
||||
try (WriteBlocker b = blocker.acquireWriteBlocker())
|
||||
{
|
||||
|
|
|
@ -66,15 +66,13 @@ import org.eclipse.jetty.websocket.common.frames.CloseFrame;
|
|||
import org.eclipse.jetty.websocket.common.function.CommonEndpointFunctions;
|
||||
import org.eclipse.jetty.websocket.common.function.EndpointFunctions;
|
||||
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
import org.eclipse.jetty.websocket.common.message.MessageSink;
|
||||
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
|
||||
import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
|
||||
|
||||
@ManagedObject("A Jetty WebSocket Session")
|
||||
public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory,
|
||||
WebSocketSessionScope, IncomingFrames, LogicalConnection.ErrorListener, Connection.Listener, ConnectionStateListener
|
||||
WebSocketSessionScope, IncomingFrames, LogicalConnection.Listener, Connection.Listener
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
|
||||
private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN");
|
||||
|
@ -83,6 +81,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
private final URI requestURI;
|
||||
private final LogicalConnection connection;
|
||||
private final Executor executor;
|
||||
private final AtomicConnectionState connectionState = new AtomicConnectionState();
|
||||
private final AtomicClose closeState = new AtomicClose();
|
||||
|
||||
// The websocket endpoint object itself
|
||||
private final Object endpoint;
|
||||
|
@ -116,7 +116,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
this.connection = connection;
|
||||
this.executor = connection.getExecutor();
|
||||
this.outgoingHandler = connection;
|
||||
this.connection.getIOState().addListener(this);
|
||||
this.policy = connection.getPolicy();
|
||||
|
||||
addBean(this.connection);
|
||||
|
@ -127,6 +126,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
return new CommonEndpointFunctions(endpoint, getPolicy(), this.executor);
|
||||
}
|
||||
|
||||
public void connect()
|
||||
{
|
||||
connectionState.onConnecting();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
@ -143,7 +147,17 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
@Override
|
||||
public void close(int statusCode, String reason)
|
||||
{
|
||||
connection.close(statusCode, reason);
|
||||
if(connectionState.onClosing())
|
||||
{
|
||||
// This is the first CLOSE event
|
||||
if(closeState.onLocal())
|
||||
{
|
||||
// this is Local initiated.
|
||||
CloseInfo closeInfo = new CloseInfo(statusCode, reason);
|
||||
Frame closeFrame = closeInfo.asFrame();
|
||||
outgoingHandler.outgoingFrame(closeFrame, new FrameCallback.Adapter(), BatchMode.AUTO);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -152,10 +166,30 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
@Override
|
||||
public void disconnect()
|
||||
{
|
||||
connection.disconnect();
|
||||
|
||||
// notify of harsh disconnect
|
||||
notifyClose(StatusCode.NO_CLOSE, "Harsh disconnect");
|
||||
if(connectionState.onClosed())
|
||||
{
|
||||
connection.disconnect();
|
||||
|
||||
// TODO: notify local endpoint onClose() ?
|
||||
// TODO: notifyClose(close.getStatusCode(), close.getReason());
|
||||
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{}.onSessionClosed()", containerScope.getClass().getSimpleName());
|
||||
containerScope.onSessionClosed(this);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.ignore(t);
|
||||
}
|
||||
|
||||
if (closeState.onLocal())
|
||||
{
|
||||
// notify local endpoint of harsh disconnect
|
||||
notifyClose(StatusCode.SHUTDOWN, "Harsh disconnect");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void dispatch(Runnable runnable)
|
||||
|
@ -272,6 +306,16 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
{
|
||||
return connection;
|
||||
}
|
||||
|
||||
public AtomicConnectionState getConnectionState()
|
||||
{
|
||||
return connectionState;
|
||||
}
|
||||
|
||||
public AtomicClose getCloseState()
|
||||
{
|
||||
return closeState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WebSocketContainerScope getContainerScope()
|
||||
|
@ -339,9 +383,10 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
{
|
||||
if (LOG_OPEN.isDebugEnabled())
|
||||
LOG_OPEN.debug("[{}] {}.getRemote()", getPolicy().getBehavior(), this.getClass().getSimpleName());
|
||||
ConnectionState state = connection.getIOState().getConnectionState();
|
||||
|
||||
AtomicConnectionState.State state = connectionState.get();
|
||||
|
||||
if ((state == ConnectionState.OPEN) || (state == ConnectionState.CONNECTED))
|
||||
if ((state == AtomicConnectionState.State.OPEN) || (state == AtomicConnectionState.State.CONNECTED))
|
||||
{
|
||||
return remote;
|
||||
}
|
||||
|
@ -397,7 +442,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
try
|
||||
{
|
||||
Thread.currentThread().setContextClassLoader(classLoader);
|
||||
if (connection.getIOState().isInputAvailable())
|
||||
if (connectionState.get() == AtomicConnectionState.State.OPEN)
|
||||
{
|
||||
// For endpoints that want to see raw frames.
|
||||
// These are immutable.
|
||||
|
@ -413,7 +458,21 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
CloseInfo close = new CloseInfo(closeframe, validate);
|
||||
|
||||
// process handshake
|
||||
getConnection().getIOState().onCloseRemote(close);
|
||||
if(connectionState.onClosing())
|
||||
{
|
||||
// we transitioned to CLOSING state
|
||||
if(closeState.onRemote())
|
||||
{
|
||||
// Remote initiated.
|
||||
// Send reply to remote
|
||||
close(close.getStatusCode(), close.getReason());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Local initiated, this was the reply.
|
||||
disconnect();
|
||||
}
|
||||
}
|
||||
callback.succeed();
|
||||
|
||||
return;
|
||||
|
@ -499,11 +558,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
if (this.connection == null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return this.connection.isOpen();
|
||||
return this.connectionState.get() == AtomicConnectionState.State.OPEN;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -523,11 +578,19 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("notifyClose({},{})", statusCode, reason);
|
||||
LOG.debug("notifyClose({},{}) [{}]", statusCode, reason, getState());
|
||||
}
|
||||
endpointFunctions.onClose(new CloseInfo(statusCode, reason));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Error Event.
|
||||
* <p>
|
||||
* Can be seen from Session and Connection.
|
||||
* </p>
|
||||
*
|
||||
* @param t the raw cause
|
||||
*/
|
||||
@Override
|
||||
public void onError(Throwable t)
|
||||
{
|
||||
|
@ -584,61 +647,32 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
close(statusCode, cause.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Connection Disconnect Event
|
||||
* @param connection the connection
|
||||
*/
|
||||
@Override
|
||||
public void onClosed(Connection connection)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Connection Open Event
|
||||
* @param connection the connection
|
||||
*/
|
||||
@Override
|
||||
public void onOpened(Connection connection)
|
||||
{
|
||||
if (LOG_OPEN.isDebugEnabled())
|
||||
LOG_OPEN.debug("[{}] {}.onOpened()", getPolicy().getBehavior(), this.getClass().getSimpleName());
|
||||
connectionState.onConnecting();
|
||||
open();
|
||||
}
|
||||
|
||||
@SuppressWarnings("incomplete-switch")
|
||||
@Override
|
||||
public void onConnectionStateChange(ConnectionState state)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case CLOSED:
|
||||
IOState ioState = this.connection.getIOState();
|
||||
CloseInfo close = ioState.getCloseInfo();
|
||||
// confirmed close of local endpoint
|
||||
notifyClose(close.getStatusCode(), close.getReason());
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{}.onSessionClosed()", containerScope.getClass().getSimpleName());
|
||||
containerScope.onSessionClosed(this);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.ignore(t);
|
||||
}
|
||||
break;
|
||||
case CONNECTED:
|
||||
// notify session listeners
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{}.onSessionOpened()", containerScope.getClass().getSimpleName());
|
||||
containerScope.onSessionOpened(this);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.ignore(t);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public WebSocketRemoteEndpoint newRemoteEndpoint(LogicalConnection connection, OutgoingFrames outgoingFrames, BatchMode batchMode)
|
||||
{
|
||||
return new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode());
|
||||
return new WebSocketRemoteEndpoint(this,outgoingHandler,getBatchMode());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -658,27 +692,53 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
try (ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader))
|
||||
{
|
||||
// Upgrade success
|
||||
connection.getIOState().onConnected();
|
||||
|
||||
// Connect remote
|
||||
remote = remoteEndpointFactory.newRemoteEndpoint(connection,outgoingHandler,getBatchMode());
|
||||
if (LOG_OPEN.isDebugEnabled())
|
||||
LOG_OPEN.debug("[{}] {}.open() remote={}", getPolicy().getBehavior(), this.getClass().getSimpleName(), remote);
|
||||
|
||||
// Open WebSocket
|
||||
endpointFunctions.onOpen(this);
|
||||
|
||||
// Open connection
|
||||
connection.getIOState().onOpened();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
if(connectionState.onConnected())
|
||||
{
|
||||
LOG.debug("open -> {}", dump());
|
||||
// Connect remote
|
||||
remote = remoteEndpointFactory.newRemoteEndpoint(connection, outgoingHandler, getBatchMode());
|
||||
if (LOG_OPEN.isDebugEnabled())
|
||||
LOG_OPEN.debug("[{}] {}.open() remote={}", getPolicy().getBehavior(), this.getClass().getSimpleName(), remote);
|
||||
|
||||
// Open WebSocket
|
||||
endpointFunctions.onOpen(this);
|
||||
|
||||
// Open connection
|
||||
if(connectionState.onOpen())
|
||||
{
|
||||
// notify session listeners
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{}.onSessionOpened()", containerScope.getClass().getSimpleName());
|
||||
containerScope.onSessionOpened(this);
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.ignore(t);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("open -> {}", dump());
|
||||
}
|
||||
|
||||
if (openFuture != null)
|
||||
{
|
||||
openFuture.complete(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(openFuture != null)
|
||||
else
|
||||
{
|
||||
openFuture.complete(this);
|
||||
IllegalStateException ise = new IllegalStateException("Unexpected state [" + connectionState.get() + "] when attempting to transition to CONNECTED");
|
||||
if (openFuture != null)
|
||||
{
|
||||
openFuture.completeExceptionally(ise);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw ise;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Throwable t)
|
||||
|
@ -770,7 +830,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
sb.append(',').append(getConnection().getClass().getSimpleName());
|
||||
if (getConnection() instanceof AbstractWebSocketConnection)
|
||||
{
|
||||
if(getConnection().getIOState().isOpen() && remote != null)
|
||||
if(isOpen() && remote != null)
|
||||
{
|
||||
sb.append(',').append(getRemoteAddress());
|
||||
if (getPolicy().getBehavior() == WebSocketBehavior.SERVER)
|
||||
|
|
|
@ -474,5 +474,11 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
|
|||
LOG.debug("Exception while notifying failure of callback " + callback,x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "ExtensionStack$Flusher[" + getState() + "]";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,4 +109,10 @@ public class ReadOnlyDelegatedFrame implements Frame
|
|||
{
|
||||
return delegate.isRsv3();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s]", this.getClass().getSimpleName(), delegate.toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,10 +111,18 @@ public class CommonEndpointFunctions<T extends Session> extends AbstractLifeCycl
|
|||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
LOG.info("Starting");
|
||||
discoverEndpointFunctions(this.endpoint);
|
||||
super.doStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
LOG.info("Stopping");
|
||||
super.doStop();
|
||||
}
|
||||
|
||||
protected void discoverEndpointFunctions(Object endpoint)
|
||||
{
|
||||
boolean supportAnnotations = true;
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.net.SocketTimeoutException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -40,47 +41,33 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.FrameCallback;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.SuspendToken;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
import org.eclipse.jetty.websocket.common.Generator;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.Parser;
|
||||
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
|
||||
/**
|
||||
* Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}.
|
||||
*/
|
||||
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable, Parser.Handler
|
||||
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, Dumpable, Parser.Handler
|
||||
{
|
||||
private class Flusher extends FrameFlusher
|
||||
{
|
||||
private Flusher(ByteBufferPool bufferPool, int bufferSize, Generator generator, EndPoint endpoint)
|
||||
private Flusher(int bufferSize, Generator generator, EndPoint endpoint)
|
||||
{
|
||||
super(bufferPool,generator,endpoint,bufferSize,8);
|
||||
super(generator,endpoint,bufferSize,8);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onFailure(Throwable x)
|
||||
{
|
||||
notifyError(x);
|
||||
|
||||
if (ioState.wasAbnormalClose())
|
||||
{
|
||||
LOG.ignore(x);
|
||||
return;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Write flush failure",x);
|
||||
ioState.onWriteFailure(x);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,69 +93,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
}
|
||||
|
||||
public class OnCloseLocalCallback implements WriteCallback
|
||||
{
|
||||
private final WriteCallback callback;
|
||||
private final CloseInfo close;
|
||||
|
||||
public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
|
||||
{
|
||||
this.callback = callback;
|
||||
this.close = close;
|
||||
}
|
||||
|
||||
public OnCloseLocalCallback(CloseInfo close)
|
||||
{
|
||||
this(null,close);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFailed(Throwable x)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeFailed(x);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
onLocalClose();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSuccess()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
onLocalClose();
|
||||
}
|
||||
}
|
||||
|
||||
private void onLocalClose()
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("Local Close Confirmed {}",close);
|
||||
if (close.isAbnormal())
|
||||
{
|
||||
ioState.onAbnormalClose(close);
|
||||
}
|
||||
else
|
||||
{
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
|
||||
private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + ".OPEN");
|
||||
private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + ".CLOSE");
|
||||
|
@ -189,11 +113,10 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
private final FrameFlusher flusher;
|
||||
private final String id;
|
||||
private final ExtensionStack extensionStack;
|
||||
private LogicalConnection.ErrorListener errorListener;
|
||||
private final List<LogicalConnection.Listener> listeners = new CopyOnWriteArrayList<>();
|
||||
private List<ExtensionConfig> extensions;
|
||||
private ByteBuffer networkBuffer;
|
||||
private ByteBuffer prefillBuffer;
|
||||
private IOState ioState;
|
||||
|
||||
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, ExtensionStack extensionStack)
|
||||
{
|
||||
|
@ -213,9 +136,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
this.scheduler = scheduler;
|
||||
this.extensions = new ArrayList<>();
|
||||
this.suspendToken = new AtomicBoolean(false);
|
||||
this.ioState = new IOState();
|
||||
this.ioState.addListener(this);
|
||||
this.flusher = new Flusher(bufferPool,policy.getOutputBufferSize(),generator,endp);
|
||||
this.flusher = new Flusher(policy.getOutputBufferSize(),generator,endp);
|
||||
this.setInputBufferSize(policy.getInputBufferSize());
|
||||
this.setMaxIdleTimeout(policy.getIdleTimeout());
|
||||
|
||||
|
@ -230,43 +151,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
return super.getExecutor();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close without a close code or reason
|
||||
*/
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("close()");
|
||||
close(new CloseInfo());
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the connection.
|
||||
* <p> fillInterested();
|
||||
|
||||
* This can result in a close handshake over the network, or a simple local abnormal close
|
||||
*
|
||||
* @param statusCode
|
||||
* the WebSocket status code.
|
||||
* @param reason
|
||||
* the (optional) reason string. (null is allowed)
|
||||
* @see StatusCode
|
||||
*/
|
||||
@Override
|
||||
public void close(int statusCode, String reason)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("close({},{})", statusCode, reason);
|
||||
close(new CloseInfo(statusCode, reason));
|
||||
}
|
||||
|
||||
private void close(CloseInfo closeInfo)
|
||||
{
|
||||
if (closed.compareAndSet(false, true))
|
||||
outgoingFrame(closeInfo.asFrame(), new OnCloseLocalCallback(closeInfo), BatchMode.OFF);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect()
|
||||
{
|
||||
|
@ -343,12 +227,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
return getEndPoint().getIdleTimeout();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOState getIOState()
|
||||
{
|
||||
return ioState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxIdleTimeout()
|
||||
{
|
||||
|
@ -393,63 +271,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} onClose()",behavior);
|
||||
super.onClose();
|
||||
ioState.onDisconnected();
|
||||
flusher.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionStateChange(ConnectionState state)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("{} Connection State Change: {}",behavior,state);
|
||||
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
if (BufferUtil.hasContent(prefillBuffer))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining());
|
||||
}
|
||||
parser.parse(prefillBuffer);
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("OPEN: normal fillInterested");
|
||||
}
|
||||
fillInterested();
|
||||
break;
|
||||
case CLOSED:
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
|
||||
if (ioState.wasAbnormalClose())
|
||||
{
|
||||
// Fire out a close frame, indicating abnormal shutdown, then disconnect
|
||||
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
|
||||
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Just disconnect
|
||||
this.disconnect(false);
|
||||
}
|
||||
break;
|
||||
case CLOSING:
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
|
||||
// First occurrence of .onCloseLocal or .onCloseRemote use
|
||||
if (ioState.wasRemoteCloseInitiated())
|
||||
{
|
||||
CloseInfo close = ioState.getCloseInfo();
|
||||
// reply to close handshake from remote
|
||||
outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onFrame(Frame frame)
|
||||
{
|
||||
|
@ -504,31 +328,39 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (networkBuffer.hasRemaining())
|
||||
|
||||
if (BufferUtil.hasContent(prefillBuffer))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("Parsing Upgrade prefill buffer ({})", prefillBuffer.remaining(), BufferUtil.toDetailString(prefillBuffer));
|
||||
}
|
||||
if (!parser.parse(prefillBuffer)) return;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (networkBuffer.hasRemaining())
|
||||
{
|
||||
if (!parser.parse(networkBuffer)) return;
|
||||
}
|
||||
|
||||
int filled = getEndPoint().fill(networkBuffer);
|
||||
|
||||
if (filled < 0)
|
||||
{
|
||||
bufferPool.release(networkBuffer);
|
||||
return;
|
||||
}
|
||||
|
||||
if (filled == 0)
|
||||
{
|
||||
bufferPool.release(networkBuffer);
|
||||
fillInterested();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!parser.parse(networkBuffer)) return;
|
||||
}
|
||||
|
||||
// TODO: flip/fill?
|
||||
|
||||
int filled = getEndPoint().fill(networkBuffer);
|
||||
|
||||
if (filled < 0)
|
||||
{
|
||||
bufferPool.release(networkBuffer);
|
||||
// notifyError(new EOFException("Read EOF"));
|
||||
return;
|
||||
}
|
||||
|
||||
if (filled == 0)
|
||||
{
|
||||
bufferPool.release(networkBuffer);
|
||||
fillInterested();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!parser.parse(networkBuffer)) return;
|
||||
}
|
||||
}
|
||||
catch (Throwable t)
|
||||
|
@ -554,23 +386,34 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
|
||||
private void notifyError(Throwable cause)
|
||||
{
|
||||
if(errorListener != null)
|
||||
if (listeners.isEmpty())
|
||||
{
|
||||
errorListener.onError(cause);
|
||||
LOG.warn("Unhandled Connection Error", cause);
|
||||
}
|
||||
else
|
||||
|
||||
for (LogicalConnection.Listener listener : listeners)
|
||||
{
|
||||
LOG.warn("notifyError() undefined", cause);
|
||||
try
|
||||
{
|
||||
listener.onError(cause);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
cause.addSuppressed(e);
|
||||
LOG.warn("Bad onError() call", cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Physical connection Open.
|
||||
*/
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
if(LOG_OPEN.isDebugEnabled())
|
||||
LOG_OPEN.debug("[{}] {}.onOpened()",behavior,this.getClass().getSimpleName());
|
||||
super.onOpen();
|
||||
this.ioState.onOpened();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -579,22 +422,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
@Override
|
||||
protected boolean onReadTimeout()
|
||||
{
|
||||
IOState state = getIOState();
|
||||
ConnectionState cstate = state.getConnectionState();
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("{} Read Timeout - {}",behavior,cstate);
|
||||
|
||||
if (cstate == ConnectionState.CLOSED)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
|
||||
// close already completed, extra timeouts not relevant
|
||||
// allow underlying connection and endpoint to disconnect on its own
|
||||
return true;
|
||||
}
|
||||
|
||||
notifyError(new SocketTimeoutException("Timeout on Read"));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -619,9 +447,16 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
fillAndParse();
|
||||
}
|
||||
|
||||
public void setErrorListener(ErrorListener errorListener)
|
||||
public boolean addListener(LogicalConnection.Listener listener)
|
||||
{
|
||||
this.errorListener = errorListener;
|
||||
super.addListener(listener);
|
||||
return this.listeners.add(listener);
|
||||
}
|
||||
|
||||
public boolean removeListener(LogicalConnection.Listener listener)
|
||||
{
|
||||
super.removeListener(listener);
|
||||
return this.listeners.remove(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -678,10 +513,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
@Override
|
||||
public String toConnectionString()
|
||||
{
|
||||
return String.format("%s@%x[ios=%s,f=%s,g=%s,p=%s]",
|
||||
return String.format("%s@%x[f=%s,g=%s,p=%s]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
ioState,flusher,generator,parser);
|
||||
flusher,
|
||||
generator,
|
||||
parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
|
@ -62,7 +61,8 @@ public class FrameFlusher
|
|||
{
|
||||
if (aggregate == null)
|
||||
{
|
||||
aggregate = bufferPool.acquire(bufferSize,true);
|
||||
aggregate = generator.getBufferPool().acquire(bufferSize,true);
|
||||
BufferUtil.clearToFill(aggregate);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
|
||||
|
@ -221,7 +221,7 @@ public class FrameFlusher
|
|||
{
|
||||
if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
|
||||
{
|
||||
bufferPool.release(aggregate);
|
||||
generator.getBufferPool().release(aggregate);
|
||||
aggregate = null;
|
||||
}
|
||||
}
|
||||
|
@ -288,7 +288,6 @@ public class FrameFlusher
|
|||
|
||||
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
|
||||
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final EndPoint endpoint;
|
||||
private final int bufferSize;
|
||||
private final Generator generator;
|
||||
|
@ -299,9 +298,8 @@ public class FrameFlusher
|
|||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private volatile Throwable failure;
|
||||
|
||||
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
|
||||
public FrameFlusher(Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
|
||||
{
|
||||
this.bufferPool = bufferPool;
|
||||
this.endpoint = endpoint;
|
||||
this.bufferSize = bufferSize;
|
||||
this.generator = Objects.requireNonNull(generator);
|
||||
|
@ -336,16 +334,29 @@ public class FrameFlusher
|
|||
{
|
||||
if (closed.get())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} discarding/closed {}",this,frame);
|
||||
}
|
||||
notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
|
||||
return;
|
||||
}
|
||||
if (flusher.isFailed())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} discarding/failed {}",this,frame);
|
||||
}
|
||||
notifyCallbackFailure(callback,failure);
|
||||
return;
|
||||
}
|
||||
|
||||
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} queued {}",this,entry);
|
||||
}
|
||||
|
||||
synchronized (lock)
|
||||
{
|
||||
|
@ -374,11 +385,6 @@ public class FrameFlusher
|
|||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} queued {}",this,entry);
|
||||
}
|
||||
|
||||
flusher.iterate();
|
||||
}
|
||||
|
||||
|
|
|
@ -28,18 +28,18 @@ import org.eclipse.jetty.util.StringUtil;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.common.AtomicConnectionState;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
|
||||
/**
|
||||
* Simple state tracker for Input / Output and {@link ConnectionState}.
|
||||
* Simple state tracker for Input / Output and {@link AtomicConnectionState.State}.
|
||||
* <p>
|
||||
* Use the various known .on*() methods to trigger a state change.
|
||||
* <ul>
|
||||
* <li>{@link #onOpened()} - connection has been opened</li>
|
||||
* </ul>
|
||||
*/
|
||||
public class IOState
|
||||
public class XIOState
|
||||
{
|
||||
/**
|
||||
* The source of a close handshake. (ie: who initiated it).
|
||||
|
@ -58,11 +58,11 @@ public class IOState
|
|||
|
||||
public static interface ConnectionStateListener
|
||||
{
|
||||
public void onConnectionStateChange(ConnectionState state);
|
||||
public void onConnectionStateChange(AtomicConnectionState.State state);
|
||||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(IOState.class);
|
||||
private ConnectionState state;
|
||||
private static final Logger LOG = Log.getLogger(XIOState.class);
|
||||
private AtomicConnectionState.State state;
|
||||
private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
/**
|
||||
|
@ -100,11 +100,11 @@ public class IOState
|
|||
private boolean cleanClose;
|
||||
|
||||
/**
|
||||
* Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
|
||||
* Create a new IOState, initialized to {@link AtomicConnectionState.State#CONNECTING}
|
||||
*/
|
||||
public IOState()
|
||||
public XIOState()
|
||||
{
|
||||
this.state = ConnectionState.CONNECTING;
|
||||
this.state = AtomicConnectionState.State.CONNECTING;
|
||||
this.inputAvailable = false;
|
||||
this.outputAvailable = false;
|
||||
this.closeHandshakeSource = CloseHandshakeSource.NONE;
|
||||
|
@ -143,7 +143,7 @@ public class IOState
|
|||
return closeInfo;
|
||||
}
|
||||
|
||||
public ConnectionState getConnectionState()
|
||||
public AtomicConnectionState.State getConnectionState()
|
||||
{
|
||||
return state;
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ public class IOState
|
|||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return (state == ConnectionState.CLOSED);
|
||||
return (state == AtomicConnectionState.State.CLOSED);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -171,7 +171,7 @@ public class IOState
|
|||
return outputAvailable;
|
||||
}
|
||||
|
||||
private void notifyStateListeners(ConnectionState state)
|
||||
private void notifyStateListeners(AtomicConnectionState.State state)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Notify State Listeners: {}",state);
|
||||
|
@ -195,21 +195,21 @@ public class IOState
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onAbnormalClose({})",close);
|
||||
ConnectionState event = null;
|
||||
AtomicConnectionState.State event = null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (this.state == ConnectionState.CLOSED)
|
||||
if (this.state == AtomicConnectionState.State.CLOSED)
|
||||
{
|
||||
// already closed
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.state == ConnectionState.OPEN)
|
||||
if (this.state == AtomicConnectionState.State.OPEN)
|
||||
{
|
||||
this.cleanClose = false;
|
||||
}
|
||||
|
||||
this.state = ConnectionState.CLOSED;
|
||||
this.state = AtomicConnectionState.State.CLOSED;
|
||||
finalClose.compareAndSet(null,close);
|
||||
this.inputAvailable = false;
|
||||
this.outputAvailable = false;
|
||||
|
@ -228,10 +228,10 @@ public class IOState
|
|||
boolean open = false;
|
||||
synchronized (this)
|
||||
{
|
||||
ConnectionState initialState = this.state;
|
||||
AtomicConnectionState.State initialState = this.state;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onCloseLocal({}) : {}", closeInfo, initialState);
|
||||
if (initialState == ConnectionState.CLOSED)
|
||||
if (initialState == AtomicConnectionState.State.CLOSED)
|
||||
{
|
||||
// already closed
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -239,7 +239,7 @@ public class IOState
|
|||
return;
|
||||
}
|
||||
|
||||
if (initialState == ConnectionState.CONNECTED)
|
||||
if (initialState == AtomicConnectionState.State.CONNECTED)
|
||||
{
|
||||
// fast close. a local close request from end-user onConnect/onOpen method
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -265,8 +265,8 @@ public class IOState
|
|||
|
||||
private void closeLocal(CloseInfo closeInfo)
|
||||
{
|
||||
ConnectionState event = null;
|
||||
ConnectionState abnormalEvent = null;
|
||||
AtomicConnectionState.State event = null;
|
||||
AtomicConnectionState.State abnormalEvent = null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -287,20 +287,20 @@ public class IOState
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Close Handshake satisfied, disconnecting");
|
||||
cleanClose = true;
|
||||
this.state = ConnectionState.CLOSED;
|
||||
this.state = AtomicConnectionState.State.CLOSED;
|
||||
finalClose.compareAndSet(null,closeInfo);
|
||||
event = this.state;
|
||||
}
|
||||
else if (this.state == ConnectionState.OPEN)
|
||||
else if (this.state == AtomicConnectionState.State.OPEN)
|
||||
{
|
||||
// We are now entering CLOSING (or half-closed).
|
||||
this.state = ConnectionState.CLOSING;
|
||||
this.state = AtomicConnectionState.State.CLOSING;
|
||||
event = this.state;
|
||||
|
||||
// If abnormal, we don't expect an answer.
|
||||
if (closeInfo.isAbnormal())
|
||||
{
|
||||
abnormalEvent = ConnectionState.CLOSED;
|
||||
abnormalEvent = AtomicConnectionState.State.CLOSED;
|
||||
finalClose.compareAndSet(null,closeInfo);
|
||||
cleanClose = false;
|
||||
outputAvailable = false;
|
||||
|
@ -329,10 +329,10 @@ public class IOState
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onCloseRemote({})", closeInfo);
|
||||
ConnectionState event = null;
|
||||
AtomicConnectionState.State event = null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (this.state == ConnectionState.CLOSED)
|
||||
if (this.state == AtomicConnectionState.State.CLOSED)
|
||||
{
|
||||
// already closed
|
||||
return;
|
||||
|
@ -355,14 +355,14 @@ public class IOState
|
|||
{
|
||||
LOG.debug("Close Handshake satisfied, disconnecting");
|
||||
cleanClose = true;
|
||||
state = ConnectionState.CLOSED;
|
||||
state = AtomicConnectionState.State.CLOSED;
|
||||
finalClose.compareAndSet(null,closeInfo);
|
||||
event = this.state;
|
||||
}
|
||||
else if (this.state == ConnectionState.OPEN)
|
||||
else if (this.state == AtomicConnectionState.State.OPEN)
|
||||
{
|
||||
// We are now entering CLOSING (or half-closed)
|
||||
this.state = ConnectionState.CLOSING;
|
||||
this.state = AtomicConnectionState.State.CLOSING;
|
||||
event = this.state;
|
||||
}
|
||||
}
|
||||
|
@ -377,20 +377,20 @@ public class IOState
|
|||
/**
|
||||
* WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
|
||||
* <p>
|
||||
* This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
|
||||
* This is an intermediate state between the RFC's {@link AtomicConnectionState.State#CONNECTING} and {@link AtomicConnectionState.State#OPEN}
|
||||
*/
|
||||
public void onConnected()
|
||||
{
|
||||
ConnectionState event = null;
|
||||
AtomicConnectionState.State event = null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (this.state != ConnectionState.CONNECTING)
|
||||
if (this.state != AtomicConnectionState.State.CONNECTING)
|
||||
{
|
||||
LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
|
||||
return;
|
||||
}
|
||||
|
||||
this.state = ConnectionState.CONNECTED;
|
||||
this.state = AtomicConnectionState.State.CONNECTED;
|
||||
inputAvailable = false; // cannot read (yet)
|
||||
outputAvailable = true; // write allowed
|
||||
event = this.state;
|
||||
|
@ -403,11 +403,11 @@ public class IOState
|
|||
*/
|
||||
public void onFailedUpgrade()
|
||||
{
|
||||
assert (this.state == ConnectionState.CONNECTING);
|
||||
ConnectionState event = null;
|
||||
assert (this.state == AtomicConnectionState.State.CONNECTING);
|
||||
AtomicConnectionState.State event = null;
|
||||
synchronized (this)
|
||||
{
|
||||
this.state = ConnectionState.CLOSED;
|
||||
this.state = AtomicConnectionState.State.CLOSED;
|
||||
cleanClose = false;
|
||||
inputAvailable = false;
|
||||
outputAvailable = false;
|
||||
|
@ -423,23 +423,23 @@ public class IOState
|
|||
{
|
||||
if(LOG.isDebugEnabled())
|
||||
LOG.debug("onOpened()");
|
||||
|
||||
ConnectionState event = null;
|
||||
|
||||
AtomicConnectionState.State event = null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (this.state == ConnectionState.OPEN)
|
||||
if (this.state == AtomicConnectionState.State.OPEN)
|
||||
{
|
||||
// already opened
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.state != ConnectionState.CONNECTED)
|
||||
if (this.state != AtomicConnectionState.State.CONNECTED)
|
||||
{
|
||||
LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
|
||||
return;
|
||||
}
|
||||
|
||||
this.state = ConnectionState.OPEN;
|
||||
this.state = AtomicConnectionState.State.OPEN;
|
||||
this.inputAvailable = true;
|
||||
this.outputAvailable = true;
|
||||
event = this.state;
|
||||
|
@ -455,10 +455,10 @@ public class IOState
|
|||
*/
|
||||
public void onWriteFailure(Throwable t)
|
||||
{
|
||||
ConnectionState event = null;
|
||||
AtomicConnectionState.State event = null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (this.state == ConnectionState.CLOSED)
|
||||
if (this.state == AtomicConnectionState.State.CLOSED)
|
||||
{
|
||||
// already closed
|
||||
return;
|
||||
|
@ -488,7 +488,7 @@ public class IOState
|
|||
finalClose.compareAndSet(null,close);
|
||||
|
||||
this.cleanClose = false;
|
||||
this.state = ConnectionState.CLOSED;
|
||||
this.state = AtomicConnectionState.State.CLOSED;
|
||||
this.inputAvailable = false;
|
||||
this.outputAvailable = false;
|
||||
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
|
||||
|
@ -499,10 +499,10 @@ public class IOState
|
|||
|
||||
public void onDisconnected()
|
||||
{
|
||||
ConnectionState event = null;
|
||||
AtomicConnectionState.State event = null;
|
||||
synchronized (this)
|
||||
{
|
||||
if (this.state == ConnectionState.CLOSED)
|
||||
if (this.state == AtomicConnectionState.State.CLOSED)
|
||||
{
|
||||
// already closed
|
||||
return;
|
||||
|
@ -511,7 +511,7 @@ public class IOState
|
|||
CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Disconnected");
|
||||
|
||||
this.cleanClose = false;
|
||||
this.state = ConnectionState.CLOSED;
|
||||
this.state = AtomicConnectionState.State.CLOSED;
|
||||
this.closeInfo = close;
|
||||
this.inputAvailable = false;
|
||||
this.outputAvailable = false;
|
||||
|
@ -539,7 +539,7 @@ public class IOState
|
|||
str.append('!');
|
||||
}
|
||||
str.append("out");
|
||||
if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
|
||||
if ((state == AtomicConnectionState.State.CLOSED) || (state == AtomicConnectionState.State.CLOSING))
|
||||
{
|
||||
CloseInfo ci = finalClose.get();
|
||||
if (ci != null)
|
|
@ -0,0 +1,56 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.util;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
|
||||
/**
|
||||
* Simple {@link AutoCloseable} to allow Jetty {@link LifeCycle} components to
|
||||
* be managed using {@code try-with-resources} techniques.
|
||||
* <p>
|
||||
* {@link LifeCycle#start()} occurs at constructor.
|
||||
* {@link LifeCycle#stop()} occurs at {@link #close()}.
|
||||
* </p>
|
||||
*
|
||||
* @param <T> the {@link LifeCycle} to have resource managed
|
||||
*/
|
||||
public class LifeCycleScope<T extends LifeCycle> implements AutoCloseable, Supplier<T>
|
||||
{
|
||||
private final T lifecycle;
|
||||
|
||||
public LifeCycleScope(T lifecycle) throws Exception
|
||||
{
|
||||
this.lifecycle = lifecycle;
|
||||
this.lifecycle.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception
|
||||
{
|
||||
this.lifecycle.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public T get()
|
||||
{
|
||||
return this.lifecycle;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class AtomicCloseTest
|
||||
{
|
||||
@Test
|
||||
public void testCloseLocalNormal()
|
||||
{
|
||||
AtomicClose state = new AtomicClose();
|
||||
|
||||
assertThat("State", state.get(), is(AtomicClose.State.NONE));
|
||||
assertThat("Local 1st", state.onLocal(), is(true));
|
||||
assertThat("State", state.get(), is(AtomicClose.State.LOCAL));
|
||||
assertThat("Local 2nd", state.onLocal(), is(false));
|
||||
assertThat("State", state.get(), is(AtomicClose.State.LOCAL));
|
||||
assertThat("Remote 1st", state.onRemote(), is(false));
|
||||
assertThat("State", state.get(), is(AtomicClose.State.LOCAL));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseRemoteNormal()
|
||||
{
|
||||
AtomicClose state = new AtomicClose();
|
||||
|
||||
assertThat("State", state.get(), is(AtomicClose.State.NONE));
|
||||
assertThat("Remote 1st", state.onRemote(), is(true));
|
||||
assertThat("State", state.get(), is(AtomicClose.State.REMOTE));
|
||||
assertThat("Local 1st", state.onRemote(), is(false));
|
||||
assertThat("State", state.get(), is(AtomicClose.State.REMOTE));
|
||||
assertThat("Remote 2nd", state.onRemote(), is(false));
|
||||
assertThat("State", state.get(), is(AtomicClose.State.REMOTE));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class AtomicConnectionStateTest
|
||||
{
|
||||
@Test
|
||||
public void testNormalFlow()
|
||||
{
|
||||
AtomicConnectionState state = new AtomicConnectionState();
|
||||
assertThat("Connecting", state.onConnecting(), is(true));
|
||||
assertThat("Connected", state.onConnected(), is(true));
|
||||
assertThat("Open", state.onOpen(), is(true));
|
||||
assertThat("Closing", state.onClosing(), is(true));
|
||||
assertThat("Closed", state.onClosed(), is(true));
|
||||
}
|
||||
}
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
|
@ -39,7 +40,6 @@ import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
|||
import org.eclipse.jetty.websocket.common.test.UnitGenerator;
|
||||
import org.eclipse.jetty.websocket.common.test.UnitParser;
|
||||
import org.eclipse.jetty.websocket.common.util.Hex;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
@ -203,7 +203,7 @@ public class ParserTest
|
|||
|
||||
parser.parse(buf);
|
||||
|
||||
Assert.assertThat("Frame Count",capture.getFrames().size(),is(0));
|
||||
assertThat("Frame Count",capture.getFrames().size(),is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -233,19 +233,19 @@ public class ParserTest
|
|||
ByteBuffer window = networkBytes.slice();
|
||||
int windowSize = Math.min(window.remaining(),4096);
|
||||
window.limit(windowSize);
|
||||
parser.parse(window);
|
||||
assertThat(parser.parse(window), is(true));
|
||||
networkBytes.position(networkBytes.position() + windowSize);
|
||||
}
|
||||
|
||||
Assert.assertThat("Frame Count",capture.getFrames().size(),is(2));
|
||||
assertThat("Frame Count",capture.getFrames().size(),is(2));
|
||||
WebSocketFrame frame = capture.getFrames().poll();
|
||||
Assert.assertThat("Frame[0].opcode",frame.getOpCode(),is(OpCode.TEXT));
|
||||
assertThat("Frame[0].opcode",frame.getOpCode(),is(OpCode.TEXT));
|
||||
ByteBuffer actualPayload = frame.getPayload();
|
||||
Assert.assertThat("Frame[0].payload.length",actualPayload.remaining(),is(payload.length));
|
||||
assertThat("Frame[0].payload.length",actualPayload.remaining(),is(payload.length));
|
||||
// Should be all '*' characters (if masking is correct)
|
||||
for (int i = actualPayload.position(); i < actualPayload.remaining(); i++)
|
||||
{
|
||||
Assert.assertThat("Frame[0].payload[i]",actualPayload.get(i),is((byte)'*'));
|
||||
assertThat("Frame[0].payload[i]",actualPayload.get(i),is((byte)'*'));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
|
@ -28,7 +29,6 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
|||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.common.test.UnitParser;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -69,10 +69,10 @@ public class RFC6455ExamplesParserTest
|
|||
|
||||
WebSocketFrame txt = capture.getFrames().poll();
|
||||
String actual = BufferUtil.toUTF8String(txt.getPayload());
|
||||
Assert.assertThat("TextFrame[0].data",actual,is("Hel"));
|
||||
assertThat("TextFrame[0].data",actual,is("Hel"));
|
||||
txt = capture.getFrames().poll();
|
||||
actual = BufferUtil.toUTF8String(txt.getPayload());
|
||||
Assert.assertThat("TextFrame[1].data",actual,is("lo"));
|
||||
assertThat("TextFrame[1].data",actual,is("lo"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -94,7 +94,7 @@ public class RFC6455ExamplesParserTest
|
|||
|
||||
WebSocketFrame pong = capture.getFrames().poll();
|
||||
String actual = BufferUtil.toUTF8String(pong.getPayload());
|
||||
Assert.assertThat("PongFrame.payload",actual,is("Hello"));
|
||||
assertThat("PongFrame.payload",actual,is("Hello"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -116,7 +116,7 @@ public class RFC6455ExamplesParserTest
|
|||
|
||||
WebSocketFrame txt = capture.getFrames().poll();
|
||||
String actual = BufferUtil.toUTF8String(txt.getPayload());
|
||||
Assert.assertThat("TextFrame.payload",actual,is("Hello"));
|
||||
assertThat("TextFrame.payload",actual,is("Hello"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -145,14 +145,14 @@ public class RFC6455ExamplesParserTest
|
|||
|
||||
Frame bin = capture.getFrames().poll();
|
||||
|
||||
Assert.assertThat("BinaryFrame.payloadLength",bin.getPayloadLength(),is(dataSize));
|
||||
assertThat("BinaryFrame.payloadLength",bin.getPayloadLength(),is(dataSize));
|
||||
|
||||
ByteBuffer data = bin.getPayload();
|
||||
Assert.assertThat("BinaryFrame.payload.length",data.remaining(),is(dataSize));
|
||||
assertThat("BinaryFrame.payload.length",data.remaining(),is(dataSize));
|
||||
|
||||
for (int i = 0; i < dataSize; i++)
|
||||
{
|
||||
Assert.assertThat("BinaryFrame.payload[" + i + "]",data.get(i),is((byte)0x44));
|
||||
assertThat("BinaryFrame.payload[" + i + "]",data.get(i),is((byte)0x44));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -176,19 +176,19 @@ public class RFC6455ExamplesParserTest
|
|||
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
UnitParser parser = new UnitParser(policy,capture);
|
||||
parser.parse(buf);
|
||||
assertThat(parser.parse(buf), is(true));
|
||||
|
||||
capture.assertHasFrame(OpCode.BINARY,1);
|
||||
|
||||
Frame bin = capture.getFrames().poll();
|
||||
|
||||
Assert.assertThat("BinaryFrame.payloadLength",bin.getPayloadLength(),is(dataSize));
|
||||
assertThat("BinaryFrame.payloadLength",bin.getPayloadLength(),is(dataSize));
|
||||
ByteBuffer data = bin.getPayload();
|
||||
Assert.assertThat("BinaryFrame.payload.length",data.remaining(),is(dataSize));
|
||||
assertThat("BinaryFrame.payload.length",data.remaining(),is(dataSize));
|
||||
|
||||
for (int i = 0; i < dataSize; i++)
|
||||
{
|
||||
Assert.assertThat("BinaryFrame.payload[" + i + "]",data.get(i),is((byte)0x77));
|
||||
assertThat("BinaryFrame.payload[" + i + "]",data.get(i),is((byte)0x77));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -211,7 +211,7 @@ public class RFC6455ExamplesParserTest
|
|||
|
||||
WebSocketFrame ping = capture.getFrames().poll();
|
||||
String actual = BufferUtil.toUTF8String(ping.getPayload());
|
||||
Assert.assertThat("PingFrame.payload",actual,is("Hello"));
|
||||
assertThat("PingFrame.payload",actual,is("Hello"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -233,6 +233,6 @@ public class RFC6455ExamplesParserTest
|
|||
|
||||
WebSocketFrame txt = capture.getFrames().poll();
|
||||
String actual = BufferUtil.toUTF8String(txt.getPayload());
|
||||
Assert.assertThat("TextFrame.payload",actual,is("Hello"));
|
||||
assertThat("TextFrame.payload",actual,is("Hello"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,12 +20,15 @@ package org.eclipse.jetty.websocket.common;
|
|||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.eclipse.jetty.websocket.common.io.LocalWebSocketConnection;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.common.io.LocalWebSocketSession;
|
||||
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
|
||||
import org.eclipse.jetty.websocket.common.test.DummySocket;
|
||||
import org.eclipse.jetty.websocket.common.test.LeakTrackingBufferPoolRule;
|
||||
import org.eclipse.jetty.websocket.common.test.OutgoingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.common.util.LifeCycleScope;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -40,52 +43,59 @@ public class WebSocketRemoteEndpointTest
|
|||
public LeakTrackingBufferPoolRule bufferPool = new LeakTrackingBufferPoolRule("WebSocketRemoteEndpoint");
|
||||
|
||||
@Test
|
||||
public void testTextBinaryText() throws IOException
|
||||
public void testTextBinaryText() throws Exception
|
||||
{
|
||||
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool);
|
||||
SimpleContainerScope container = new SimpleContainerScope(WebSocketPolicy.newServerPolicy());
|
||||
WebSocketSession session = new LocalWebSocketSession(container, testname, new DummySocket());
|
||||
OutgoingFramesCapture outgoing = new OutgoingFramesCapture();
|
||||
WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(conn,outgoing);
|
||||
conn.connect();
|
||||
conn.open();
|
||||
|
||||
// Start text message
|
||||
remote.sendPartialString("Hello ",false);
|
||||
|
||||
try
|
||||
WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(session,outgoing);
|
||||
try(LifeCycleScope sessionScope = new LifeCycleScope(session))
|
||||
{
|
||||
// Attempt to start Binary Message
|
||||
ByteBuffer bytes = ByteBuffer.wrap(new byte[]
|
||||
{ 0, 1, 2 });
|
||||
remote.sendPartialBytes(bytes,false);
|
||||
Assert.fail("Expected " + IllegalStateException.class.getName());
|
||||
session.connect();
|
||||
session.open();
|
||||
|
||||
// Start text message
|
||||
remote.sendPartialString("Hello ", false);
|
||||
|
||||
try
|
||||
{
|
||||
// Attempt to start Binary Message
|
||||
ByteBuffer bytes = ByteBuffer.wrap(new byte[] {0, 1, 2});
|
||||
remote.sendPartialBytes(bytes, false);
|
||||
Assert.fail("Expected " + IllegalStateException.class.getName());
|
||||
}
|
||||
catch (IllegalStateException e)
|
||||
{
|
||||
// Expected path
|
||||
Assert.assertThat("Exception", e.getMessage(), containsString("Cannot send"));
|
||||
}
|
||||
|
||||
// End text message
|
||||
remote.sendPartialString("World!", true);
|
||||
}
|
||||
catch (IllegalStateException e)
|
||||
{
|
||||
// Expected path
|
||||
Assert.assertThat("Exception",e.getMessage(),containsString("Cannot send"));
|
||||
}
|
||||
|
||||
// End text message
|
||||
remote.sendPartialString("World!",true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTextPingText() throws IOException
|
||||
public void testTextPingText() throws Exception
|
||||
{
|
||||
LocalWebSocketConnection conn = new LocalWebSocketConnection(testname,bufferPool);
|
||||
SimpleContainerScope container = new SimpleContainerScope(WebSocketPolicy.newServerPolicy());
|
||||
WebSocketSession session = new LocalWebSocketSession(container, testname, new DummySocket());
|
||||
OutgoingFramesCapture outgoing = new OutgoingFramesCapture();
|
||||
WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(conn,outgoing);
|
||||
conn.connect();
|
||||
conn.open();
|
||||
|
||||
// Start text message
|
||||
remote.sendPartialString("Hello ",false);
|
||||
|
||||
// Attempt to send Ping Message
|
||||
remote.sendPing(ByteBuffer.wrap(new byte[]
|
||||
{ 0 }));
|
||||
|
||||
// End text message
|
||||
remote.sendPartialString("World!",true);
|
||||
WebSocketRemoteEndpoint remote = new WebSocketRemoteEndpoint(session,outgoing);
|
||||
try(LifeCycleScope sessionScope = new LifeCycleScope(session))
|
||||
{
|
||||
session.connect();
|
||||
session.open();
|
||||
|
||||
// Start text message
|
||||
remote.sendPartialString("Hello ", false);
|
||||
|
||||
// Attempt to send Ping Message
|
||||
remote.sendPing(ByteBuffer.wrap(new byte[]
|
||||
{0}));
|
||||
|
||||
// End text message
|
||||
remote.sendPartialString("World!", true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,245 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
import java.util.LinkedList;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
import org.junit.Test;
|
||||
|
||||
public class IOStateTest
|
||||
{
|
||||
public static class StateTracker implements IOState.ConnectionStateListener
|
||||
{
|
||||
private LinkedList<ConnectionState> transitions = new LinkedList<>();
|
||||
|
||||
public void assertTransitions(ConnectionState ...states)
|
||||
{
|
||||
assertThat("Transitions.count",transitions.size(),is(states.length));
|
||||
if (states.length > 0)
|
||||
{
|
||||
int len = states.length;
|
||||
for (int i = 0; i < len; i++)
|
||||
{
|
||||
assertThat("Transitions[" + i + "]",transitions.get(i),is(states[i]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public LinkedList<ConnectionState> getTransitions()
|
||||
{
|
||||
return transitions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionStateChange(ConnectionState state)
|
||||
{
|
||||
transitions.add(state);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertCleanClose(IOState state, boolean expected)
|
||||
{
|
||||
assertThat("State.cleanClose",state.wasCleanClose(),is(expected));
|
||||
}
|
||||
|
||||
private void assertInputAvailable(IOState state, boolean available)
|
||||
{
|
||||
assertThat("State.inputAvailable",state.isInputAvailable(),is(available));
|
||||
}
|
||||
|
||||
private void assertLocalInitiated(IOState state, boolean expected)
|
||||
{
|
||||
assertThat("State.localCloseInitiated",state.wasLocalCloseInitiated(),is(expected));
|
||||
}
|
||||
|
||||
private void assertOutputAvailable(IOState state, boolean available)
|
||||
{
|
||||
assertThat("State.outputAvailable",state.isOutputAvailable(),is(available));
|
||||
}
|
||||
|
||||
private void assertRemoteInitiated(IOState state, boolean expected)
|
||||
{
|
||||
assertThat("State.remoteCloseInitiated",state.wasRemoteCloseInitiated(),is(expected));
|
||||
}
|
||||
|
||||
private void assertState(IOState state, ConnectionState expectedState)
|
||||
{
|
||||
assertThat("State",state.getConnectionState(),is(expectedState));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectAbnormalClose()
|
||||
{
|
||||
IOState state = new IOState();
|
||||
StateTracker tracker = new StateTracker();
|
||||
state.addListener(tracker);
|
||||
assertState(state,ConnectionState.CONNECTING);
|
||||
|
||||
// connect
|
||||
state.onConnected();
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,true);
|
||||
|
||||
// open
|
||||
state.onOpened();
|
||||
assertInputAvailable(state,true);
|
||||
assertOutputAvailable(state,true);
|
||||
|
||||
// disconnect
|
||||
state.onAbnormalClose(new CloseInfo(StatusCode.NO_CLOSE,"Oops"));
|
||||
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,false);
|
||||
tracker.assertTransitions(ConnectionState.CONNECTED,ConnectionState.OPEN,ConnectionState.CLOSED);
|
||||
assertState(state,ConnectionState.CLOSED);
|
||||
|
||||
// not clean
|
||||
assertCleanClose(state,false);
|
||||
assertLocalInitiated(state,false);
|
||||
assertRemoteInitiated(state,false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectCloseLocalInitiated()
|
||||
{
|
||||
IOState state = new IOState();
|
||||
StateTracker tracker = new StateTracker();
|
||||
state.addListener(tracker);
|
||||
assertState(state,ConnectionState.CONNECTING);
|
||||
|
||||
// connect
|
||||
state.onConnected();
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,true);
|
||||
|
||||
// open
|
||||
state.onOpened();
|
||||
assertInputAvailable(state,true);
|
||||
assertOutputAvailable(state,true);
|
||||
|
||||
// close (local initiated)
|
||||
state.onCloseLocal(new CloseInfo(StatusCode.NORMAL,"Hi"));
|
||||
assertInputAvailable(state,true);
|
||||
assertOutputAvailable(state,false);
|
||||
assertState(state,ConnectionState.CLOSING);
|
||||
|
||||
// close (remote response)
|
||||
state.onCloseRemote(new CloseInfo(StatusCode.NORMAL,"Hi"));
|
||||
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,false);
|
||||
tracker.assertTransitions(ConnectionState.CONNECTED,ConnectionState.OPEN,ConnectionState.CLOSING,ConnectionState.CLOSED);
|
||||
assertState(state,ConnectionState.CLOSED);
|
||||
|
||||
// not clean
|
||||
assertCleanClose(state,true);
|
||||
assertLocalInitiated(state,true);
|
||||
assertRemoteInitiated(state,false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectCloseRemoteInitiated()
|
||||
{
|
||||
IOState state = new IOState();
|
||||
StateTracker tracker = new StateTracker();
|
||||
state.addListener(tracker);
|
||||
assertState(state,ConnectionState.CONNECTING);
|
||||
|
||||
// connect
|
||||
state.onConnected();
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,true);
|
||||
|
||||
// open
|
||||
state.onOpened();
|
||||
assertInputAvailable(state,true);
|
||||
assertOutputAvailable(state,true);
|
||||
|
||||
// close (remote initiated)
|
||||
state.onCloseRemote(new CloseInfo(StatusCode.NORMAL,"Hi"));
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,true);
|
||||
assertState(state,ConnectionState.CLOSING);
|
||||
|
||||
// close (local response)
|
||||
state.onCloseLocal(new CloseInfo(StatusCode.NORMAL,"Hi"));
|
||||
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,false);
|
||||
tracker.assertTransitions(ConnectionState.CONNECTED,ConnectionState.OPEN,ConnectionState.CLOSING,ConnectionState.CLOSED);
|
||||
assertState(state,ConnectionState.CLOSED);
|
||||
|
||||
// not clean
|
||||
assertCleanClose(state,true);
|
||||
assertLocalInitiated(state,false);
|
||||
assertRemoteInitiated(state,true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConnectFailure()
|
||||
{
|
||||
IOState state = new IOState();
|
||||
StateTracker tracker = new StateTracker();
|
||||
state.addListener(tracker);
|
||||
assertState(state,ConnectionState.CONNECTING);
|
||||
|
||||
// fail upgrade
|
||||
state.onFailedUpgrade();
|
||||
|
||||
tracker.assertTransitions(ConnectionState.CLOSED);
|
||||
assertState(state,ConnectionState.CLOSED);
|
||||
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,false);
|
||||
|
||||
// not clean
|
||||
assertCleanClose(state,false);
|
||||
assertLocalInitiated(state,false);
|
||||
assertRemoteInitiated(state,false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInit()
|
||||
{
|
||||
IOState state = new IOState();
|
||||
StateTracker tracker = new StateTracker();
|
||||
state.addListener(tracker);
|
||||
assertState(state,ConnectionState.CONNECTING);
|
||||
|
||||
// do nothing
|
||||
|
||||
tracker.assertTransitions();
|
||||
assertState(state,ConnectionState.CONNECTING);
|
||||
|
||||
// not connected yet
|
||||
assertInputAvailable(state,false);
|
||||
assertOutputAvailable(state,false);
|
||||
|
||||
// no close yet
|
||||
assertCleanClose(state,false);
|
||||
assertLocalInitiated(state,false);
|
||||
assertRemoteInitiated(state,false);
|
||||
}
|
||||
}
|
|
@ -28,25 +28,20 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.FrameCallback;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.SuspendToken;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
public class LocalWebSocketConnection implements LogicalConnection, ConnectionStateListener
|
||||
public class LocalWebSocketConnection implements LogicalConnection
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(LocalWebSocketConnection.class);
|
||||
private final String id;
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final Executor executor;
|
||||
private WebSocketPolicy policy;
|
||||
private IOState ioState = new IOState();
|
||||
|
||||
public LocalWebSocketConnection(ByteBufferPool bufferPool)
|
||||
{
|
||||
|
@ -63,15 +58,9 @@ public class LocalWebSocketConnection implements LogicalConnection, ConnectionSt
|
|||
this.id = id;
|
||||
this.bufferPool = bufferPool;
|
||||
this.executor = new ExecutorThreadPool();
|
||||
this.ioState.addListener(this);
|
||||
this.policy = WebSocketPolicy.newServerPolicy();
|
||||
}
|
||||
|
||||
public LocalWebSocketConnection(TestName testname, ByteBufferPool bufferPool)
|
||||
{
|
||||
this(testname.getMethodName(),bufferPool);
|
||||
}
|
||||
|
||||
public LocalWebSocketConnection(TestName testname, WebSocketContainerScope containerScope)
|
||||
{
|
||||
this(testname.getMethodName(), containerScope.getBufferPool());
|
||||
|
@ -84,28 +73,6 @@ public class LocalWebSocketConnection implements LogicalConnection, ConnectionSt
|
|||
return executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
close(StatusCode.NORMAL,null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(int statusCode, String reason)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("close({}, {})",statusCode,reason);
|
||||
CloseInfo close = new CloseInfo(statusCode,reason);
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
|
||||
public void connect()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("connect()");
|
||||
ioState.onConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect()
|
||||
{
|
||||
|
@ -131,12 +98,6 @@ public class LocalWebSocketConnection implements LogicalConnection, ConnectionSt
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOState getIOState()
|
||||
{
|
||||
return ioState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getLocalAddress()
|
||||
{
|
||||
|
@ -160,43 +121,13 @@ public class LocalWebSocketConnection implements LogicalConnection, ConnectionSt
|
|||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
return getIOState().isOpen();
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionStateChange(ConnectionState state)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection State Change: {}",state);
|
||||
switch (state)
|
||||
{
|
||||
case CLOSED:
|
||||
this.disconnect();
|
||||
break;
|
||||
case CLOSING:
|
||||
if (ioState.wasRemoteCloseInitiated())
|
||||
{
|
||||
// send response close frame
|
||||
CloseInfo close = ioState.getCloseInfo();
|
||||
LOG.debug("write close frame: {}",close);
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public void open()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("open()");
|
||||
ioState.onOpened();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
|
||||
{
|
||||
|
|
|
@ -84,6 +84,7 @@ public class MessageOutputStreamTest
|
|||
LocalWebSocketConnection remoteConnection = new LocalWebSocketConnection(remoteURI, bufferPool);
|
||||
remoteSession = new WebSocketSession(containerScope,remoteURI,remoteSocket,remoteConnection);
|
||||
remoteSession.start();
|
||||
remoteSession.connect();
|
||||
remoteSession.open();
|
||||
|
||||
// Local Session
|
||||
|
@ -94,6 +95,7 @@ public class MessageOutputStreamTest
|
|||
|
||||
// talk to our remote socket
|
||||
session.setOutgoingHandler(FramePipes.to(remoteSession));
|
||||
session.connect();
|
||||
// start session
|
||||
session.start();
|
||||
// open connection
|
||||
|
|
|
@ -81,6 +81,7 @@ public class MessageWriterTest
|
|||
LocalWebSocketConnection remoteConnection = new LocalWebSocketConnection(remoteURI, bufferPool);
|
||||
remoteSession = new WebSocketSession(remoteContainerScope, remoteURI, remoteSocket, remoteConnection);
|
||||
remoteSession.start();
|
||||
remoteSession.connect();
|
||||
remoteSession.open();
|
||||
|
||||
// Local Session
|
||||
|
@ -91,6 +92,7 @@ public class MessageWriterTest
|
|||
session = new WebSocketSession(localContainerScope, localURI, localSocket, localConnection);
|
||||
session.setOutgoingHandler(FramePipes.to(remoteSession));
|
||||
session.start();
|
||||
session.connect();
|
||||
session.open();
|
||||
}
|
||||
|
||||
|
|
|
@ -22,22 +22,16 @@ import java.net.InetSocketAddress;
|
|||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.FrameCallback;
|
||||
import org.eclipse.jetty.websocket.api.SuspendToken;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
|
||||
public class DummyConnection implements LogicalConnection
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(DummyConnection.class);
|
||||
private IOState iostate;
|
||||
private WebSocketPolicy policy;
|
||||
|
||||
|
||||
@Deprecated
|
||||
public DummyConnection()
|
||||
|
@ -48,17 +42,6 @@ public class DummyConnection implements LogicalConnection
|
|||
public DummyConnection(WebSocketPolicy policy)
|
||||
{
|
||||
this.policy = policy;
|
||||
this.iostate = new IOState();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(int statusCode, String reason)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,12 +73,6 @@ public class DummyConnection implements LogicalConnection
|
|||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOState getIOState()
|
||||
{
|
||||
return this.iostate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress getLocalAddress()
|
||||
{
|
||||
|
|
|
@ -16,29 +16,12 @@
|
|||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.websocket.tests;
|
||||
package org.eclipse.jetty.websocket.common.test;
|
||||
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
|
||||
public enum CloseState
|
||||
@WebSocket
|
||||
public class DummySocket
|
||||
{
|
||||
OPEN,
|
||||
REMOTE_INITIATED,
|
||||
LOCAL_INITIATED;
|
||||
|
||||
public static CloseState from(IOState ios)
|
||||
{
|
||||
if (ios.wasLocalCloseInitiated())
|
||||
{
|
||||
return CloseState.LOCAL_INITIATED;
|
||||
}
|
||||
else if (ios.wasRemoteCloseInitiated())
|
||||
{
|
||||
return CloseState.REMOTE_INITIATED;
|
||||
}
|
||||
else
|
||||
{
|
||||
return CloseState.OPEN;
|
||||
}
|
||||
}
|
||||
/* does nothing */
|
||||
}
|
|
@ -63,15 +63,12 @@ import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
|||
import org.eclipse.jetty.websocket.api.util.WSURI;
|
||||
import org.eclipse.jetty.websocket.common.AcceptHash;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
import org.eclipse.jetty.websocket.common.Generator;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
import org.eclipse.jetty.websocket.common.Parser;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
|
||||
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
|
||||
import org.eclipse.jetty.websocket.common.scopes.SimpleContainerScope;
|
||||
import org.junit.Assert;
|
||||
|
@ -88,7 +85,7 @@ import org.junit.Assert;
|
|||
* with regards to basic IO behavior, a write should work as expected, a read should work as expected, but <u>what</u> byte it sends or reads is not within its
|
||||
* scope.
|
||||
*/
|
||||
public class XBlockheadClient implements OutgoingFrames, ConnectionStateListener, AutoCloseable, IBlockheadClient, Parser.Handler
|
||||
public class XBlockheadClient implements OutgoingFrames, AutoCloseable, IBlockheadClient, Parser.Handler
|
||||
{
|
||||
private class FrameReadingThread extends Thread implements Runnable, IncomingFrames
|
||||
{
|
||||
|
@ -195,7 +192,6 @@ public class XBlockheadClient implements OutgoingFrames, ConnectionStateListener
|
|||
private OutgoingFrames outgoing = this;
|
||||
private boolean eof = false;
|
||||
private ExtensionStack extensionStack;
|
||||
private IOState ioState;
|
||||
private CountDownLatch disconnectedLatch = new CountDownLatch(1);
|
||||
private ByteBuffer remainingBuffer;
|
||||
|
||||
|
@ -225,8 +221,6 @@ public class XBlockheadClient implements OutgoingFrames, ConnectionStateListener
|
|||
this.parser = new Parser(policy,bufferPool,this);
|
||||
|
||||
this.extensionFactory = new WebSocketExtensionFactory(new SimpleContainerScope(policy,bufferPool));
|
||||
this.ioState = new IOState();
|
||||
this.ioState.addListener(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -285,14 +279,14 @@ public class XBlockheadClient implements OutgoingFrames, ConnectionStateListener
|
|||
LOG.debug("close({},{})",statusCode,message);
|
||||
CloseInfo close = new CloseInfo(statusCode,message);
|
||||
|
||||
if (!ioState.isClosed())
|
||||
{
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.debug("Not issuing close. ioState = {}",ioState);
|
||||
}
|
||||
// if (!ioState.isClosed())
|
||||
// {
|
||||
// ioState.onCloseLocal(close);
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// LOG.debug("Not issuing close. ioState = {}",ioState);
|
||||
// }
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -426,7 +420,7 @@ public class XBlockheadClient implements OutgoingFrames, ConnectionStateListener
|
|||
}
|
||||
|
||||
// configure parser
|
||||
ioState.onOpened();
|
||||
// ioState.onOpened();
|
||||
|
||||
LOG.debug("outgoing = {}",outgoing);
|
||||
LOG.debug("incoming = {}",extensionStack);
|
||||
|
@ -482,11 +476,6 @@ public class XBlockheadClient implements OutgoingFrames, ConnectionStateListener
|
|||
return (InetSocketAddress)socket.getLocalSocketAddress();
|
||||
}
|
||||
|
||||
public IOState getIOState()
|
||||
{
|
||||
return ioState;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.eclipse.jetty.websocket.common.test.IBlockheadClient#getProtocols()
|
||||
*/
|
||||
|
@ -550,35 +539,35 @@ public class XBlockheadClient implements OutgoingFrames, ConnectionStateListener
|
|||
return (socket != null) && (socket.isConnected());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionStateChange(ConnectionState state)
|
||||
{
|
||||
LOG.debug("CLIENT onConnectionStateChange() - {}",state);
|
||||
switch (state)
|
||||
{
|
||||
case CLOSED:
|
||||
// Per Spec, client should not initiate disconnect on its own
|
||||
// this.disconnect();
|
||||
break;
|
||||
case CLOSING:
|
||||
CloseInfo close = ioState.getCloseInfo();
|
||||
|
||||
WebSocketFrame frame = close.asFrame();
|
||||
LOG.debug("Issuing: {}",frame);
|
||||
try
|
||||
{
|
||||
write(frame);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOG.debug(e);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
/* do nothing */
|
||||
break;
|
||||
}
|
||||
}
|
||||
// @Override
|
||||
// public void onConnectionStateChange(ConnectionState state)
|
||||
// {
|
||||
// LOG.debug("CLIENT onConnectionStateChange() - {}",state);
|
||||
// switch (state)
|
||||
// {
|
||||
// case CLOSED:
|
||||
// // Per Spec, client should not initiate disconnect on its own
|
||||
// // this.disconnect();
|
||||
// break;
|
||||
// case CLOSING:
|
||||
// CloseInfo close = ioState.getCloseInfo();
|
||||
//
|
||||
// WebSocketFrame frame = close.asFrame();
|
||||
// LOG.debug("Issuing: {}",frame);
|
||||
// try
|
||||
// {
|
||||
// write(frame);
|
||||
// }
|
||||
// catch (IOException e)
|
||||
// {
|
||||
// LOG.debug(e);
|
||||
// }
|
||||
// break;
|
||||
// default:
|
||||
// /* do nothing */
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
|
||||
@Override
|
||||
public void outgoingFrame(Frame frame, FrameCallback callback, BatchMode batchMode)
|
||||
|
@ -779,11 +768,6 @@ public class XBlockheadClient implements OutgoingFrames, ConnectionStateListener
|
|||
@Override
|
||||
public void write(WebSocketFrame frame) throws IOException
|
||||
{
|
||||
if (!ioState.isOpen())
|
||||
{
|
||||
LOG.debug("IO Not Open / Not Writing: {}",frame);
|
||||
return;
|
||||
}
|
||||
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.eclipse.jetty.websocket.common.CloseInfo;
|
|||
import org.eclipse.jetty.websocket.common.Generator;
|
||||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
import org.junit.Assert;
|
||||
|
||||
/**
|
||||
|
@ -190,24 +189,6 @@ public class XFuzzer implements AutoCloseable
|
|||
// TODO Should test for no more frames. success if connection closed.
|
||||
}
|
||||
|
||||
public CloseState getCloseState()
|
||||
{
|
||||
IOState ios = client.getIOState();
|
||||
|
||||
if (ios.wasLocalCloseInitiated())
|
||||
{
|
||||
return CloseState.LOCAL_INITIATED;
|
||||
}
|
||||
else if (ios.wasRemoteCloseInitiated())
|
||||
{
|
||||
return CloseState.REMOTE_INITIATED;
|
||||
}
|
||||
else
|
||||
{
|
||||
return CloseState.OPEN;
|
||||
}
|
||||
}
|
||||
|
||||
public SendMode getSendMode()
|
||||
{
|
||||
return sendMode;
|
||||
|
|
|
@ -569,7 +569,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
|
|||
response.setExtensions(extensionStack.getNegotiatedExtensions());
|
||||
session.setUpgradeResponse(response);
|
||||
wsConnection.addListener(session);
|
||||
wsConnection.setErrorListener(session);
|
||||
|
||||
// Setup Incoming Routing
|
||||
extensionStack.setNextIncoming(session);
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -40,6 +41,8 @@ import org.eclipse.jetty.util.BufferUtil;
|
|||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.FrameCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
|
@ -52,6 +55,28 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
|||
*/
|
||||
public class Fuzzer extends ContainerLifeCycle
|
||||
{
|
||||
public static class BlockerCallback implements FrameCallback
|
||||
{
|
||||
private CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
|
||||
@Override
|
||||
public void fail(Throwable cause)
|
||||
{
|
||||
future.completeExceptionally(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeed()
|
||||
{
|
||||
future.complete(null);
|
||||
}
|
||||
|
||||
public void block() throws Exception
|
||||
{
|
||||
future.get(1, TimeUnit.MINUTES);
|
||||
}
|
||||
}
|
||||
|
||||
public static class Session implements AutoCloseable
|
||||
{
|
||||
// Client side framing mask
|
||||
|
@ -102,10 +127,11 @@ public class Fuzzer extends ContainerLifeCycle
|
|||
return this;
|
||||
}
|
||||
|
||||
private void assertIsOpen()
|
||||
private void assertIsOpen() throws Exception
|
||||
{
|
||||
assertThat("Session exists", session, notNullValue());
|
||||
assertThat("Session is open", session.isOpen(), is(true));
|
||||
assertThat("Endpoint is open", session.getUntrustedEndpoint().openLatch.await(5, TimeUnit.SECONDS), is(true));
|
||||
}
|
||||
|
||||
public ByteBuffer asNetworkBuffer(List<WebSocketFrame> send)
|
||||
|
@ -187,13 +213,13 @@ public class Fuzzer extends ContainerLifeCycle
|
|||
expect(Collections.singletonList(expect));
|
||||
}
|
||||
|
||||
public Session send(WebSocketFrame send) throws IOException
|
||||
public Session send(WebSocketFrame send) throws Exception
|
||||
{
|
||||
send(Collections.singletonList(send));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Session send(ByteBuffer buf) throws IOException
|
||||
public Session send(ByteBuffer buf) throws Exception
|
||||
{
|
||||
assertIsOpen();
|
||||
LOG.debug("Sending bytes {}", BufferUtil.toDetailString(buf));
|
||||
|
@ -214,78 +240,18 @@ public class Fuzzer extends ContainerLifeCycle
|
|||
return this;
|
||||
}
|
||||
|
||||
public Session send(List<WebSocketFrame> send) throws IOException
|
||||
public Session send(List<WebSocketFrame> send) throws Exception
|
||||
{
|
||||
assertIsOpen();
|
||||
LOG.debug("[{}] Sending {} frames (mode {})", testcase.getTestMethodName(), send.size(), sendMode);
|
||||
|
||||
try
|
||||
{
|
||||
if ((sendMode == SendMode.BULK) || (sendMode == SLOW))
|
||||
for (WebSocketFrame f : send)
|
||||
{
|
||||
int bufferLen = 0;
|
||||
for (Frame f : send)
|
||||
{
|
||||
bufferLen += f.getPayloadLength() + Generator.MAX_HEADER_LENGTH;
|
||||
}
|
||||
|
||||
ByteBuffer buffer = null;
|
||||
try
|
||||
{
|
||||
buffer = session.getBufferPool().acquire(bufferLen, false);
|
||||
BufferUtil.clearToFill(buffer);
|
||||
|
||||
// Generate frames
|
||||
for (WebSocketFrame f : send)
|
||||
{
|
||||
setClientMask(f);
|
||||
generator.generateHeaderBytes(f, buffer);
|
||||
if (f.hasPayload())
|
||||
{
|
||||
buffer.put(f.getPayload());
|
||||
}
|
||||
}
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
|
||||
// Write Data Frame
|
||||
switch (sendMode)
|
||||
{
|
||||
case BULK:
|
||||
session.getUntrustedConnection().writeRaw(buffer);
|
||||
break;
|
||||
case SLOW:
|
||||
session.getUntrustedConnection().writeRawSlowly(buffer, slowSendSegmentSize);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Whoops, unsupported sendMode: " + sendMode);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
session.getBufferPool().release(buffer);
|
||||
}
|
||||
}
|
||||
else if (sendMode == SendMode.PER_FRAME)
|
||||
{
|
||||
for (WebSocketFrame f : send)
|
||||
{
|
||||
f.setMask(MASK); // make sure we have mask set
|
||||
// Using lax generator, generate and send
|
||||
|
||||
ByteBuffer buffer = null;
|
||||
try
|
||||
{
|
||||
buffer = session.getBufferPool().acquire(f.getPayloadLength() + Generator.MAX_HEADER_LENGTH, false);
|
||||
BufferUtil.clearToFill(buffer);
|
||||
generator.generateWholeFrame(f, buffer);
|
||||
BufferUtil.flipToFlush(buffer, 0);
|
||||
session.getUntrustedConnection().writeRaw(buffer);
|
||||
}
|
||||
finally
|
||||
{
|
||||
session.getBufferPool().release(buffer);
|
||||
}
|
||||
}
|
||||
BlockerCallback blocker = new BlockerCallback();
|
||||
session.getOutgoingHandler().outgoingFrame(f, blocker, BatchMode.OFF);
|
||||
blocker.block();
|
||||
}
|
||||
}
|
||||
catch (SocketException e)
|
||||
|
@ -360,6 +326,11 @@ public class Fuzzer extends ContainerLifeCycle
|
|||
|
||||
public Fuzzer.Session connect(Fuzzed testcase) throws Exception
|
||||
{
|
||||
// TODO: handle EndPoint behavior here. (BULK/SLOW/FRAME)
|
||||
// BULK = AggregatingEndpoint write (aggregate until .flush() call)
|
||||
// SLOW = FixedBufferEndpoint write (send fixed buffer size)
|
||||
// PERFRAME = No change to Endpoint
|
||||
|
||||
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
|
||||
upgradeRequest.setHeader("X-TestCase", testcase.getTestMethodName());
|
||||
UntrustedWSSession session = client.connect(testcase.getServerURI(), upgradeRequest).get(connectTimeout, connectTimeoutUnit);
|
||||
|
|
|
@ -61,6 +61,11 @@ public class UntrustedWSConnection
|
|||
*/
|
||||
public void writeRaw(ByteBuffer buf) throws IOException
|
||||
{
|
||||
if(LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} writeRaw({})", this.internalConnection.getPolicy().getBehavior(), BufferUtil.toDetailString(buf));
|
||||
}
|
||||
|
||||
try(Blocker blocker = writeBlocker.acquire())
|
||||
{
|
||||
internalConnection.getEndPoint().write(blocker, buf);
|
||||
|
@ -77,6 +82,11 @@ public class UntrustedWSConnection
|
|||
*/
|
||||
public void writeRaw(ByteBuffer buf, int numBytes) throws IOException
|
||||
{
|
||||
if(LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} writeRaw({}, numBytes={})", this.internalConnection.getPolicy().getBehavior(), BufferUtil.toDetailString(buf), numBytes);
|
||||
}
|
||||
|
||||
try(Blocker blocker = writeBlocker.acquire())
|
||||
{
|
||||
ByteBuffer slice = buf.slice();
|
||||
|
|
|
@ -28,9 +28,10 @@ import java.util.List;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketFrameListener;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||
|
@ -40,6 +41,8 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
|
|||
|
||||
public class UntrustedWSEndpoint implements WebSocketListener, WebSocketFrameListener
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(UntrustedWSEndpoint.class);
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private Session session;
|
||||
public CountDownLatch openLatch = new CountDownLatch(1);
|
||||
|
@ -47,25 +50,29 @@ public class UntrustedWSEndpoint implements WebSocketListener, WebSocketFrameLis
|
|||
public AtomicReference<CloseInfo> closeInfo = new AtomicReference<>();
|
||||
public AtomicReference<Throwable> error = new AtomicReference<>();
|
||||
|
||||
private CompletableFuture<List<String>> expectedMessagesFuture;
|
||||
private AtomicInteger expectedMessageCount;
|
||||
private CompletableFuture<List<String>> expectedMessagesFuture = new CompletableFuture<>();
|
||||
private AtomicReference<Integer> expectedMessageCount = new AtomicReference<>();
|
||||
private List<String> messages = new ArrayList<>();
|
||||
|
||||
private CompletableFuture<List<WebSocketFrame>> expectedFramesFuture;
|
||||
private AtomicInteger expectedFramesCount;
|
||||
private CompletableFuture<List<WebSocketFrame>> expectedFramesFuture = new CompletableFuture<>();
|
||||
private AtomicReference<Integer> expectedFramesCount = new AtomicReference<>();
|
||||
private List<WebSocketFrame> frames = new ArrayList<>();
|
||||
|
||||
public Future<List<WebSocketFrame>> expectedFrames(int expectedCount)
|
||||
{
|
||||
expectedFramesFuture = new CompletableFuture<>();
|
||||
expectedFramesCount = new AtomicInteger(expectedCount);
|
||||
if (!expectedFramesCount.compareAndSet(null, expectedCount))
|
||||
{
|
||||
throw new IllegalStateException("Can only have 1 registered frame count future");
|
||||
}
|
||||
return expectedFramesFuture;
|
||||
}
|
||||
|
||||
public Future<List<String>> expectedMessages(int expected)
|
||||
public Future<List<String>> expectedMessages(int expectedCount)
|
||||
{
|
||||
expectedMessagesFuture = new CompletableFuture<>();
|
||||
expectedMessageCount = new AtomicInteger(expected);
|
||||
if (!expectedMessageCount.compareAndSet(null, expectedCount))
|
||||
{
|
||||
throw new IllegalStateException("Can only have 1 registered message count future");
|
||||
}
|
||||
return expectedMessagesFuture;
|
||||
}
|
||||
|
||||
|
@ -90,29 +97,21 @@ public class UntrustedWSEndpoint implements WebSocketListener, WebSocketFrameLis
|
|||
assertThat("Error must have value", cause, notNullValue());
|
||||
if (error.compareAndSet(null, cause) == false)
|
||||
{
|
||||
System.err.println("Original Cause");
|
||||
error.get().printStackTrace(System.err);
|
||||
System.err.println("Extra/Excess Cause");
|
||||
cause.printStackTrace(System.err);
|
||||
LOG.warn("Original Cause", error.get());
|
||||
LOG.warn("Extra/Excess Cause", cause);
|
||||
fail("onError should only happen once!");
|
||||
}
|
||||
|
||||
if(expectedMessagesFuture != null)
|
||||
synchronized (expectedMessagesFuture)
|
||||
{
|
||||
synchronized (expectedMessagesFuture)
|
||||
{
|
||||
if (expectedMessagesFuture != null)
|
||||
expectedMessagesFuture.completeExceptionally(cause);
|
||||
}
|
||||
if (expectedMessagesFuture != null)
|
||||
expectedMessagesFuture.completeExceptionally(cause);
|
||||
}
|
||||
|
||||
if(expectedFramesFuture != null)
|
||||
synchronized (expectedFramesFuture)
|
||||
{
|
||||
synchronized (expectedFramesFuture)
|
||||
{
|
||||
if (expectedFramesFuture != null)
|
||||
expectedFramesFuture.completeExceptionally(cause);
|
||||
}
|
||||
if (expectedFramesFuture != null)
|
||||
expectedFramesFuture.completeExceptionally(cause);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -125,13 +124,12 @@ public class UntrustedWSEndpoint implements WebSocketListener, WebSocketFrameLis
|
|||
@Override
|
||||
public void onWebSocketText(String text)
|
||||
{
|
||||
if(expectedMessagesFuture == null)
|
||||
return;
|
||||
|
||||
messages.add(text);
|
||||
synchronized (expectedMessagesFuture)
|
||||
{
|
||||
if (expectedMessageCount.decrementAndGet() <= 0)
|
||||
Integer expected = expectedMessageCount.get();
|
||||
|
||||
if (expected != null && messages.size() >= expected.intValue())
|
||||
{
|
||||
expectedMessagesFuture.complete(messages);
|
||||
}
|
||||
|
@ -141,14 +139,12 @@ public class UntrustedWSEndpoint implements WebSocketListener, WebSocketFrameLis
|
|||
@Override
|
||||
public void onWebSocketFrame(Frame frame)
|
||||
{
|
||||
if (expectedFramesFuture == null)
|
||||
return;
|
||||
|
||||
frames.add(WebSocketFrame.copy(frame));
|
||||
|
||||
synchronized (expectedFramesFuture)
|
||||
{
|
||||
if (expectedFramesCount.decrementAndGet() <= 0)
|
||||
Integer expected = expectedFramesCount.get();
|
||||
|
||||
if (expected != null && frames.size() >= expected.intValue())
|
||||
{
|
||||
expectedFramesFuture.complete(frames);
|
||||
}
|
||||
|
|
|
@ -24,10 +24,11 @@ org.eclipse.jetty.LEVEL=WARN
|
|||
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.LEVEL=INFO
|
||||
org.eclipse.jetty.websocket.tests.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.server.ab.Fuzzer.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.server.blockhead.LEVEL=DEBUG
|
||||
|
@ -35,7 +36,7 @@ org.eclipse.jetty.websocket.common.WebSocketSession.LEVEL=DEBUG
|
|||
|
||||
# org.eclipse.jetty.websocket.client.io.ConnectPromise.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.WebSocketSession_OPEN.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
|
||||
org.eclipse.jetty.websocket.common.io.IOState.LEVEL=DEBUG
|
||||
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection_OPEN.LEVEL=DEBUG
|
||||
|
||||
### Show state changes on BrowserDebugTool
|
||||
|
|
Loading…
Reference in New Issue