diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index 95912ffeb1b..edbc153cb2f 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -23,13 +23,16 @@ import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.URI; import java.time.Duration; +import java.util.ArrayDeque; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.Utf8Appendable; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; @@ -62,6 +65,8 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio private final FrameHandler handler; private final Negotiated negotiated; private final boolean demanding; + private final Queue queue = new ArrayDeque<>(); + private final Flusher flusher = new Flusher(); private WebSocketConnection connection; private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT; @@ -287,13 +292,12 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio public void onClosed(Throwable cause) { - CloseStatus closeStatus = new CloseStatus(CloseStatus.NO_CLOSE, cause == null?null:cause.toString()); if (channelState.onClosed(closeStatus)) - onClosed(cause, closeStatus); + closeConnection(cause, closeStatus); } - public void onClosed(Throwable cause, CloseStatus closeStatus) + public void closeConnection(Throwable cause, CloseStatus closeStatus) { connection.cancelDemand(); @@ -306,7 +310,8 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } catch (Throwable e) { - cause.addSuppressed(e); + if (e != cause) + cause.addSuppressed(e); LOG.warn(cause); } } @@ -352,12 +357,15 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio */ public void processConnectionError(Throwable cause) { + if (LOG.isDebugEnabled()) + LOG.debug("processConnectionError {} {}", this, cause); + CloseStatus closeStatus = abnormalCloseStatusFor(cause); if (closeStatus.getCode() == CloseStatus.PROTOCOL) close(closeStatus, Callback.NOOP, false); else if (channelState.onClosed(closeStatus)) - onClosed(cause, closeStatus); + closeConnection(cause, closeStatus); } /** @@ -368,6 +376,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio */ public void processHandlerError(Throwable cause) { + if (LOG.isDebugEnabled()) + LOG.debug("processHandlerError {} {}", this, cause); + close(abnormalCloseStatusFor(cause), Callback.NOOP, false); } @@ -448,71 +459,63 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio @Override public void sendFrame(Frame frame, Callback callback, boolean batch) { - synchronized(this) + if (LOG.isDebugEnabled()) + LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); + + try { - if (LOG.isDebugEnabled()) - LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); + assertValidOutgoing(frame); + } + catch (Throwable ex) + { + callback.failed(ex); + return; + } - try + try + { + synchronized(this) { - assertValidOutgoing(frame); - } - catch (Throwable ex) - { - callback.failed(ex); - return; - } + boolean closeConnection = channelState.onOutgoingFrame(frame); - boolean closeConnection; - try - { - closeConnection = channelState.onOutgoingFrame(frame); - } - catch (Throwable ex) - { - try + if (frame.getOpCode() == OpCode.CLOSE) { - callback.failed(ex); - } - finally - { - if (frame.getOpCode() == OpCode.CLOSE && CloseStatus.getCloseStatus(frame) instanceof AbnormalCloseStatus) + if (LOG.isDebugEnabled()) + LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch); + + if (closeConnection) { - try + callback = new Callback.Nested(callback) { - handler.onClosed(CloseStatus.getCloseStatus(frame)); - } - finally - { - connection.close(); - } + @Override + public void completed() + { + closeConnection(null, channelState.getCloseStatus()); + } + }; } } - return; + + queue.offer(new FrameEntry(frame, callback, batch)); } - - - if (frame.getOpCode() == OpCode.CLOSE) + flusher.iterate(); + } + catch (Throwable ex) + { + try { - if (LOG.isDebugEnabled()) - LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch); - - if (closeConnection) + callback.failed(ex); + } + finally + { + if (frame.getOpCode() == OpCode.CLOSE) { - callback = new Callback.Nested(callback) - { - @Override - public void completed() - { - onClosed(null, channelState.getCloseStatus()); - } - }; + CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); + if (closeStatus instanceof AbnormalCloseStatus) + closeConnection(null, closeStatus); } } - - negotiated.getExtensions().sendFrame(frame, callback, batch); } - connection.sendFrameQueue(); } @Override @@ -520,9 +523,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio { synchronized(this) { - negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false); + queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false)); } - connection.sendFrameQueue(); + flusher.iterate(); } @Override @@ -756,4 +759,54 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio super(statusCode, reasonPhrase); } } + + private class Flusher extends IteratingCallback + { + FrameEntry entry; + + @Override + protected Action process() throws Throwable + { + while(true) + { + synchronized (this) + { + entry = queue.poll(); + } + if (entry==null) + return Action.IDLE; + + negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch); + return Action.SCHEDULED; + } + } + @Override + public void succeeded() + { + FrameEntry succeeded; + synchronized (this) + { + succeeded = entry; + entry = null; + } + if (succeeded!=null) + succeeded.callback.succeeded(); + super.succeeded(); + } + + @Override + public void failed(Throwable x) + { + FrameEntry failed; + synchronized (this) + { + failed = entry; + entry = null; + } + if (failed!=null) + failed.callback.failed(x); + super.failed(x); + } + } + } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java index cdd40ed3e8a..3d5757542f7 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java @@ -587,7 +587,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio /** * Enqueue a Frame to be sent. - * @see #sendFrameQueue() * @param frame The frame to queue * @param callback The callback to call once the frame is sent * @param batch True if batch mode is to be used @@ -602,10 +601,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio wsf.setMask(mask); } flusher.enqueue(frame, callback, batch); - } - - void sendFrameQueue() - { flusher.iterate(); }