From d520ae5a11e3a3357e594d9a11b5f7f5cca3eebc Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 23 Jan 2019 09:06:27 +1100 Subject: [PATCH] Issue #2175 cleanups after review Re implemented a queue and flusher above extension stack to serialize frame handling without holding a lock when calling callbacks Signed-off-by: Greg Wilkins --- .../core/internal/WebSocketChannel.java | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index edbc153cb2f..ec4e158044a 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -65,7 +65,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio private final FrameHandler handler; private final Negotiated negotiated; private final boolean demanding; - private final Queue queue = new ArrayDeque<>(); private final Flusher flusher = new Flusher(); private WebSocketConnection connection; @@ -474,7 +473,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio try { - synchronized(this) + synchronized(flusher) { boolean closeConnection = channelState.onOutgoingFrame(frame); @@ -496,7 +495,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } } - queue.offer(new FrameEntry(frame, callback, batch)); + flusher.queue.offer(new FrameEntry(frame, callback, batch)); } flusher.iterate(); } @@ -521,9 +520,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio @Override public void flush(Callback callback) { - synchronized(this) + synchronized(flusher) { - queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false)); + flusher.queue.offer(new FrameEntry(FrameFlusher.FLUSH_FRAME, callback, false)); } flusher.iterate(); } @@ -762,50 +761,55 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio private class Flusher extends IteratingCallback { + private final Queue queue = new ArrayDeque<>(); FrameEntry entry; @Override protected Action process() throws Throwable { - while(true) + synchronized (this) { - synchronized (this) - { - entry = queue.poll(); - } - if (entry==null) - return Action.IDLE; - - negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch); - return Action.SCHEDULED; + entry = queue.poll(); } + if (entry==null) + return Action.IDLE; + + negotiated.getExtensions().sendFrame(entry.frame, this, entry.batch); + return Action.SCHEDULED; } + @Override public void succeeded() { - FrameEntry succeeded; - synchronized (this) - { - succeeded = entry; - entry = null; - } - if (succeeded!=null) - succeeded.callback.succeeded(); + entry.callback.succeeded(); super.succeeded(); } @Override - public void failed(Throwable x) + protected void onCompleteFailure(Throwable cause) { - FrameEntry failed; + entry.callback.failed(cause); + Queue entries; synchronized (this) { - failed = entry; - entry = null; + entries = new ArrayDeque<>(queue); + queue.clear(); + } + entries.forEach(e-> failEntry(cause, e)); + } + + private void failEntry(Throwable cause, FrameEntry e) + { + try + { + e.callback.failed(cause); + } + catch(Throwable x) + { + if (cause != x) + cause.addSuppressed(x); + LOG.warn(cause); } - if (failed!=null) - failed.callback.failed(x); - super.failed(x); } }