Merge branch 'issue-272-ws-blockingwrite' into jetty-9.4.x
This commit is contained in:
commit
ce039eb4cb
|
@ -157,7 +157,7 @@ public interface Session extends Closeable
|
|||
*
|
||||
* @return whether the session is open
|
||||
*/
|
||||
abstract boolean isOpen();
|
||||
boolean isOpen();
|
||||
|
||||
/**
|
||||
* Return true if and only if the underlying socket is using a secure transport.
|
||||
|
|
|
@ -31,27 +31,10 @@ import org.eclipse.jetty.websocket.common.io.IOState;
|
|||
public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
||||
{
|
||||
/**
|
||||
* Send a websocket Close frame, without a status code or reason.
|
||||
* <p>
|
||||
* Basic usage: results in an non-blocking async write, then connection close.
|
||||
*
|
||||
* @see org.eclipse.jetty.websocket.api.StatusCode
|
||||
* @see #close(int, String)
|
||||
* Called to indicate a close frame was successfully sent to the remote.
|
||||
* @param close the close details
|
||||
*/
|
||||
public void close();
|
||||
|
||||
/**
|
||||
* Send a websocket Close frame, with status code.
|
||||
* <p>
|
||||
* Advanced usage: results in an non-blocking async write, then connection close.
|
||||
*
|
||||
* @param statusCode
|
||||
* the status code
|
||||
* @param reason
|
||||
* the (optional) reason. (can be null for no reason)
|
||||
* @see org.eclipse.jetty.websocket.api.StatusCode
|
||||
*/
|
||||
public void close(int statusCode, String reason);
|
||||
void onLocalClose(CloseInfo close);
|
||||
|
||||
/**
|
||||
* Terminate the connection (no close frame sent)
|
||||
|
@ -75,7 +58,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
|||
*
|
||||
* @return the idle timeout in milliseconds
|
||||
*/
|
||||
public long getIdleTimeout();
|
||||
long getIdleTimeout();
|
||||
|
||||
/**
|
||||
* Get the IOState of the connection.
|
||||
|
@ -119,7 +102,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
|||
*
|
||||
* @return true if connection is open
|
||||
*/
|
||||
public boolean isOpen();
|
||||
boolean isOpen();
|
||||
|
||||
/**
|
||||
* Tests if the connection is actively reading.
|
||||
|
@ -149,6 +132,13 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
|||
*/
|
||||
void setNextIncomingFrames(IncomingFrames incoming);
|
||||
|
||||
/**
|
||||
* Associate the Active Session with the connection.
|
||||
*
|
||||
* @param session the session for this connection
|
||||
*/
|
||||
void setSession(WebSocketSession session);
|
||||
|
||||
/**
|
||||
* Suspend a the incoming read events on the connection.
|
||||
* @return the suspend token
|
||||
|
@ -159,5 +149,5 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
|
|||
* Get Unique ID for the Connection
|
||||
* @return the unique ID for the connection
|
||||
*/
|
||||
public String getId();
|
||||
String getId();
|
||||
}
|
||||
|
|
|
@ -180,7 +180,7 @@ public class Parser
|
|||
return (flagsInUse & 0x10) != 0;
|
||||
}
|
||||
|
||||
protected void notifyFrame(final Frame f)
|
||||
protected void notifyFrame(final Frame f) throws WebSocketException
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} Notify {}",policy.getBehavior(),getIncomingFramesHandler());
|
||||
|
@ -221,25 +221,14 @@ public class Parser
|
|||
}
|
||||
catch (WebSocketException e)
|
||||
{
|
||||
notifyWebSocketException(e);
|
||||
throw e;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.warn(t);
|
||||
notifyWebSocketException(new WebSocketException(t));
|
||||
throw new WebSocketException(t);
|
||||
}
|
||||
}
|
||||
|
||||
protected void notifyWebSocketException(WebSocketException e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
if (incomingFramesHandler == null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
incomingFramesHandler.incomingError(e);
|
||||
}
|
||||
|
||||
public void parse(ByteBuffer buffer) throws WebSocketException
|
||||
{
|
||||
if (buffer.remaining() <= 0)
|
||||
|
@ -265,8 +254,6 @@ public class Parser
|
|||
{
|
||||
buffer.position(buffer.limit()); // consume remaining
|
||||
reset();
|
||||
// let session know
|
||||
notifyWebSocketException(e);
|
||||
// need to throw for proper close behavior in connection
|
||||
throw e;
|
||||
}
|
||||
|
@ -274,11 +261,8 @@ public class Parser
|
|||
{
|
||||
buffer.position(buffer.limit()); // consume remaining
|
||||
reset();
|
||||
// let session know
|
||||
WebSocketException e = new WebSocketException(t);
|
||||
notifyWebSocketException(e);
|
||||
// need to throw for proper close behavior in connection
|
||||
throw e;
|
||||
throw new WebSocketException(t);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,9 +29,11 @@ import java.util.Objects;
|
|||
import java.util.ServiceLoader;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
|
@ -51,11 +53,13 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
|
|||
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.common.events.EventDriver;
|
||||
import org.eclipse.jetty.websocket.common.frames.CloseFrame;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
|
||||
|
@ -64,6 +68,68 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
|
|||
@ManagedObject("A Jetty WebSocket Session")
|
||||
public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory, WebSocketSessionScope, IncomingFrames, Connection.Listener, ConnectionStateListener
|
||||
{
|
||||
public static class OnCloseLocalCallback implements WriteCallback
|
||||
{
|
||||
private final Callback callback;
|
||||
private final LogicalConnection connection;
|
||||
private final CloseInfo close;
|
||||
|
||||
public OnCloseLocalCallback(Callback callback, LogicalConnection connection, CloseInfo close)
|
||||
{
|
||||
this.callback = callback;
|
||||
this.connection = connection;
|
||||
this.close = close;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSuccess()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.succeeded();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.onLocalClose(close);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFailed(Throwable x)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.failed(x);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
connection.onLocalClose(close);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class DisconnectCallback implements Callback
|
||||
{
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
disconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
|
||||
private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN");
|
||||
private final WebSocketContainerScope containerScope;
|
||||
|
@ -72,6 +138,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
private final EventDriver websocket;
|
||||
private final Executor executor;
|
||||
private final WebSocketPolicy policy;
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private ClassLoader classLoader;
|
||||
private ExtensionFactory extensionFactory;
|
||||
private RemoteEndpointFactory remoteEndpointFactory;
|
||||
|
@ -100,27 +167,59 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
this.connection.getIOState().addListener(this);
|
||||
this.policy = websocket.getPolicy();
|
||||
|
||||
this.connection.setSession(this);
|
||||
|
||||
addBean(this.connection);
|
||||
addBean(this.websocket);
|
||||
}
|
||||
|
||||
/**
|
||||
* Aborts the active session abruptly.
|
||||
*/
|
||||
public void abort(int statusCode, String reason)
|
||||
{
|
||||
close(new CloseInfo(statusCode, reason), new DisconnectCallback());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
/* This is assumed to always be a NORMAL closure, no reason phrase */
|
||||
close(StatusCode.NORMAL, null);
|
||||
close(new CloseInfo(StatusCode.NORMAL), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(CloseStatus closeStatus)
|
||||
{
|
||||
close(closeStatus.getCode(),closeStatus.getPhrase());
|
||||
close(new CloseInfo(closeStatus.getCode(),closeStatus.getPhrase()), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(int statusCode, String reason)
|
||||
{
|
||||
connection.close(statusCode,reason);
|
||||
close(new CloseInfo(statusCode, reason), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* CLOSE Primary Entry Point.
|
||||
*
|
||||
* <ul>
|
||||
* <li>atomically enqueue CLOSE frame + flip flag to reject more frames</li>
|
||||
* <li>setup CLOSE frame callback: must close flusher</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param closeInfo the close details
|
||||
*/
|
||||
private void close(CloseInfo closeInfo, Callback callback)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("close({})", closeInfo);
|
||||
|
||||
if (closed.compareAndSet(false, true))
|
||||
{
|
||||
CloseFrame frame = closeInfo.asFrame();
|
||||
connection.outgoingFrame(frame, new OnCloseLocalCallback(callback, connection, closeInfo), BatchMode.OFF);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -388,7 +487,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
{
|
||||
return false;
|
||||
}
|
||||
return this.connection.isOpen();
|
||||
return !closed.get() && this.connection.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -420,11 +519,21 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
|
|||
incomingError(cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Jetty Connection onClosed event
|
||||
*
|
||||
* @param connection the connection that was closed
|
||||
*/
|
||||
@Override
|
||||
public void onClosed(Connection connection)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* Jetty Connection onOpen event
|
||||
*
|
||||
* @param connection the connection that was opened
|
||||
*/
|
||||
@Override
|
||||
public void onOpened(Connection connection)
|
||||
{
|
||||
|
|
|
@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
@ -53,6 +52,7 @@ import org.eclipse.jetty.websocket.common.ConnectionState;
|
|||
import org.eclipse.jetty.websocket.common.Generator;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.Parser;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
|
||||
/**
|
||||
|
@ -60,8 +60,6 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
|||
*/
|
||||
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
|
||||
{
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
|
||||
private class Flusher extends FrameFlusher
|
||||
{
|
||||
private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
|
||||
|
@ -70,104 +68,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void onFailure(Throwable x)
|
||||
public void onCompleteFailure(Throwable failure)
|
||||
{
|
||||
notifyError(x);
|
||||
|
||||
super.onCompleteFailure(failure);
|
||||
notifyError(failure);
|
||||
if (ioState.wasAbnormalClose())
|
||||
{
|
||||
LOG.ignore(x);
|
||||
LOG.ignore(failure);
|
||||
return;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Write flush failure",x);
|
||||
ioState.onWriteFailure(x);
|
||||
}
|
||||
}
|
||||
|
||||
public class OnDisconnectCallback implements WriteCallback
|
||||
{
|
||||
private final boolean outputOnly;
|
||||
|
||||
public OnDisconnectCallback(boolean outputOnly)
|
||||
{
|
||||
this.outputOnly = outputOnly;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFailed(Throwable x)
|
||||
{
|
||||
disconnect(outputOnly);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSuccess()
|
||||
{
|
||||
disconnect(outputOnly);
|
||||
}
|
||||
}
|
||||
|
||||
public class OnCloseLocalCallback implements WriteCallback
|
||||
{
|
||||
private final WriteCallback callback;
|
||||
private final CloseInfo close;
|
||||
|
||||
public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
|
||||
{
|
||||
this.callback = callback;
|
||||
this.close = close;
|
||||
}
|
||||
|
||||
public OnCloseLocalCallback(CloseInfo close)
|
||||
{
|
||||
this(null,close);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFailed(Throwable x)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeFailed(x);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
onLocalClose();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSuccess()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
onLocalClose();
|
||||
}
|
||||
}
|
||||
|
||||
private void onLocalClose()
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("Local Close Confirmed {}",close);
|
||||
if (close.isAbnormal())
|
||||
{
|
||||
ioState.onAbnormalClose(close);
|
||||
}
|
||||
else
|
||||
{
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
LOG.debug("Write flush failure", failure);
|
||||
ioState.onWriteFailure(failure);
|
||||
disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,7 +106,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
}
|
||||
|
||||
private static enum ReadMode
|
||||
private enum ReadMode
|
||||
{
|
||||
PARSE,
|
||||
DISCARD,
|
||||
|
@ -201,8 +114,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
|
||||
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
|
||||
private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
|
||||
private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");
|
||||
|
||||
/**
|
||||
* Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
|
||||
|
@ -217,6 +128,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
private final AtomicBoolean suspendToken;
|
||||
private final FrameFlusher flusher;
|
||||
private final String id;
|
||||
private WebSocketSession session;
|
||||
private List<ExtensionConfig> extensions;
|
||||
private boolean isFilling;
|
||||
private ByteBuffer prefillBuffer;
|
||||
|
@ -252,83 +164,56 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
return super.getExecutor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLocalClose(CloseInfo close)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Local Close Confirmed {}",close);
|
||||
|
||||
if (close.isAbnormal())
|
||||
{
|
||||
ioState.onAbnormalClose(close);
|
||||
}
|
||||
else
|
||||
{
|
||||
ioState.onCloseLocal(close);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSession(WebSocketSession session)
|
||||
{
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
{
|
||||
// TODO: handle closing handshake (see HTTP2Connection).
|
||||
return super.onIdleExpired();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close without a close code or reason
|
||||
* Jetty Connection Close
|
||||
*/
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("close()");
|
||||
close(new CloseInfo());
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the connection.
|
||||
* <p> fillInterested();
|
||||
|
||||
* This can result in a close handshake over the network, or a simple local abnormal close
|
||||
*
|
||||
* @param statusCode
|
||||
* the WebSocket status code.
|
||||
* @param reason
|
||||
* the (optional) reason string. (null is allowed)
|
||||
* @see StatusCode
|
||||
*/
|
||||
@Override
|
||||
public void close(int statusCode, String reason)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("close({},{})", statusCode, reason);
|
||||
close(new CloseInfo(statusCode, reason));
|
||||
}
|
||||
|
||||
private void close(CloseInfo closeInfo)
|
||||
{
|
||||
if (closed.compareAndSet(false, true))
|
||||
outgoingFrame(closeInfo.asFrame(), new OnCloseLocalCallback(closeInfo), BatchMode.OFF);
|
||||
session.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect()
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
|
||||
disconnect(false);
|
||||
}
|
||||
|
||||
private void disconnect(boolean onlyOutput)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
|
||||
// close FrameFlusher, we cannot write anymore at this point.
|
||||
flusher.close();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} disconnect()",policy.getBehavior());
|
||||
flusher.terminate(new EOFException("Disconnected"), false);
|
||||
EndPoint endPoint = getEndPoint();
|
||||
// We need to gently close first, to allow
|
||||
// SSL close alerts to be sent by Jetty
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("Shutting down output {}",endPoint);
|
||||
endPoint.shutdownOutput();
|
||||
if (!onlyOutput)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("Closing {}",endPoint);
|
||||
endPoint.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected void execute(Runnable task)
|
||||
{
|
||||
try
|
||||
{
|
||||
getExecutor().execute(task);
|
||||
}
|
||||
catch (RejectedExecutionException e)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Job not dispatched: {}",task);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fillInterested()
|
||||
|
@ -414,7 +299,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
return !closed.get();
|
||||
return getEndPoint().isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -435,14 +320,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
LOG.debug("{} onClose()",policy.getBehavior());
|
||||
super.onClose();
|
||||
ioState.onDisconnected();
|
||||
flusher.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onConnectionStateChange(ConnectionState state)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
|
||||
|
||||
switch (state)
|
||||
{
|
||||
|
@ -464,29 +348,28 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
fillInterested();
|
||||
break;
|
||||
case CLOSED:
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
|
||||
if (ioState.wasAbnormalClose())
|
||||
{
|
||||
// Fire out a close frame, indicating abnormal shutdown, then disconnect
|
||||
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
|
||||
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
|
||||
session.close(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Just disconnect
|
||||
this.disconnect(false);
|
||||
this.disconnect();
|
||||
}
|
||||
break;
|
||||
case CLOSING:
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
|
||||
|
||||
// First occurrence of .onCloseLocal or .onCloseRemote use
|
||||
if (ioState.wasRemoteCloseInitiated())
|
||||
{
|
||||
CloseInfo close = ioState.getCloseInfo();
|
||||
// reply to close handshake from remote
|
||||
outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
|
||||
session.close(close.getStatusCode(), close.getReason());
|
||||
}
|
||||
default:
|
||||
break;
|
||||
|
@ -561,8 +444,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
if(LOG_OPEN.isDebugEnabled())
|
||||
LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
|
||||
if(LOG.isDebugEnabled())
|
||||
LOG.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
|
||||
super.onOpen();
|
||||
this.ioState.onOpened();
|
||||
}
|
||||
|
@ -575,13 +458,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
{
|
||||
IOState state = getIOState();
|
||||
ConnectionState cstate = state.getConnectionState();
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
|
||||
|
||||
if (cstate == ConnectionState.CLOSED)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onReadTimeout - Connection Already CLOSED");
|
||||
// close already completed, extra timeouts not relevant
|
||||
// allow underlying connection and endpoint to disconnect on its own
|
||||
return true;
|
||||
|
@ -594,7 +477,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
finally
|
||||
{
|
||||
// This is an Abnormal Close condition
|
||||
close(StatusCode.SHUTDOWN,"Idle Timeout");
|
||||
session.close(StatusCode.SHUTDOWN,"Idle Timeout");
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -628,14 +511,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
}
|
||||
else if (filled < 0)
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
|
||||
return ReadMode.EOF;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG_CLOSE.isDebugEnabled())
|
||||
LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -682,19 +565,21 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
|
|||
catch (IOException e)
|
||||
{
|
||||
LOG.warn(e);
|
||||
close(StatusCode.PROTOCOL,e.getMessage());
|
||||
session.notifyError(e);
|
||||
session.abort(StatusCode.PROTOCOL,e.getMessage());
|
||||
return ReadMode.DISCARD;
|
||||
}
|
||||
catch (CloseException e)
|
||||
{
|
||||
LOG.debug(e);
|
||||
close(e.getStatusCode(),e.getMessage());
|
||||
session.notifyError(e);
|
||||
session.close(e.getStatusCode(),e.getMessage());
|
||||
return ReadMode.DISCARD;
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
LOG.warn(t);
|
||||
close(StatusCode.ABNORMAL,t.getMessage());
|
||||
session.abort(StatusCode.ABNORMAL,t.getMessage());
|
||||
// TODO: should probably only switch to discard if a non-ws-endpoint error
|
||||
return ReadMode.DISCARD;
|
||||
}
|
||||
|
|
|
@ -18,14 +18,13 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.io;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
|
@ -40,163 +39,102 @@ import org.eclipse.jetty.websocket.common.Generator;
|
|||
import org.eclipse.jetty.websocket.common.OpCode;
|
||||
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
|
||||
|
||||
/**
|
||||
* Interface for working with bytes destined for {@link EndPoint#write(org.eclipse.jetty.util.Callback, ByteBuffer...)}
|
||||
*/
|
||||
public class FrameFlusher
|
||||
public class FrameFlusher extends IteratingCallback
|
||||
{
|
||||
private class Flusher extends IteratingCallback
|
||||
{
|
||||
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
|
||||
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
|
||||
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final EndPoint endPoint;
|
||||
private final int bufferSize;
|
||||
private final Generator generator;
|
||||
private final int maxGather;
|
||||
private final Deque<FrameEntry> queue = new ArrayDeque<>();
|
||||
private final List<FrameEntry> entries;
|
||||
private final List<ByteBuffer> buffers;
|
||||
private boolean closed;
|
||||
private Throwable terminated;
|
||||
private ByteBuffer aggregate;
|
||||
private BatchMode batchMode;
|
||||
|
||||
public Flusher(int maxGather)
|
||||
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather)
|
||||
{
|
||||
entries = new ArrayList<>(maxGather);
|
||||
buffers = new ArrayList<>((maxGather * 2) + 1);
|
||||
this.bufferPool = bufferPool;
|
||||
this.endPoint = endPoint;
|
||||
this.bufferSize = bufferSize;
|
||||
this.generator = Objects.requireNonNull(generator);
|
||||
this.maxGather = maxGather;
|
||||
this.entries = new ArrayList<>(maxGather);
|
||||
this.buffers = new ArrayList<>((maxGather * 2) + 1);
|
||||
}
|
||||
|
||||
private Action batch()
|
||||
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
|
||||
{
|
||||
if (aggregate == null)
|
||||
FrameEntry entry = new FrameEntry(frame, callback, batchMode);
|
||||
|
||||
Throwable closed;
|
||||
synchronized (this)
|
||||
{
|
||||
aggregate = bufferPool.acquire(bufferSize,true);
|
||||
if (LOG.isDebugEnabled())
|
||||
closed = terminated;
|
||||
if (closed == null)
|
||||
{
|
||||
LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
|
||||
byte opCode = frame.getOpCode();
|
||||
if (opCode == OpCode.PING || opCode == OpCode.PONG)
|
||||
queue.offerFirst(entry);
|
||||
else
|
||||
queue.offerLast(entry);
|
||||
}
|
||||
}
|
||||
|
||||
// Do not allocate the iterator here.
|
||||
for (int i = 0; i < entries.size(); ++i)
|
||||
{
|
||||
FrameEntry entry = entries.get(i);
|
||||
|
||||
entry.generateHeaderBytes(aggregate);
|
||||
|
||||
ByteBuffer payload = entry.frame.getPayload();
|
||||
if (BufferUtil.hasContent(payload))
|
||||
{
|
||||
BufferUtil.append(aggregate,payload);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
|
||||
}
|
||||
succeeded();
|
||||
return Action.SCHEDULED;
|
||||
if (closed == null)
|
||||
iterate();
|
||||
else
|
||||
notifyCallbackFailure(callback, closed);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCompleteSuccess()
|
||||
protected Action process() throws Throwable
|
||||
{
|
||||
// This IteratingCallback never completes.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleteFailure(Throwable x)
|
||||
{
|
||||
for (FrameEntry entry : entries)
|
||||
{
|
||||
notifyCallbackFailure(entry.callback,x);
|
||||
entry.release();
|
||||
}
|
||||
entries.clear();
|
||||
failure = x;
|
||||
onFailure(x);
|
||||
}
|
||||
|
||||
private Action flush()
|
||||
{
|
||||
if (!BufferUtil.isEmpty(aggregate))
|
||||
{
|
||||
buffers.add(aggregate);
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
|
||||
}
|
||||
}
|
||||
LOG.debug("Flushing {}", this);
|
||||
|
||||
// Do not allocate the iterator here.
|
||||
for (int i = 0; i < entries.size(); ++i)
|
||||
{
|
||||
FrameEntry entry = entries.get(i);
|
||||
// Skip the "synthetic" frame used for flushing.
|
||||
if (entry.frame == FLUSH_FRAME)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
buffers.add(entry.generateHeaderBytes());
|
||||
ByteBuffer payload = entry.frame.getPayload();
|
||||
if (BufferUtil.hasContent(payload))
|
||||
{
|
||||
buffers.add(payload);
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
|
||||
}
|
||||
|
||||
if (buffers.isEmpty())
|
||||
{
|
||||
releaseAggregate();
|
||||
// We may have the FLUSH_FRAME to notify.
|
||||
succeedEntries();
|
||||
return Action.IDLE;
|
||||
}
|
||||
|
||||
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
|
||||
buffers.clear();
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Action process() throws Exception
|
||||
{
|
||||
int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
|
||||
int space = aggregate == null ? bufferSize : BufferUtil.space(aggregate);
|
||||
BatchMode currentBatchMode = BatchMode.AUTO;
|
||||
synchronized (lock)
|
||||
synchronized (this)
|
||||
{
|
||||
while ((entries.size() <= maxGather) && !queue.isEmpty())
|
||||
if (closed)
|
||||
return Action.SUCCEEDED;
|
||||
|
||||
if (terminated != null)
|
||||
throw terminated;
|
||||
|
||||
while (!queue.isEmpty() && entries.size() <= maxGather)
|
||||
{
|
||||
FrameEntry entry = queue.poll();
|
||||
currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
|
||||
currentBatchMode = BatchMode.max(currentBatchMode, entry.batchMode);
|
||||
|
||||
// Force flush if we need to.
|
||||
if (entry.frame == FLUSH_FRAME)
|
||||
{
|
||||
currentBatchMode = BatchMode.OFF;
|
||||
}
|
||||
|
||||
int payloadLength = BufferUtil.length(entry.frame.getPayload());
|
||||
int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
|
||||
|
||||
// If it is a "big" frame, avoid copying into the aggregate buffer.
|
||||
if (approxFrameLength > (bufferSize >> 2))
|
||||
{
|
||||
currentBatchMode = BatchMode.OFF;
|
||||
}
|
||||
|
||||
// If the aggregate buffer overflows, do not batch.
|
||||
space -= approxFrameLength;
|
||||
if (space <= 0)
|
||||
{
|
||||
currentBatchMode = BatchMode.OFF;
|
||||
}
|
||||
|
||||
entries.add(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
|
||||
}
|
||||
LOG.debug("{} processing {} entries: {}", this, entries.size(), entries);
|
||||
|
||||
if (entries.isEmpty())
|
||||
{
|
||||
|
@ -208,21 +146,85 @@ public class FrameFlusher
|
|||
return Action.IDLE;
|
||||
}
|
||||
|
||||
LOG.debug("{} auto flushing",FrameFlusher.this);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} auto flushing", this);
|
||||
|
||||
return flush();
|
||||
}
|
||||
|
||||
batchMode = currentBatchMode;
|
||||
|
||||
return currentBatchMode == BatchMode.OFF?flush():batch();
|
||||
return currentBatchMode == BatchMode.OFF ? flush() : batch();
|
||||
}
|
||||
|
||||
private void releaseAggregate()
|
||||
private Action batch()
|
||||
{
|
||||
if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
|
||||
if (aggregate == null)
|
||||
{
|
||||
bufferPool.release(aggregate);
|
||||
aggregate = null;
|
||||
aggregate = bufferPool.acquire(bufferSize, true);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} acquired aggregate buffer {}", this, aggregate);
|
||||
}
|
||||
|
||||
for (FrameEntry entry : entries)
|
||||
{
|
||||
entry.generateHeaderBytes(aggregate);
|
||||
|
||||
ByteBuffer payload = entry.frame.getPayload();
|
||||
if (BufferUtil.hasContent(payload))
|
||||
BufferUtil.append(aggregate, payload);
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} aggregated {} frames: {}", this, entries.size(), entries);
|
||||
|
||||
// We just aggregated the entries, so we need to succeed their callbacks.
|
||||
succeeded();
|
||||
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
private Action flush()
|
||||
{
|
||||
if (!BufferUtil.isEmpty(aggregate))
|
||||
{
|
||||
buffers.add(aggregate);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} flushing aggregate {}", this, aggregate);
|
||||
}
|
||||
|
||||
for (FrameEntry entry : entries)
|
||||
{
|
||||
// Skip the "synthetic" frame used for flushing.
|
||||
if (entry.frame == FLUSH_FRAME)
|
||||
continue;
|
||||
|
||||
buffers.add(entry.generateHeaderBytes());
|
||||
ByteBuffer payload = entry.frame.getPayload();
|
||||
if (BufferUtil.hasContent(payload))
|
||||
buffers.add(payload);
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} flushing {} frames: {}", this, entries.size(), entries);
|
||||
|
||||
if (buffers.isEmpty())
|
||||
{
|
||||
releaseAggregate();
|
||||
// We may have the FLUSH_FRAME to notify.
|
||||
succeedEntries();
|
||||
return Action.IDLE;
|
||||
}
|
||||
|
||||
endPoint.write(this, buffers.toArray(new ByteBuffer[buffers.size()]));
|
||||
buffers.clear();
|
||||
return Action.SCHEDULED;
|
||||
}
|
||||
|
||||
private int getQueueSize()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return queue.size();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -235,15 +237,108 @@ public class FrameFlusher
|
|||
|
||||
private void succeedEntries()
|
||||
{
|
||||
// Do not allocate the iterator here.
|
||||
for (int i = 0; i < entries.size(); ++i)
|
||||
for (FrameEntry entry : entries)
|
||||
{
|
||||
FrameEntry entry = entries.get(i);
|
||||
notifyCallbackSuccess(entry.callback);
|
||||
entry.release();
|
||||
if (entry.frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
terminate(new ClosedChannelException(), true);
|
||||
endPoint.shutdownOutput();
|
||||
}
|
||||
}
|
||||
entries.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleteFailure(Throwable failure)
|
||||
{
|
||||
releaseAggregate();
|
||||
|
||||
Throwable closed;
|
||||
synchronized (this)
|
||||
{
|
||||
closed = terminated;
|
||||
if (closed == null)
|
||||
terminated = failure;
|
||||
entries.addAll(queue);
|
||||
queue.clear();
|
||||
}
|
||||
|
||||
for (FrameEntry entry : entries)
|
||||
{
|
||||
notifyCallbackFailure(entry.callback, failure);
|
||||
entry.release();
|
||||
}
|
||||
entries.clear();
|
||||
}
|
||||
|
||||
private void releaseAggregate()
|
||||
{
|
||||
if (BufferUtil.isEmpty(aggregate))
|
||||
{
|
||||
bufferPool.release(aggregate);
|
||||
aggregate = null;
|
||||
}
|
||||
}
|
||||
|
||||
void terminate(Throwable cause, boolean close)
|
||||
{
|
||||
Throwable reason;
|
||||
synchronized (this)
|
||||
{
|
||||
closed = close;
|
||||
reason = terminated;
|
||||
if (reason == null)
|
||||
terminated = cause;
|
||||
}
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} {}", reason == null ? "Terminating" : "Terminated", this);
|
||||
if (reason == null && !close)
|
||||
iterate();
|
||||
}
|
||||
|
||||
protected void notifyCallbackSuccess(WriteCallback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Exception while notifying success of callback " + callback, x);
|
||||
}
|
||||
}
|
||||
|
||||
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeFailed(failure);
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Exception while notifying failure of callback " + callback, x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[queueSize=%d,aggregateSize=%d,terminated=%s]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
getQueueSize(),
|
||||
aggregate == null ? 0 : aggregate.position(),
|
||||
terminated);
|
||||
}
|
||||
|
||||
private class FrameEntry
|
||||
|
@ -267,7 +362,7 @@ public class FrameFlusher
|
|||
|
||||
private void generateHeaderBytes(ByteBuffer buffer)
|
||||
{
|
||||
generator.generateHeaderBytes(frame,buffer);
|
||||
generator.generateHeaderBytes(frame, buffer);
|
||||
}
|
||||
|
||||
private void release()
|
||||
|
@ -282,148 +377,7 @@ public class FrameFlusher
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
|
||||
return String.format("%s[%s,%s,%s,%s]", getClass().getSimpleName(), frame, callback, batchMode, terminated);
|
||||
}
|
||||
}
|
||||
|
||||
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
|
||||
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
|
||||
private final ByteBufferPool bufferPool;
|
||||
private final EndPoint endpoint;
|
||||
private final int bufferSize;
|
||||
private final Generator generator;
|
||||
private final int maxGather;
|
||||
private final Object lock = new Object();
|
||||
private final Deque<FrameEntry> queue = new ArrayDeque<>();
|
||||
private final Flusher flusher;
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private volatile Throwable failure;
|
||||
|
||||
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
|
||||
{
|
||||
this.bufferPool = bufferPool;
|
||||
this.endpoint = endpoint;
|
||||
this.bufferSize = bufferSize;
|
||||
this.generator = Objects.requireNonNull(generator);
|
||||
this.maxGather = maxGather;
|
||||
this.flusher = new Flusher(maxGather);
|
||||
}
|
||||
|
||||
public void close()
|
||||
{
|
||||
if (closed.compareAndSet(false,true))
|
||||
{
|
||||
LOG.debug("{} closing {}",this);
|
||||
EOFException eof = new EOFException("Connection has been closed locally");
|
||||
flusher.failed(eof);
|
||||
|
||||
// Fail also queued entries.
|
||||
List<FrameEntry> entries = new ArrayList<>();
|
||||
synchronized (lock)
|
||||
{
|
||||
entries.addAll(queue);
|
||||
queue.clear();
|
||||
}
|
||||
// Notify outside sync block.
|
||||
for (FrameEntry entry : entries)
|
||||
{
|
||||
notifyCallbackFailure(entry.callback,eof);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
|
||||
{
|
||||
if (closed.get())
|
||||
{
|
||||
notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
|
||||
return;
|
||||
}
|
||||
if (flusher.isFailed())
|
||||
{
|
||||
notifyCallbackFailure(callback,failure);
|
||||
return;
|
||||
}
|
||||
|
||||
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
|
||||
|
||||
synchronized (lock)
|
||||
{
|
||||
switch (frame.getOpCode())
|
||||
{
|
||||
case OpCode.PING:
|
||||
{
|
||||
// Prepend PINGs so they are processed first.
|
||||
queue.offerFirst(entry);
|
||||
break;
|
||||
}
|
||||
case OpCode.CLOSE:
|
||||
{
|
||||
// There may be a chance that other frames are
|
||||
// added after this close frame, but we will
|
||||
// fail them later to keep it simple here.
|
||||
closed.set(true);
|
||||
queue.offer(entry);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
queue.offer(entry);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
{
|
||||
LOG.debug("{} queued {}",this,entry);
|
||||
}
|
||||
|
||||
flusher.iterate();
|
||||
}
|
||||
|
||||
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeFailed(failure);
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Exception while notifying failure of callback " + callback,x);
|
||||
}
|
||||
}
|
||||
|
||||
protected void notifyCallbackSuccess(WriteCallback callback)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (callback != null)
|
||||
{
|
||||
callback.writeSuccess();
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Exception while notifying success of callback " + callback,x);
|
||||
}
|
||||
}
|
||||
|
||||
protected void onFailure(Throwable x)
|
||||
{
|
||||
LOG.warn(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
ByteBuffer aggregate = flusher.aggregate;
|
||||
return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
|
||||
failure);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -26,6 +27,7 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
|
||||
import org.eclipse.jetty.util.StringUtil;
|
||||
import org.eclipse.jetty.websocket.api.ProtocolException;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
|
@ -39,10 +41,15 @@ import org.eclipse.jetty.websocket.common.test.UnitGenerator;
|
|||
import org.eclipse.jetty.websocket.common.test.UnitParser;
|
||||
import org.eclipse.jetty.websocket.common.util.Hex;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class ParserTest
|
||||
{
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
/**
|
||||
* Similar to the server side 5.15 testcase. A normal 2 fragment text text message, followed by another continuation.
|
||||
*/
|
||||
|
@ -61,11 +68,9 @@ public class ParserTest
|
|||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
|
||||
expectedException.expect(ProtocolException.class);
|
||||
expectedException.expectMessage(containsString("CONTINUATION frame without prior !FIN"));
|
||||
parser.parseQuietly(completeBuf);
|
||||
|
||||
capture.assertErrorCount(1);
|
||||
capture.assertHasFrame(OpCode.TEXT,1);
|
||||
capture.assertHasFrame(OpCode.CONTINUATION,1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -83,10 +88,10 @@ public class ParserTest
|
|||
UnitParser parser = new UnitParser();
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
parser.parseQuietly(completeBuf);
|
||||
|
||||
capture.assertErrorCount(1);
|
||||
capture.assertHasFrame(OpCode.TEXT,1); // fragment 1
|
||||
expectedException.expect(ProtocolException.class);
|
||||
expectedException.expectMessage(containsString("Unexpected TEXT frame"));
|
||||
parser.parseQuietly(completeBuf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -28,17 +28,21 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.Arrays;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.MessageTooLargeException;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.common.test.UnitParser;
|
||||
import org.eclipse.jetty.websocket.common.util.MaskedByteBuffer;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class TextPayloadParserTest
|
||||
{
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testFrameTooLargeDueToPolicy() throws Exception
|
||||
{
|
||||
|
@ -63,19 +67,15 @@ public class TextPayloadParserTest
|
|||
UnitParser parser = new UnitParser(policy);
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
|
||||
expectedException.expect(MessageTooLargeException.class);
|
||||
parser.parseQuietly(buf);
|
||||
|
||||
capture.assertHasErrors(MessageTooLargeException.class,1);
|
||||
capture.assertHasNoFrames();
|
||||
|
||||
MessageTooLargeException err = (MessageTooLargeException)capture.getErrors().poll();
|
||||
Assert.assertThat("Error.closeCode",err.getStatusCode(),is(StatusCode.MESSAGE_TOO_LARGE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLongMaskedText() throws Exception
|
||||
{
|
||||
StringBuffer sb = new StringBuffer(); ;
|
||||
StringBuffer sb = new StringBuffer();
|
||||
for (int i = 0; i < 3500; i++)
|
||||
{
|
||||
sb.append("Hell\uFF4f Big W\uFF4Frld ");
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.ab;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -39,11 +40,16 @@ import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
|||
import org.eclipse.jetty.websocket.common.test.UnitGenerator;
|
||||
import org.eclipse.jetty.websocket.common.test.UnitParser;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class TestABCase2
|
||||
{
|
||||
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
private WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
|
||||
|
||||
@Test
|
||||
public void testGenerate125OctetPingCase2_4()
|
||||
|
@ -315,9 +321,10 @@ public class TestABCase2
|
|||
UnitParser parser = new UnitParser(policy);
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
parser.parseQuietly(expected);
|
||||
|
||||
Assert.assertEquals("error should be returned for too large of ping payload",1,capture.getErrorCount(ProtocolException.class));
|
||||
expectedException.expect(ProtocolException.class);
|
||||
expectedException.expectMessage(containsString("Invalid control frame payload length"));
|
||||
parser.parseQuietly(expected);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,16 +23,19 @@ import java.nio.ByteBuffer;
|
|||
import org.eclipse.jetty.util.log.StacklessLogging;
|
||||
import org.eclipse.jetty.websocket.api.ProtocolException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketException;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.common.Parser;
|
||||
import org.eclipse.jetty.websocket.common.test.IncomingFramesCapture;
|
||||
import org.eclipse.jetty.websocket.common.test.UnitParser;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class TestABCase4
|
||||
{
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
private WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
|
||||
|
||||
@Test
|
||||
|
@ -46,25 +49,13 @@ public class TestABCase4
|
|||
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
|
||||
try (StacklessLogging logging = new StacklessLogging(Parser.class))
|
||||
try (StacklessLogging ignore = new StacklessLogging(Parser.class))
|
||||
{
|
||||
Parser parser = new UnitParser(policy);
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
try
|
||||
{
|
||||
expectedException.expect(ProtocolException.class);
|
||||
parser.parse(expected);
|
||||
}
|
||||
catch (ProtocolException ignore)
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
|
||||
|
||||
Throwable known = capture.getErrors().poll();
|
||||
|
||||
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 11"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -78,25 +69,13 @@ public class TestABCase4
|
|||
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
|
||||
try (StacklessLogging logging = new StacklessLogging(Parser.class))
|
||||
try (StacklessLogging ignore = new StacklessLogging(Parser.class))
|
||||
{
|
||||
Parser parser = new UnitParser(policy);
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
try
|
||||
{
|
||||
expectedException.expect(ProtocolException.class);
|
||||
parser.parse(expected);
|
||||
}
|
||||
catch (ProtocolException ignore)
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
|
||||
|
||||
Throwable known = capture.getErrors().poll();
|
||||
|
||||
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 12"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -110,25 +89,13 @@ public class TestABCase4
|
|||
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
|
||||
try (StacklessLogging logging = new StacklessLogging(Parser.class))
|
||||
try (StacklessLogging ignore = new StacklessLogging(Parser.class))
|
||||
{
|
||||
Parser parser = new UnitParser(policy);
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
try
|
||||
{
|
||||
expectedException.expect(ProtocolException.class);
|
||||
parser.parse(expected);
|
||||
}
|
||||
catch (ProtocolException ignore)
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
|
||||
|
||||
Throwable known = capture.getErrors().poll();
|
||||
|
||||
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -142,24 +109,12 @@ public class TestABCase4
|
|||
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
|
||||
try (StacklessLogging logging = new StacklessLogging(Parser.class))
|
||||
try (StacklessLogging ignore = new StacklessLogging(Parser.class))
|
||||
{
|
||||
Parser parser = new UnitParser(policy);
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
try
|
||||
{
|
||||
expectedException.expect(ProtocolException.class);
|
||||
parser.parse(expected);
|
||||
}
|
||||
catch (ProtocolException ignore)
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals("error on undefined opcode",1,capture.getErrorCount(WebSocketException.class));
|
||||
|
||||
Throwable known = capture.getErrors().poll();
|
||||
|
||||
Assert.assertTrue("undefined option should be in message",known.getMessage().contains("Unknown opcode: 4"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.common.ab;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -40,11 +39,16 @@ import org.eclipse.jetty.websocket.common.test.UnitGenerator;
|
|||
import org.eclipse.jetty.websocket.common.test.UnitParser;
|
||||
import org.eclipse.jetty.websocket.common.util.Hex;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class TestABCase7_3
|
||||
{
|
||||
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
private WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.CLIENT);
|
||||
|
||||
@Test
|
||||
public void testCase7_3_1GenerateEmptyClose()
|
||||
|
@ -83,7 +87,6 @@ public class TestABCase7_3
|
|||
|
||||
Frame pActual = capture.getFrames().poll();
|
||||
Assert.assertThat("CloseFrame.payloadLength",pActual.getPayloadLength(),is(0));
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -104,13 +107,8 @@ public class TestABCase7_3
|
|||
UnitParser parser = new UnitParser(policy);
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
expectedException.expect(ProtocolException.class);
|
||||
parser.parseQuietly(expected);
|
||||
|
||||
Assert.assertEquals("error on invalid close payload",1,capture.getErrorCount(ProtocolException.class));
|
||||
|
||||
ProtocolException known = (ProtocolException)capture.getErrors().poll();
|
||||
|
||||
Assert.assertThat("Payload.message",known.getMessage(),containsString("Invalid close frame payload length"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -338,12 +336,7 @@ public class TestABCase7_3
|
|||
UnitParser parser = new UnitParser(policy);
|
||||
IncomingFramesCapture capture = new IncomingFramesCapture();
|
||||
parser.setIncomingFramesHandler(capture);
|
||||
expectedException.expect(ProtocolException.class);
|
||||
parser.parseQuietly(expected);
|
||||
|
||||
Assert.assertEquals("error on invalid close payload",1,capture.getErrorCount(ProtocolException.class));
|
||||
|
||||
ProtocolException known = (ProtocolException)capture.getErrors().poll();
|
||||
|
||||
Assert.assertThat("Payload.message",known.getMessage(),containsString("Invalid control frame payload length"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.eclipse.jetty.util.log.Log;
|
|||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.ExecutorSizedThreadPool;
|
||||
import org.eclipse.jetty.websocket.api.BatchMode;
|
||||
import org.eclipse.jetty.websocket.api.StatusCode;
|
||||
import org.eclipse.jetty.websocket.api.SuspendToken;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
||||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
|
@ -35,6 +34,7 @@ import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
|||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.ConnectionState;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
|
@ -73,18 +73,14 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
public void setSession(WebSocketSession session)
|
||||
{
|
||||
close(StatusCode.NORMAL,null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(int statusCode, String reason)
|
||||
public void onLocalClose(CloseInfo closeInfo)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("close({}, {})",statusCode,reason);
|
||||
CloseInfo close = new CloseInfo(statusCode,reason);
|
||||
ioState.onCloseLocal(close);
|
||||
ioState.onCloseLocal(closeInfo);
|
||||
}
|
||||
|
||||
public void connect()
|
||||
|
|
|
@ -30,7 +30,9 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
|
|||
import org.eclipse.jetty.websocket.api.WriteCallback;
|
||||
import org.eclipse.jetty.websocket.api.extensions.Frame;
|
||||
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
|
||||
import org.eclipse.jetty.websocket.common.CloseInfo;
|
||||
import org.eclipse.jetty.websocket.common.LogicalConnection;
|
||||
import org.eclipse.jetty.websocket.common.WebSocketSession;
|
||||
import org.eclipse.jetty.websocket.common.io.IOState;
|
||||
|
||||
public class DummyConnection implements LogicalConnection
|
||||
|
@ -44,12 +46,12 @@ public class DummyConnection implements LogicalConnection
|
|||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
public void setSession(WebSocketSession session)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(int statusCode, String reason)
|
||||
public void onLocalClose(CloseInfo close)
|
||||
{
|
||||
}
|
||||
|
||||
|
|
|
@ -58,10 +58,6 @@ public class UnitParser extends Parser
|
|||
{
|
||||
parse(buf);
|
||||
}
|
||||
catch (Exception ignore)
|
||||
{
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
|
||||
public void parseSlowly(ByteBuffer buf, int segmentSize)
|
||||
|
|
Loading…
Reference in New Issue