Issue #2175 cleanups after review

Re implemented a queue and flusher above extension stack to serialize
frame handling without holding a lock when calling callbacks

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-01-23 08:49:55 +11:00
parent d598e0dc6f
commit 3d7ee93112
2 changed files with 110 additions and 62 deletions

View File

@ -23,13 +23,16 @@ import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
@ -62,6 +65,8 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private final FrameHandler handler;
private final Negotiated negotiated;
private final boolean demanding;
private final Queue<FrameEntry> queue = new ArrayDeque<>();
private final Flusher flusher = new Flusher();
private WebSocketConnection connection;
private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT;
@ -287,13 +292,12 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
public void onClosed(Throwable cause)
{
CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString());
if (channelState.onClosed(closeStatus))
onClosed(cause, closeStatus);
closeConnection(cause, closeStatus);
}
public void onClosed(Throwable cause, CloseStatus closeStatus)
public void closeConnection(Throwable cause, CloseStatus closeStatus)
{
connection.cancelDemand();
@ -306,7 +310,8 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
catch (Throwable e)
{
cause.addSuppressed(e);
if (e != cause)
cause.addSuppressed(e);
LOG.warn(cause);
}
}
@ -352,12 +357,15 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
*/
public void processConnectionError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("processConnectionError {} {}", this, cause);
CloseStatus closeStatus = abnormalCloseStatusFor(cause);
if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, Callback.NOOP, false);
else if (channelState.onClosed(closeStatus))
onClosed(cause, closeStatus);
closeConnection(cause, closeStatus);
}
/**
@ -368,6 +376,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
*/
public void processHandlerError(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("processHandlerError {} {}", this, cause);
close(abnormalCloseStatusFor(cause), Callback.NOOP, false);
}
@ -448,71 +459,63 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public void sendFrame(Frame frame, Callback callback, boolean batch)
{
synchronized(this)
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
try
{
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
assertValidOutgoing(frame);
}
catch (Throwable ex)
{
callback.failed(ex);
return;
}
try
try
{
synchronized(this)
{
assertValidOutgoing(frame);
}
catch (Throwable ex)
{
callback.failed(ex);
return;
}
boolean closeConnection = channelState.onOutgoingFrame(frame);
boolean closeConnection;
try
{
closeConnection = channelState.onOutgoingFrame(frame);
}
catch (Throwable ex)
{
try
if (frame.getOpCode() == OpCode.CLOSE)
{
callback.failed(ex);
}
finally
{
if (frame.getOpCode() == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof AbnormalCloseStatus)
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
if (closeConnection)
{
try
callback = new Callback.Nested(callback)
{
handler.onClosed(CloseStatus.getCloseStatus(frame));
}
finally
{
connection.close();
}
@Override
public void completed()
{
closeConnection(null, channelState.getCloseStatus());
}
};
}
}
return;
queue.offer(new FrameEntry(frame, callback, batch));
}
if (frame.getOpCode() == OpCode.CLOSE)
flusher.iterate();
}
catch (Throwable ex)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
if (closeConnection)
callback.failed(ex);
}
finally
{
if (frame.getOpCode() == OpCode.CLOSE)
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
onClosed(null, channelState.getCloseStatus());
}
};
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus)
closeConnection(null, closeStatus);
}
}
negotiated.getExtensions().sendFrame(frame, callback, batch);
}
connection.sendFrameQueue();
}
@Override
@ -520,9 +523,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
synchronized(this)
{
negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false);
queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false));
}
connection.sendFrameQueue();
flusher.iterate();
}
@Override
@ -756,4 +759,54 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
super(statusCode, reasonPhrase);
}
}
private class Flusher extends IteratingCallback
{
FrameEntry entry;
@Override
protected Action process() throws Throwable
{
while(true)
{
synchronized (this)
{
entry = queue.poll();
}
if (entry==null)
return Action.IDLE;
negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch);
return Action.SCHEDULED;
}
}
@Override
public void succeeded()
{
FrameEntry succeeded;
synchronized (this)
{
succeeded = entry;
entry = null;
}
if (succeeded!=null)
succeeded.callback.succeeded();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
FrameEntry failed;
synchronized (this)
{
failed = entry;
entry = null;
}
if (failed!=null)
failed.callback.failed(x);
super.failed(x);
}
}
}

View File

@ -587,7 +587,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
/**
* Enqueue a Frame to be sent.
* @see #sendFrameQueue()
* @param frame The frame to queue
* @param callback The callback to call once the frame is sent
* @param batch True if batch mode is to be used
@ -602,10 +601,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
wsf.setMask(mask);
}
flusher.enqueue(frame, callback, batch);
}
void sendFrameQueue()
{
flusher.iterate();
}