Merged branch 'jetty-9.3.x' into 'master'.

This commit is contained in:
Simone Bordet 2016-05-31 18:32:42 +02:00
commit b1ccba2285
3 changed files with 128 additions and 111 deletions

View File

@ -94,7 +94,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
this.incomingHandler = websocket;
this.connection.getIOState().addListener(this);
this.policy = containerScope.getPolicy();
addBean(this.connection);
addBean(this.websocket);
}
@ -103,13 +103,13 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
public void close()
{
/* This is assumed to always be a NORMAL closure, no reason phrase */
connection.close(StatusCode.NORMAL, null);
close(StatusCode.NORMAL, null);
}
@Override
public void close(CloseStatus closeStatus)
{
this.close(closeStatus.getCode(),closeStatus.getPhrase());
close(closeStatus.getCode(),closeStatus.getPhrase());
}
@Override
@ -134,7 +134,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
{
executor.execute(runnable);
}
@Override
protected void doStart() throws Exception
{
@ -143,27 +143,23 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
super.doStart();
}
@Override
protected void doStop() throws Exception
{
if(LOG.isDebugEnabled())
LOG.debug("stopping - {}",this);
if (getConnection() != null)
try
{
try
{
getConnection().close(StatusCode.SHUTDOWN,"Shutdown");
}
catch (Throwable t)
{
LOG.debug("During Connection Shutdown",t);
}
close(StatusCode.SHUTDOWN,"Shutdown");
}
catch (Throwable t)
{
LOG.debug("During Connection Shutdown",t);
}
super.doStop();
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
@ -223,7 +219,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
{
return this.connection.getBufferPool();
}
public ClassLoader getClassLoader()
{
return this.getClass().getClassLoader();
@ -321,7 +317,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
{
return this.upgradeResponse;
}
@Override
public WebSocketSession getWebSocketSession()
@ -409,12 +405,12 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
{
incomingError(cause);
}
@Override
public void onClosed(Connection connection)
{
}
@Override
public void onOpened(Connection connection)
{
@ -460,7 +456,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
break;
}
}
/**
* Open/Activate the session
*/
@ -474,17 +470,17 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
// already opened
return;
}
try(ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader))
try(ThreadClassLoaderScope scope = new ThreadClassLoaderScope(classLoader))
{
// Upgrade success
connection.getIOState().onConnected();
// Connect remote
remote = new WebSocketRemoteEndpoint(connection,outgoingHandler,getBatchMode());
if(LOG_OPEN.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.open() remote={}",policy.getBehavior(),this.getClass().getSimpleName(),remote);
// Open WebSocket
websocket.openSession(this);
@ -514,7 +510,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
close(statusCode,t.getMessage());
}
}
public void setExtensionFactory(ExtensionFactory extensionFactory)
{
this.extensionFactory = extensionFactory;

View File

@ -58,8 +58,10 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
* Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}.
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
{
private final AtomicBoolean closed = new AtomicBoolean();
private class Flusher extends FrameFlusher
{
private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
@ -190,7 +192,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return countOnFillableEvents.get();
}
}
private static enum ReadMode
{
PARSE,
@ -256,10 +258,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void close()
{
if(LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug(".close()");
CloseInfo close = new CloseInfo();
this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("close()");
close(new CloseInfo());
}
/**
@ -267,7 +268,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* <p> fillInterested();
* This can result in a close handshake over the network, or a simple local abnormal close
*
*
* @param statusCode
* the WebSocket status code.
* @param reason
@ -278,9 +279,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void close(int statusCode, String reason)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("close({},{})",statusCode,reason);
CloseInfo close = new CloseInfo(statusCode,reason);
this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
LOG_CLOSE.debug("close({},{})", statusCode, reason);
close(new CloseInfo(statusCode, reason));
}
private void close(CloseInfo closeInfo)
{
if (closed.compareAndSet(false, true))
outgoingFrame(closeInfo.asFrame(), new OnCloseLocalCallback(closeInfo), BatchMode.OFF);
}
@Override
@ -341,7 +347,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Get the list of extensions in use.
* <p>
* This list is negotiated during the WebSocket Upgrade Request/Response handshake.
*
*
* @return the list of negotiated extensions in use.
*/
public List<ExtensionConfig> getExtensions()
@ -353,7 +359,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
return generator;
}
@Override
public String getId()
{
@ -408,7 +414,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public boolean isOpen()
{
return getIOState().isOpen() && getEndPoint().isOpen();
return !closed.get();
}
@Override
@ -437,7 +443,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
switch (state)
{
case OPEN:
@ -493,9 +499,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable()",policy.getBehavior());
stats.countOnFillableEvents.incrementAndGet();
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),true);
try
{
isFilling = true;
@ -503,7 +509,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if(readMode == ReadMode.PARSE)
{
readMode = readParse(buffer);
}
}
else
{
readMode = readDiscard(buffer);
@ -524,8 +530,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
protected void onFillInterestedFailed(Throwable cause)
{
@ -548,12 +552,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
prefillBuffer = prefilled;
}
private void notifyError(Throwable t)
{
getParser().getIncomingFramesHandler().incomingError(t);
}
@Override
public void onOpen()
{
@ -646,7 +650,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return ReadMode.DISCARD;
}
}
private ReadMode readParse(ByteBuffer buffer)
{
EndPoint endPoint = getEndPoint();
@ -667,7 +671,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
// Done reading, wait for next onFillable
return ReadMode.PARSE;
}
if (LOG.isDebugEnabled())
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
@ -695,7 +699,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return ReadMode.DISCARD;
}
}
@Override
public void resume()
{
@ -709,7 +713,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
* Get the list of extensions in use.
* <p>
* This list is negotiated during the WebSocket Upgrade Request/Response handshake.
*
*
* @param extensions
* the list of negotiated extensions in use.
*/
@ -764,7 +768,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
final int prime = 31;
int result = 1;
EndPoint endp = getEndPoint();
if(endp != null)
{

View File

@ -44,7 +44,7 @@ public class IOState
/**
* The source of a close handshake. (ie: who initiated it).
*/
private static enum CloseHandshakeSource
private enum CloseHandshakeSource
{
/** No close handshake initiated (yet) */
NONE,
@ -53,7 +53,7 @@ public class IOState
/** Remote side initiated the close handshake */
REMOTE,
/** An abnormal close situation (disconnect, timeout, etc...) */
ABNORMAL;
ABNORMAL
}
public static interface ConnectionStateListener
@ -65,17 +65,17 @@ public class IOState
private ConnectionState state;
private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
/**
/**
* Is input on websocket available (for reading frames).
* Used to determine close handshake completion, and track half-close states
*/
private boolean inputAvailable;
/**
/**
* Is output on websocket available (for writing frames).
* Used to determine close handshake completion, and track half-closed states.
*/
private boolean outputAvailable;
/**
/**
* Initiator of the close handshake.
* Used to determine who initiated a close handshake for reply reasons.
*/
@ -150,7 +150,7 @@ public class IOState
public boolean isClosed()
{
synchronized (state)
synchronized (this)
{
return (state == ConnectionState.CLOSED);
}
@ -163,7 +163,7 @@ public class IOState
public boolean isOpen()
{
return (getConnectionState() != ConnectionState.CLOSED);
return !isClosed();
}
public boolean isOutputAvailable()
@ -221,67 +221,87 @@ public class IOState
/**
* A close handshake has been issued from the local endpoint
* @param close the close information
* @param closeInfo the close information
*/
public void onCloseLocal(CloseInfo close)
public void onCloseLocal(CloseInfo closeInfo)
{
boolean open = false;
synchronized (this)
{
ConnectionState initialState = this.state;
if (LOG.isDebugEnabled())
LOG.debug("onCloseLocal({}) : {}", closeInfo, initialState);
if (initialState == ConnectionState.CLOSED)
{
// already closed
if (LOG.isDebugEnabled())
LOG.debug("already closed");
return;
}
if (initialState == ConnectionState.CONNECTED)
{
// fast close. a local close request from end-user onConnect/onOpen method
if (LOG.isDebugEnabled())
LOG.debug("FastClose in CONNECTED detected");
open = true;
}
}
if (open)
openAndCloseLocal(closeInfo);
else
closeLocal(closeInfo);
}
private void openAndCloseLocal(CloseInfo closeInfo)
{
// Force the state open (to allow read/write to endpoint)
onOpened();
if (LOG.isDebugEnabled())
LOG.debug("FastClose continuing with Closure");
closeLocal(closeInfo);
}
private void closeLocal(CloseInfo closeInfo)
{
ConnectionState event = null;
ConnectionState abnormalEvent = null;
ConnectionState initialState = this.state;
if (LOG.isDebugEnabled())
LOG.debug("onCloseLocal({}) : {}",close,initialState);
if (initialState == ConnectionState.CLOSED)
{
// already closed
LOG.debug("already closed");
return;
}
if (initialState == ConnectionState.CONNECTED)
{
// fast close. a local close request from end-user onConnect/onOpen method
LOG.debug("FastClose in CONNECTED detected");
// Force the state open (to allow read/write to endpoint)
onOpened();
if (LOG.isDebugEnabled())
LOG.debug("FastClose continuing with Closure");
}
synchronized (this)
{
closeInfo = close;
// Turn off further output
if (LOG.isDebugEnabled())
LOG.debug("onCloseLocal(), input={}, output={}", inputAvailable, outputAvailable);
this.closeInfo = closeInfo;
// Turn off further output.
outputAvailable = false;
boolean in = inputAvailable;
boolean out = outputAvailable;
if (closeHandshakeSource == CloseHandshakeSource.NONE)
{
closeHandshakeSource = CloseHandshakeSource.LOCAL;
}
LOG.debug("onCloseLocal(), input={}, output={}",in,out);
if (!in && !out)
if (!inputAvailable)
{
LOG.debug("Close Handshake satisfied, disconnecting");
if (LOG.isDebugEnabled())
LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose = true;
this.state = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close);
finalClose.compareAndSet(null,closeInfo);
event = this.state;
}
else if (this.state == ConnectionState.OPEN)
{
// We are now entering CLOSING (or half-closed)
// We are now entering CLOSING (or half-closed).
this.state = ConnectionState.CLOSING;
event = this.state;
// if abnormal, we don't expect an answer.
if (close.isAbnormal())
// If abnormal, we don't expect an answer.
if (closeInfo.isAbnormal())
{
abnormalEvent = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close);
finalClose.compareAndSet(null,closeInfo);
cleanClose = false;
outputAvailable = false;
inputAvailable = false;
@ -294,8 +314,7 @@ public class IOState
if (event != null)
{
notifyStateListeners(event);
if(abnormalEvent != null)
if (abnormalEvent != null)
{
notifyStateListeners(abnormalEvent);
}
@ -304,12 +323,12 @@ public class IOState
/**
* A close handshake has been received from the remote endpoint
* @param close the close information
* @param closeInfo the close information
*/
public void onCloseRemote(CloseInfo close)
public void onCloseRemote(CloseInfo closeInfo)
{
if (LOG.isDebugEnabled())
LOG.debug("onCloseRemote({})",close);
LOG.debug("onCloseRemote({})", closeInfo);
ConnectionState event = null;
synchronized (this)
{
@ -319,27 +338,25 @@ public class IOState
return;
}
closeInfo = close;
if (LOG.isDebugEnabled())
LOG.debug("onCloseRemote(), input={}, output={}", inputAvailable, outputAvailable);
this.closeInfo = closeInfo;
// turn off further input
inputAvailable = false;
boolean in = inputAvailable;
boolean out = outputAvailable;
if (closeHandshakeSource == CloseHandshakeSource.NONE)
{
closeHandshakeSource = CloseHandshakeSource.REMOTE;
}
if (LOG.isDebugEnabled())
LOG.debug("onCloseRemote(), input={}, output={}",in,out);
if (!in && !out)
if (!outputAvailable)
{
LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose = true;
state = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close);
finalClose.compareAndSet(null,closeInfo);
event = this.state;
}
else if (this.state == ConnectionState.OPEN)
@ -406,7 +423,7 @@ public class IOState
{
if(LOG.isDebugEnabled())
LOG.debug(" onOpened()");
ConnectionState event = null;
synchronized (this)
{