diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java index ef28131500e..dd7e86f60f7 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameFlusher.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; @@ -296,7 +295,7 @@ public class FrameFlusher private final Object lock = new Object(); private final Deque queue = new ArrayDeque<>(); private final Flusher flusher; - private final AtomicBoolean closed = new AtomicBoolean(); + private boolean closed = false; private volatile Throwable failure; public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather) @@ -311,75 +310,89 @@ public class FrameFlusher public void close() { - if (closed.compareAndSet(false,true)) + List entries; + EOFException eof; + + synchronized (lock) { - LOG.debug("{} closing {}",this); - EOFException eof = new EOFException("Connection has been closed locally"); + if (closed) + { + // Already closed + return; + } + + closed = true; + LOG.debug("{} closing {}", this); + + eof = new EOFException("Connection has been closed locally"); flusher.failed(eof); // Fail also queued entries. - List entries = new ArrayList<>(); - synchronized (lock) - { - entries.addAll(queue); - queue.clear(); - } - // Notify outside sync block. - for (FrameEntry entry : entries) - { - notifyCallbackFailure(entry.callback,eof); - } + entries = new ArrayList<>(); + entries.addAll(queue); + queue.clear(); + } + + // Notify outside sync block. + for (FrameEntry entry : entries) + { + notifyCallbackFailure(entry.callback, eof); } } public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode) { - if (closed.get()) - { - notifyCallbackFailure(callback,new EOFException("Connection has been closed locally")); - return; - } - if (flusher.isFailed()) - { - notifyCallbackFailure(callback,failure); - return; - } + Throwable tosser = null; - FrameEntry entry = new FrameEntry(frame,callback,batchMode); - - synchronized (lock) + synchronized(lock) { - switch (frame.getOpCode()) + if (closed) { - case OpCode.PING: + tosser = new EOFException("Connection has been closed locally"); + } + else if (failure != null) + { + tosser = failure; + } + else + { + FrameEntry entry = new FrameEntry(frame,callback,batchMode); + + if (OpCode.isControlFrame(frame.getOpCode())) { - // Prepend PINGs so they are processed first. - queue.offerFirst(entry); - break; + if (frame.getOpCode() == OpCode.CLOSE) + { + // CLOSE after last frame + // There may be a chance that other frames are + // added after this close frame, but we will + // fail them later to keep it simple here. + closed = true; + queue.offer(entry); + } + else + { + // PING and PONG are prepended + queue.offerFirst(entry); + } } - case OpCode.CLOSE: + else { - // There may be a chance that other frames are - // added after this close frame, but we will - // fail them later to keep it simple here. - closed.set(true); + // Data Frame queue.offer(entry); - break; - } - default: - { - queue.offer(entry); - break; } } } - if (LOG.isDebugEnabled()) + if (tosser != null) { - LOG.debug("{} queued {}",this,entry); + // Notify outside of lock + notifyCallbackFailure(callback, tosser); + } + else + { + // Iterate outside of lock + flusher.iterate(); } - - flusher.iterate(); } protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)