diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java index e5de092b2ae..7c9e971cb93 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java @@ -19,19 +19,32 @@ package org.eclipse.jetty.websocket.client; import java.io.IOException; +import java.nio.ByteBuffer; import org.eclipse.jetty.websocket.api.UpgradeResponse; public class ClientUpgradeResponse extends UpgradeResponse { + private ByteBuffer remainingBuffer; + public ClientUpgradeResponse() { super(); } + public ByteBuffer getRemainingBuffer() + { + return remainingBuffer; + } + @Override public void sendForbidden(String message) throws IOException { throw new UnsupportedOperationException("Not supported on client implementation"); } + + public void setRemainingBuffer(ByteBuffer remainingBuffer) + { + this.remainingBuffer = remainingBuffer; + } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java index 87da0304d42..8b67dfdef44 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/HttpResponseHeaderParser.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.Utf8LineParser; import org.eclipse.jetty.websocket.client.ClientUpgradeResponse; @@ -79,6 +80,11 @@ public class HttpResponseHeaderParser { if (parseHeader(line)) { + // Finished parsing entire header + ByteBuffer copy = ByteBuffer.allocate(buf.remaining()); + BufferUtil.put(buf,copy); + BufferUtil.flipToFlush(copy,0); + this.response.setRemainingBuffer(copy); return this.response; } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java index e28af0f2793..b37a02a4a83 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java @@ -180,6 +180,10 @@ public class UpgradeConnection extends AbstractConnection validateResponse(resp); notifyConnect(resp); upgradeConnection(resp); + if (buffer.hasRemaining()) + { + LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining()); + } return false; // do no more reading } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java index edbca561730..d1d4a6ef654 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java @@ -19,7 +19,9 @@ package org.eclipse.jetty.websocket.client.io; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.log.Log; @@ -41,13 +43,12 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class); private final ConnectPromise connectPromise; private final Masker masker; - private boolean connected; + private final AtomicBoolean opened = new AtomicBoolean(false); public WebSocketClientConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise) { super(endp,executor,connectPromise.getClient().getScheduler(),connectPromise.getClient().getPolicy(),connectPromise.getClient().getBufferPool()); this.connectPromise = connectPromise; - this.connected = false; this.masker = connectPromise.getMasker(); assert (this.masker != null); } @@ -75,13 +76,20 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection @Override public void onOpen() { - if (!connected) + boolean beenOpened = opened.getAndSet(true); + if (!beenOpened) { WebSocketSession session = getSession(); ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager(); connectionManager.addSession(session); connectPromise.succeeded(session); - connected = true; + + ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer(); + if (extraBuf.hasRemaining()) + { + LOG.debug("Parsing extra remaining buffer from UpgradeConnection"); + getParser().parse(extraBuf); + } } super.onOpen(); } diff --git a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties index 1aed71b5585..f5e58f45604 100644 --- a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties @@ -6,15 +6,5 @@ org.eclipse.jetty.LEVEL=WARN # org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG # Hide the stacktraces during testing org.eclipse.jetty.websocket.client.internal.io.UpgradeConnection.STACKS=false -# See the read/write traffic -# org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG -# org.eclipse.jetty.websocket.io.LEVEL=DEBUG -# org.eclipse.jetty.websocket.io.WebSocketAsyncConnection.LEVEL=DEBUG # org.eclipse.jetty.io.SelectorManager.LEVEL=INFO # org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection$DataFrameBytes.LEVEL=WARN -# org.eclipse.jetty.websocket.io.ControlFrameBytes.LEVEL=DEBUG -# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG -# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG -# org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO -# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG -# org.eclipse.jetty.websocket.io.payload.LEVEL=DEBUG diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 81fca184431..3c5b6b2dd0d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; @@ -121,6 +122,24 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } + public static class Stats { + private AtomicLong countFillInterestedEvents = new AtomicLong(0); + private AtomicLong countOnFillableEvents = new AtomicLong(0); + private AtomicLong countFillableErrors = new AtomicLong(0); + + public long getFillableErrorCount() { + return countFillableErrors.get(); + } + + public long getFillInterestedCount() { + return countFillInterestedEvents.get(); + } + + public long getOnFillableCount() { + return countOnFillableEvents.get(); + } + } + private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class); /** @@ -141,11 +160,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp private boolean flushing; private boolean isFilling; private IOState ioState; + private Stats stats = new Stats(); public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool) { super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX - endp.setIdleTimeout(policy.getIdleTimeout()); this.policy = policy; this.bufferPool = bufferPool; this.generator = new Generator(policy,bufferPool); @@ -246,6 +265,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } + @Override + public void fillInterested() + { + stats.countFillInterestedEvents.incrementAndGet(); + super.fillInterested(); + } + public void flush() { ByteBuffer buffer = null; @@ -349,6 +375,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp return session; } + public Stats getStats() + { + return stats; + } + @Override public boolean isOpen() { @@ -372,6 +403,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp public void onFillable() { LOG.debug("{} onFillable()",policy.getBehavior()); + stats.countOnFillableEvents.incrementAndGet(); ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false); BufferUtil.clear(buffer); boolean readMore = false; @@ -395,6 +427,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } } + @Override + protected void onFillInterestedFailed(Throwable cause) + { + LOG.ignore(cause); + stats.countFillInterestedEvents.incrementAndGet(); + super.onFillInterestedFailed(cause); + } + @Override public void onOpen() { @@ -409,7 +449,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { LOG.warn("Read Timeout"); - if ((ioState.getState() == ConnectionState.CLOSING) || (ioState.getState() == ConnectionState.CLOSED)) + IOState state = getIOState(); + if ((state.getState() == ConnectionState.CLOSING) || (state.getState() == ConnectionState.CLOSED)) { // close already initiated, extra timeouts not relevant // allow udnerlying connection and endpoint to disconnect on its own diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java index f28ae54739b..afe2e78a632 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.server; import java.net.InetSocketAddress; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; @@ -31,14 +32,17 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; public class WebSocketServerConnection extends AbstractWebSocketConnection { private final WebSocketServerFactory factory; - private boolean connected; + private final AtomicBoolean opened = new AtomicBoolean(false); public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, WebSocketServerFactory factory) { super(endp,executor,scheduler,policy,bufferPool); + if (policy.getIdleTimeout() > 0) + { + endp.setIdleTimeout(policy.getIdleTimeout()); + } this.factory = factory; - this.connected = false; } @Override @@ -63,10 +67,10 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection @Override public void onOpen() { - if (!connected) + boolean beenOpened = opened.getAndSet(true); + if (!beenOpened) { factory.sessionOpened(getSession()); - connected = true; } super.onOpen(); }