From e308f843db4e0576abf23e4f3395842449487065 Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Fri, 14 Dec 2012 16:11:28 -0700 Subject: [PATCH] Replacing FrameBytes tree with WriteBytesProvider class --- .../websocket/common/events/EventDriver.java | 2 +- .../io/AbstractWebSocketConnection.java | 382 +++--------------- .../jetty/websocket/common/io/FrameQueue.java | 93 ----- .../common/io/FutureWriteCallback.java | 6 + .../jetty/websocket/common/io/IOState.java | 47 +-- .../common/io/WriteBytesProvider.java | 111 +++-- .../server/WebSocketServerConnection.java | 3 +- .../websocket/server/ab/TestABCase2.java | 1 - .../server/blockhead/BlockheadClient.java | 4 +- .../test/resources/jetty-logging.properties | 4 +- .../java/com/acme/WebSocketChatServlet.java | 11 +- 11 files changed, 167 insertions(+), 497 deletions(-) delete mode 100644 jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameQueue.java diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/EventDriver.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/EventDriver.java index 2206c7123f3..1faa2a69f78 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/EventDriver.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/events/EventDriver.java @@ -104,7 +104,7 @@ public abstract class EventDriver implements IncomingFrames onClose(close); // process handshake - if (session.getConnection().getIOState().onCloseHandshake(true,close)) + if (session.getConnection().getIOState().onCloseHandshake(true)) { // handshake resolved, disconnect. session.getConnection().disconnect(); diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 1626d6c5dfc..5a64a45a1ac 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -26,13 +26,11 @@ import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.ForkInvoker; @@ -52,7 +50,6 @@ import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.ConnectionState; import org.eclipse.jetty.websocket.common.Generator; import org.eclipse.jetty.websocket.common.LogicalConnection; -import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.Parser; import org.eclipse.jetty.websocket.common.WebSocketSession; @@ -61,139 +58,19 @@ import org.eclipse.jetty.websocket.common.WebSocketSession; */ public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection { - private class ControlFrameBytes extends FrameBytes + private class FlushCallback implements Callback { - private ByteBuffer buffer; - private ByteBuffer origPayload; - - public ControlFrameBytes(Frame frame, Callback childCallback) + @Override + public void failed(Throwable x) { - super(frame,childCallback); + LOG.warn("Write flush failure",x); } @Override - public void completeWrite() + public void succeeded() { - if (LOG.isDebugEnabled()) - { - LOG.debug("completeWrite() - frame: {}",frame); - } - - if (buffer != null) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("Releasing Buffer: {}",BufferUtil.toDetailString(buffer)); - } - - getBufferPool().release(buffer); - buffer = null; - } - - queue.remove(this); - super.completeFrame(); - - if (frame.getType().getOpCode() == OpCode.CLOSE) - { - CloseInfo close = new CloseInfo(origPayload,false); - // TODO: change into DisconnectWebSocketCallback - onWriteWebSocketClose(close); - } - - getBufferPool().release(origPayload); - } - - @Override - public ByteBuffer getByteBuffer() - { - synchronized (queue) - { - state.set(STARTED); - if (buffer == null) - { - if (frame.hasPayload()) - { - int len = frame.getPayload().remaining(); - origPayload = getBufferPool().acquire(len,false); - BufferUtil.put(frame.getPayload(),origPayload); - } - buffer = getGenerator().generate(frame); - } - } - return buffer; - } - } - - private class DataFrameBytes extends FrameBytes - { - private ByteBuffer buffer; - - public DataFrameBytes(Frame frame, Callback childCallback) - { - super(frame,childCallback); - } - - @Override - public void completeWrite() - { - if (LOG.isDebugEnabled()) - { - LOG.debug("completeWrite() - frame.remaining() = {}",frame.remaining()); - } - - if (buffer != null) - { - if (LOG.isDebugEnabled()) - { - LOG.debug("Releasing Buffer: {}",BufferUtil.toDetailString(buffer)); - } - - getBufferPool().release(buffer); - buffer = null; - } - - if (frame.remaining() > 0) - { - LOG.debug("More to send"); - // We have written a partial frame per windowing size. - // We need to keep the correct ordering of frames, to avoid that another - // Data frame for the same stream is written before this one is finished. - super.completeWrite(); - } - else - { - LOG.debug("Send complete"); - synchronized (queue) - { - queue.remove(this); - } - // TODO: Notify the rest of the callback chain (extension, close/disconnect, and user callbacks) - completeFrame(); - } - } - - @Override - public ByteBuffer getByteBuffer() - { - try - { - synchronized (queue) - { - state.set(STARTED); - int windowSize = getInputBufferSize(); - buffer = getGenerator().generate(windowSize,frame); - if (LOG.isDebugEnabled()) - { - LOG.debug("getByteBuffer() - {}",BufferUtil.toDetailString(buffer)); - } - return buffer; - } - } - catch (Throwable x) - { - failFrame(x); - return null; - } + // Lets process the next set of bytes... + AbstractWebSocketConnection.this.complete(writeBytes); } } @@ -207,7 +84,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public void call(Callback callback) { - callback.succeeded(); flush(); } @@ -219,7 +95,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public void run() { - callback.succeeded(); flush(); } }); @@ -232,154 +107,22 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } - public abstract class FrameBytes implements Callback + private class OnCloseCallback implements WriteCallback { - // no bytes have yet been flushed - public int UNSTARTED = 0; - // some bytes have been provided for being flushed - public int STARTED = 1; - // all bytes have been flushed - public int FINISHED = 2; - // is in failure state - public int FAILED = 3; - - protected final Logger LOG; - protected final Frame frame; - protected final Callback childCallback; - protected final AtomicInteger state = new AtomicInteger(UNSTARTED); - - public FrameBytes(Frame frame, Callback childCallback) + @Override + public void writeFailed(Throwable x) { - this.LOG = Log.getLogger(this.getClass()); - this.frame = frame; - this.childCallback = childCallback; - } - - public void completeFrame() - { - LOG.debug("completeFrame() {}",this); - synchronized (queue) - { - state.set(FINISHED); - if (LOG.isDebugEnabled()) - { - LOG.debug("Completed Write of {} ({} frame(s) in queue)",this,queue.size()); - } - flushing = false; - } - AbstractWebSocketConnection.this.complete(childCallback); - } - - public void completeWrite() - { - // handle reflush. - if (isUnfinished()) - { - AbstractWebSocketConnection.this.complete(this); - } + disconnect(); } @Override - public boolean equals(Object obj) + public void writeSuccess() { - if (this == obj) - { - return true; - } - if (obj == null) - { - return false; - } - if (getClass() != obj.getClass()) - { - return false; - } - FrameBytes other = (FrameBytes)obj; - if (frame == null) - { - if (other.frame != null) - { - return false; - } - } - else if (!frame.equals(other.frame)) - { - return false; - } - return true; - } - - /** - * Entry point for EndPoint.write failure - */ - @Override - public void failed(Throwable x) - { - // Log failure - if (x instanceof EofException) - { - // Abbreviate the EofException - LOG.warn("failed() - " + EofException.class); - } - else - { - LOG.warn("failed()",x); - } - synchronized (queue) - { - state.set(FAILED); - flushing = false; - queue.fail(x); - } - failFrame(x); - } - - public void failFrame(Throwable t) - { - failed(t); - flush(); - } - - public abstract ByteBuffer getByteBuffer(); - - @Override - public int hashCode() - { - final int prime = 31; - int result = 1; - result = (prime * result) + ((frame == null)?0:frame.hashCode()); - return result; - } - - /** - * If the FrameBytes have been started, but not yet finished - * - * @return - */ - public boolean isUnfinished() - { - return (state.get() == STARTED); - } - - /** - * Entry point for EndPoint.write success - */ - @Override - public void succeeded() - { - LOG.debug("succeeded() {}",this); - completeWrite(); - } - - @Override - public String toString() - { - return frame.toString(); + onWriteWebSocketClose(); } } private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class); - private static final Logger LOG_FRAMES = Log.getLogger("org.eclipse.jetty.websocket.io.Frames"); private final ForkInvoker invoker = new FlushInvoker(); private final ByteBufferPool bufferPool; @@ -387,7 +130,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp private final Generator generator; private final Parser parser; private final WebSocketPolicy policy; - private final FrameQueue queue = new FrameQueue(); + private final WriteBytesProvider writeBytes; private final AtomicBoolean suspendToken; private WebSocketSession session; private List extensions; @@ -407,6 +150,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp this.suspendToken = new AtomicBoolean(false); this.ioState = new IOState(); this.ioState.setState(ConnectionState.CONNECTING); + this.writeBytes = new WriteBytesProvider(generator,new FlushCallback()); } @Override @@ -423,14 +167,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp public void complete(final Callback callback) { - if (callback == null) + LOG.debug("complete({})",callback); + synchronized (writeBytes) + { + flushing = false; + } + + if (!ioState.isOpen() || (callback == null)) { return; } - if (ioState.isOpen()) - { - invoker.invoke(callback); - } + + invoker.invoke(callback); } @Override @@ -465,9 +213,19 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp */ private void enqueClose(int statusCode, String reason) { + synchronized (writeBytes) + { + // It is possible to get close events from many different sources. + // Make sure we only sent 1 over the network. + if (writeBytes.isClosed()) + { + // already sent the close + return; + } + } CloseInfo close = new CloseInfo(statusCode,reason); // TODO: create DisconnectCallback? - outgoingFrame(close.asFrame(),null); + outgoingFrame(close.asFrame(),new OnCloseCallback()); } private void execute(Runnable task) @@ -484,39 +242,34 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp public void flush() { - FrameBytes frameBytes = null; ByteBuffer buffer = null; - synchronized (queue) + synchronized (writeBytes) { - if (queue.isFailed()) + if (writeBytes.isFailed()) { LOG.debug(".flush() - queue is in failed state"); return; } - if (flushing || queue.isEmpty()) + if (flushing) { return; } if (LOG.isDebugEnabled()) { - LOG.debug(".flush() - flushing={} - queue.size = {}",flushing,queue.size()); + LOG.debug(".flush() - flushing={} - writeBytes={}",flushing,writeBytes); } - frameBytes = queue.peek(); - if (!isOpen()) { // No longer have an open connection, drop them all. - queue.fail(new WebSocketException("Connection closed")); + writeBytes.failAll(new WebSocketException("Connection closed")); return; } - LOG.debug("Next FrameBytes: {}",frameBytes); - - buffer = frameBytes.getByteBuffer(); + buffer = writeBytes.getByteBuffer(); if (buffer == null) { @@ -527,11 +280,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp if (LOG.isDebugEnabled()) { - LOG.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size()); + LOG.debug("Flushing {} - {}",BufferUtil.toDetailString(buffer),writeBytes); } } - write(buffer,frameBytes); + write(buffer); } public ByteBufferPool getBufferPool() @@ -653,9 +406,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp return true; } - public void onWriteWebSocketClose(CloseInfo close) + public void onWriteWebSocketClose() { - if (ioState.onCloseHandshake(false,close)) + if (ioState.onCloseHandshake(false)) { disconnect(); } @@ -669,34 +422,17 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { if (LOG.isDebugEnabled()) { - LOG.debug("outgoingFrame({}, callback)",frame); + LOG.debug("outgoingFrame({}, {})",frame,callback); } - synchronized (queue) + if (!isOpen()) { - FrameBytes bytes = null; - Callback jettyCallback = WriteCallbackWrapper.wrap(callback); + return; + } - if (frame.getType().isControl()) - { - bytes = new ControlFrameBytes(frame,jettyCallback); - } - else - { - bytes = new DataFrameBytes(frame,jettyCallback); - } - - if (isOpen()) - { - if (frame.getType().getOpCode() == OpCode.PING) - { - queue.prepend(bytes); - } - else - { - queue.append(bytes); - } - } + synchronized (writeBytes) + { + writeBytes.enque(frame,WriteCallbackWrapper.wrap(callback)); } flush(); @@ -784,29 +520,23 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp return String.format("%s{g=%s,p=%s}",super.toString(),generator,parser); } - private void write(ByteBuffer buffer, FrameBytes frameBytes) + private void write(ByteBuffer buffer) { EndPoint endpoint = getEndPoint(); - if (LOG_FRAMES.isDebugEnabled()) - { - LOG_FRAMES.debug("{} Writing {} of {} ",policy.getBehavior(),BufferUtil.toDetailString(buffer),frameBytes); - } - if (!isOpen()) { - // connection is closed, STOP WRITING, geez. - frameBytes.failed(new WebSocketException("Connection closed")); + writeBytes.failAll(new IOException("Connection closed")); return; } try { - endpoint.write(frameBytes,buffer); + endpoint.write(writeBytes,buffer); } catch (Throwable t) { - frameBytes.failed(t); + writeBytes.failed(t); } } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameQueue.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameQueue.java deleted file mode 100644 index 8ad563ae088..00000000000 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FrameQueue.java +++ /dev/null @@ -1,93 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.websocket.common.io; - -import java.util.LinkedList; - -import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.FrameBytes; - -/** - * Queue for outgoing frames. - */ -@SuppressWarnings("serial") -public class FrameQueue extends LinkedList -{ - private Throwable failure; - - public void append(FrameBytes bytes) - { - synchronized (this) - { - if (isFailed()) - { - // no changes when failed - bytes.failed(failure); - return; - } - addLast(bytes); - } - } - - public synchronized void fail(Throwable t) - { - synchronized (this) - { - if (isFailed()) - { - // already failed. - return; - } - - failure = t; - - for (FrameBytes fb : this) - { - fb.failed(failure); - } - - clear(); - } - } - - public Throwable getFailure() - { - return failure; - } - - public boolean isFailed() - { - return (failure != null); - } - - public void prepend(FrameBytes bytes) - { - synchronized (this) - { - if (isFailed()) - { - // no changes when failed - bytes.failed(failure); - return; - } - - // TODO: make sure that we don't go in front of started but not yet finished frames. - addFirst(bytes); - } - } -} diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FutureWriteCallback.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FutureWriteCallback.java index 6e2a8c57ac7..6b6e17cf620 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FutureWriteCallback.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/FutureWriteCallback.java @@ -21,6 +21,8 @@ package org.eclipse.jetty.websocket.common.io; import java.util.concurrent.Future; import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.WriteCallback; /** @@ -28,15 +30,19 @@ import org.eclipse.jetty.websocket.api.WriteCallback; */ public class FutureWriteCallback extends FutureCallback implements WriteCallback { + private static final Logger LOG = Log.getLogger(FutureWriteCallback.class); + @Override public void writeFailed(Throwable cause) { + LOG.debug(".writeFailed",cause); failed(cause); } @Override public void writeSuccess() { + LOG.debug(".writeSuccess"); succeeded(); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java index aa92630ff4a..a4589602dbe 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.ConnectionState; /** @@ -39,7 +38,7 @@ public class IOState private final AtomicBoolean cleanClose; private final AtomicBoolean remoteCloseInitiated; private final AtomicBoolean localCloseInitiated; - + public IOState() { this.state = ConnectionState.CONNECTING; @@ -86,6 +85,11 @@ public class IOState return (isInputClosed() && isOutputClosed()); } + public boolean isCloseInitiated() + { + return remoteCloseInitiated.get() || localCloseInitiated.get(); + } + public boolean isInputClosed() { return inputClosed.get(); @@ -110,7 +114,7 @@ public class IOState * the close details. * @return true if connection should be disconnected now, or false if response to close should be issued. */ - public boolean onCloseHandshake(boolean incoming, CloseInfo close) + public boolean onCloseHandshake(boolean incoming) { boolean in = inputClosed.get(); boolean out = outputClosed.get(); @@ -118,7 +122,7 @@ public class IOState { in = true; this.inputClosed.set(true); - + if (!localCloseInitiated.get()) { remoteCloseInitiated.set(true); @@ -128,14 +132,14 @@ public class IOState { out = true; this.outputClosed.set(true); - + if ( !remoteCloseInitiated.get() ) { localCloseInitiated.set(true); } } - LOG.debug("onCloseHandshake({},{}), input={}, output={}",incoming,close,in,out); + LOG.debug("onCloseHandshake({}), input={}, output={}",incoming,in,out); if (in && out) { @@ -144,10 +148,6 @@ public class IOState return true; } - /* - * if (close.isHarsh()) { LOG.debug("Close status code was harsh, disconnecting"); return true; } - */ - return false; } @@ -160,24 +160,19 @@ public class IOState { this.state = state; } - - public boolean isCloseInitiated() - { - return remoteCloseInitiated.get() || localCloseInitiated.get(); - } - - public boolean wasRemoteCloseInitiated() - { - return remoteCloseInitiated.get(); - } - - public boolean wasLocalCloseInitiated() - { - return localCloseInitiated.get(); - } - + public boolean wasCleanClose() { return cleanClose.get(); } + + public boolean wasLocalCloseInitiated() + { + return localCloseInitiated.get(); + } + + public boolean wasRemoteCloseInitiated() + { + return remoteCloseInitiated.get(); + } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java index c610736f2eb..e14ba5dee89 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java @@ -18,9 +18,11 @@ package org.eclipse.jetty.websocket.common.io; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; @@ -38,7 +40,6 @@ public class WriteBytesProvider implements Callback { private class FrameEntry { - private final Logger LOG = Log.getLogger(FrameEntry.class); protected final Frame frame; protected final Callback callback; @@ -59,6 +60,8 @@ public class WriteBytesProvider implements Callback } } + private static final Logger LOG = Log.getLogger(WriteBytesProvider.class); + /** The websocket generator */ private final Generator generator; /** Flush callback, for notifying when a flush should be performed */ @@ -73,6 +76,8 @@ public class WriteBytesProvider implements Callback private Throwable failure; /** The last requested buffer */ private ByteBuffer buffer; + /** Is WriteBytesProvider closed to more WriteBytes being enqueued? */ + private AtomicBoolean closed; /** * Create a WriteBytesProvider with specified Generator and "flush" Callback. @@ -89,28 +94,48 @@ public class WriteBytesProvider implements Callback this.generator = Objects.requireNonNull(generator); this.flushCallback = Objects.requireNonNull(flushCallback); this.queue = new LinkedList<>(); + this.closed = new AtomicBoolean(false); } - /** - * Append a Frame & Callback to the pending queue. - * - * @param frame - * the frame to add - * @param callback - * the optional callback for the frame write (can be null) - */ - public void append(Frame frame, Callback callback) + public void enque(Frame frame, Callback callback) { Objects.requireNonNull(frame); + LOG.debug("enque({}, {})",frame,callback); synchronized (this) { + if (closed.get()) + { + // Closed for more frames. + LOG.debug("Write is closed: {}",frame,callback); + if (callback != null) + { + callback.failed(new IOException("Write is closed")); + } + return; + } + if (isFailed()) { // no changes when failed notifyFailure(callback); return; } - queue.addLast(new FrameEntry(frame,callback)); + + FrameEntry entry = new FrameEntry(frame,callback); + + switch (frame.getType()) + { + case PING: + queue.addFirst(entry); + break; + case CLOSE: + closed.set(true); + // drop the rest of the queue? + queue.addLast(entry); + break; + default: + queue.addLast(entry); + } } } @@ -191,6 +216,19 @@ public class WriteBytesProvider implements Callback return failure; } + /** + * Used to test for the final frame possible to be enqueued, the CLOSE frame. + * + * @return true if close frame has been enqueued already. + */ + public boolean isClosed() + { + synchronized (this) + { + return closed.get(); + } + } + public boolean isFailed() { return (failure != null); @@ -211,30 +249,6 @@ public class WriteBytesProvider implements Callback callback.failed(failure); } - /** - * Prepend a Frame & Callback to the pending queue. - * - * @param frame - * the frame to add - * @param callback - * the optional callback for the frame write (can be null) - */ - public void prepend(Frame frame, Callback callback) - { - Objects.requireNonNull(frame); - synchronized (this) - { - if (isFailed()) - { - // no changes when failed - notifyFailure(callback); - return; - } - - queue.addFirst(new FrameEntry(frame,callback)); - } - } - /** * Set the buffer size used for generating ByteBuffers from the frames. *

@@ -267,6 +281,13 @@ public class WriteBytesProvider implements Callback if (active.frame.remaining() <= 0) { // All done with active FrameEntry + if (active.callback != null) + { + // notify of success + active.callback.succeeded(); + } + + // null it out active = null; } @@ -274,4 +295,24 @@ public class WriteBytesProvider implements Callback flushCallback.succeeded(); } } + + @Override + public String toString() + { + StringBuilder b = new StringBuilder(); + b.append("WriteBytesProvider["); + b.append("flushCallback=").append(flushCallback); + if (isFailed()) + { + b.append(",FAILURE=").append(failure.getClass().getName()); + b.append(",").append(failure.getMessage()); + } + else + { + b.append(",active=").append(active); + b.append(",queue.size=").append(queue.size()); + } + b.append(']'); + return b.toString(); + } } diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java index 48e30aa2e40..a98e0282cb7 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java @@ -26,7 +26,6 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.extensions.IncomingFrames; -import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; public class WebSocketServerConnection extends AbstractWebSocketConnection @@ -73,7 +72,7 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection } @Override - public void onWriteWebSocketClose(CloseInfo close) + public void onWriteWebSocketClose() { // as server, always disconnect if writing close disconnect(); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java index 2096ff702f5..eb768cb2519 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/ab/TestABCase2.java @@ -263,7 +263,6 @@ public class TestABCase2 extends AbstractABCase @Test public void testCase2_6() throws Exception { - System.err.println("=================================================================================================="); byte payload[] = new byte[125]; Arrays.fill(payload,(byte)'6'); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java index 29f75db42d8..d9f67e4c91c 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/blockhead/BlockheadClient.java @@ -180,7 +180,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames { CloseInfo close = new CloseInfo(statusCode,message); - if (ioState.onCloseHandshake(false,close)) + if (ioState.onCloseHandshake(false)) { this.disconnect(); } @@ -364,7 +364,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames if (frame.getType() == Frame.Type.CLOSE) { CloseInfo close = new CloseInfo(frame); - if (ioState.onCloseHandshake(true,close)) + if (ioState.onCloseHandshake(true)) { this.disconnect(); } diff --git a/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties index da1ce944c48..151e8c31dc8 100644 --- a/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-server/src/test/resources/jetty-logging.properties @@ -1,8 +1,8 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog # org.eclipse.jetty.LEVEL=WARN -org.eclipse.jetty.websocket.LEVEL=DEBUG -# org.eclipse.jetty.websocket.LEVEL=INFO +# org.eclipse.jetty.websocket.LEVEL=DEBUG +org.eclipse.jetty.websocket.LEVEL=WARN # org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG # org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG # org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG diff --git a/tests/test-webapps/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java b/tests/test-webapps/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java index 4cac5a22c12..e4c616b053f 100644 --- a/tests/test-webapps/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java +++ b/tests/test-webapps/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java @@ -126,15 +126,8 @@ public class WebSocketChatServlet extends WebSocketServlet implements WebSocketC continue; } - try - { - // Async write the message back. - member.connection.write(data); - } - catch (IOException e) - { - getServletContext().log("write failed",e); - } + // Async write the message back. + member.connection.write(data); } }