diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java index a1039932f48..4f95baabeb5 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/FrameFlusher.java @@ -49,8 +49,6 @@ public class FrameFlusher extends IteratingCallback private final Deque queue = new ArrayDeque<>(); private final List entries; private final List buffers; - private boolean closed; - private Throwable terminated; private ByteBuffer batchBuffer = null; public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endPoint, int bufferSize, int maxGather) @@ -67,25 +65,14 @@ public class FrameFlusher extends IteratingCallback public void enqueue(Frame frame, Callback callback, boolean batch) { Entry entry = new Entry(frame, callback, batch); - - Throwable closed; + byte opCode = frame.getOpCode(); synchronized (this) { - closed = terminated; - if (closed == null) - { - byte opCode = frame.getOpCode(); - if (opCode == OpCode.PING || opCode == OpCode.PONG) - queue.offerFirst(entry); - else - queue.offerLast(entry); - } + if (opCode == OpCode.PING || opCode == OpCode.PONG) + queue.offerFirst(entry); + else + queue.offerLast(entry); } - - if (closed == null) - iterate(); - else - notifyCallbackFailure(callback, closed); } @Override @@ -102,12 +89,6 @@ public class FrameFlusher extends IteratingCallback if (succeedEntries() && batchBuffer != null) BufferUtil.clear(batchBuffer); - if (closed) - return Action.SUCCEEDED; - - if (terminated != null) - throw terminated; - while (!queue.isEmpty() && entries.size() <= maxGather) { Entry entry = queue.poll(); @@ -220,10 +201,7 @@ public class FrameFlusher extends IteratingCallback notifyCallbackSuccess(entry.callback); entry.release(); if (entry.frame.getOpCode() == OpCode.CLOSE) - { - terminate(new ClosedChannelException(), true); endPoint.shutdownOutput(); - } } entries.clear(); return hadEntries; @@ -233,13 +211,8 @@ public class FrameFlusher extends IteratingCallback public void onCompleteFailure(Throwable failure) { releaseAggregate(); - - Throwable closed; synchronized (this) { - closed = terminated; - if (closed == null) - terminated = failure; entries.addAll(queue); queue.clear(); } @@ -261,22 +234,6 @@ public class FrameFlusher extends IteratingCallback } } - public void terminate(Throwable cause, boolean close) - { - Throwable reason; - synchronized (this) - { - closed = close; - reason = terminated; - if (reason == null) - terminated = cause; - } - if (LOG.isDebugEnabled()) - LOG.debug("{} {}", reason == null?"Terminating":"Terminated", this); - if (reason == null && !close) - iterate(); - } - protected void notifyCallbackSuccess(Callback callback) { try @@ -312,12 +269,11 @@ public class FrameFlusher extends IteratingCallback @Override public String toString() { - return String.format("%s@%x[queueSize=%d,aggregate=%s,terminated=%s]", + return String.format("%s@%x[queueSize=%d,aggregate=%s]", getClass().getSimpleName(), hashCode(), getQueueSize(), - BufferUtil.toDetailString(batchBuffer), - terminated); + BufferUtil.toDetailString(batchBuffer)); } private class Entry extends FrameEntry @@ -353,7 +309,7 @@ public class FrameFlusher extends IteratingCallback @Override public String toString() { - return String.format("%s[%s,%s,%b,%s]", getClass().getSimpleName(), frame, callback, batch, terminated); + return String.format("%s{%s,%s,%b}", getClass().getSimpleName(), frame, callback, batch); } } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index 49c610e1925..c48ac38ee03 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -81,7 +81,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio this.behavior = behavior; this.negotiated = negotiated; this.demanding = handler.isDemanding(); - negotiated.getExtensions().connect(new ExtendedIncoming(), new ExtendedOutgoing(), this); + negotiated.getExtensions().connect(new IncomingAdaptor(), new OutgoingAdaptor(), this); } /** @@ -509,12 +509,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } negotiated.getExtensions().sendFrame(frame, callback, batch); + connection.sendFrameQueue(); } @Override public void flush(Callback callback) { negotiated.getExtensions().sendFrame(FrameFlusher.FLUSH_FRAME, callback, false); + connection.sendFrameQueue(); } @Override @@ -596,7 +598,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio maxTextMessageSize = maxSize; } - private class ExtendedIncoming extends FrameSequence implements IncomingFrames + private class IncomingAdaptor extends FrameSequence implements IncomingFrames { @Override public void onFrame(Frame frame, Callback callback) @@ -666,14 +668,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } } - private class ExtendedOutgoing implements OutgoingFrames + private class OutgoingAdaptor implements OutgoingFrames { @Override public void sendFrame(Frame frame, Callback callback, boolean batch) { try { - connection.sendFrame(frame, callback, batch); + connection.enqueueFrame(frame, callback, batch); } catch (ProtocolException e) { diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java index 2a961ccd098..743642065ff 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java @@ -31,7 +31,6 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.MessageTooLargeException; -import org.eclipse.jetty.websocket.core.OutgoingFrames; import org.eclipse.jetty.websocket.core.ProtocolException; import org.eclipse.jetty.websocket.core.WebSocketTimeoutException; @@ -45,7 +44,7 @@ import java.util.concurrent.Executor; /** * Provides the implementation of {@link org.eclipse.jetty.io.Connection} that is suitable for WebSocket */ -public class WebSocketConnection extends AbstractConnection implements Connection.UpgradeTo, Dumpable, OutgoingFrames, Runnable +public class WebSocketConnection extends AbstractConnection implements Connection.UpgradeTo, Dumpable, Runnable { private final Logger LOG = Log.getLogger(this.getClass()); @@ -170,7 +169,6 @@ public class WebSocketConnection extends AbstractConnection implements Connectio // TODO review all close paths IOException e = new IOException("Closed"); - flusher.terminate(e, true); channel.onClosed(e); super.onClose(); } @@ -577,8 +575,14 @@ public class WebSocketConnection extends AbstractConnection implements Connectio setInitialBuffer(prefilled); } - @Override - public void sendFrame(Frame frame, Callback callback, boolean batch) + /** + * Enqueue a Frame to be sent. + * @see #sendFrameQueue() + * @param frame The frame to queue + * @param callback The callback to call once the frame is sent + * @param batch True if batch mode is to be used + */ + void enqueueFrame(Frame frame, Callback callback, boolean batch) { if (channel.getBehavior() == Behavior.CLIENT) { @@ -590,6 +594,11 @@ public class WebSocketConnection extends AbstractConnection implements Connectio flusher.enqueue(frame, callback, batch); } + void sendFrameQueue() + { + flusher.iterate(); + } + private class Flusher extends FrameFlusher { private Flusher(int bufferSize, Generator generator, EndPoint endpoint)