From d0251349c52fdfc0ba8718d74c9120f849eca62d Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Mon, 11 May 2015 13:28:58 -0700 Subject: [PATCH] 467036 - WebSocketClient fails to process immediate frames from server + Using Connection.UpgradeFrom and Connection.UpgradeTo with client connections and endpoints too. --- .../client/ClientUpgradeResponse.java | 8 ++ .../client/io/UpgradeConnection.java | 86 ++++++++++++------- .../client/io/WebSocketClientConnection.java | 12 --- .../io/AbstractWebSocketConnection.java | 54 +++++------- .../io/http/HttpResponseHeaderParser.java | 4 +- .../server/WebSocketServerConnection.java | 12 --- .../server/WebSocketServerFactory.java | 11 ++- 7 files changed, 92 insertions(+), 95 deletions(-) diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java index 0c468765d24..10dfc9635bf 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/ClientUpgradeResponse.java @@ -21,11 +21,15 @@ package org.eclipse.jetty.websocket.client; import java.io.IOException; import java.nio.ByteBuffer; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParseListener; public class ClientUpgradeResponse extends UpgradeResponse implements HttpResponseHeaderParseListener { + private static final Logger LOG = Log.getLogger(ClientUpgradeResponse.class); private ByteBuffer remainingBuffer; public ClientUpgradeResponse() @@ -47,6 +51,10 @@ public class ClientUpgradeResponse extends UpgradeResponse implements HttpRespon @Override public void setRemainingBuffer(ByteBuffer remainingBuffer) { + if (LOG.isDebugEnabled()) + { + LOG.debug("Saving remaining header: {}",BufferUtil.toDetailString(remainingBuffer)); + } this.remainingBuffer = remainingBuffer; } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java index f300e7e027a..8078a22f581 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/UpgradeConnection.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executor; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; @@ -49,11 +50,13 @@ import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser; import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException; /** - * This is the initial connection handling that exists immediately after physical connection is established to destination server. + * This is the initial connection handling that exists immediately after physical connection is established to + * destination server. *

- * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler. + * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the + * WebSocektClientConnection handler. */ -public class UpgradeConnection extends AbstractConnection +public class UpgradeConnection extends AbstractConnection implements Connection.UpgradeFrom { public class SendUpgradeRequest extends FutureCallback implements Runnable { @@ -71,7 +74,7 @@ public class UpgradeConnection extends AbstractConnection String rawRequest = request.generate(); - ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8); + ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StandardCharsets.UTF_8); getEndPoint().write(this,buf); } @@ -81,6 +84,7 @@ public class UpgradeConnection extends AbstractConnection LOG.debug("Upgrade Request Write Success"); // Writing the request header is complete. super.succeeded(); + state = State.RESPONSE; // start the interest in fill fillInterested(); } @@ -88,8 +92,9 @@ public class UpgradeConnection extends AbstractConnection @Override public void failed(Throwable cause) { - LOG.warn("Upgrade Request Write Failure", cause); + LOG.warn("Upgrade Request Write Failure",cause); super.failed(cause); + state = State.FAILURE; // Fail the connect promise when a fundamental exception during connect occurs. connectPromise.failed(cause); } @@ -98,11 +103,21 @@ public class UpgradeConnection extends AbstractConnection /** HTTP Response Code: 101 Switching Protocols */ private static final int SWITCHING_PROTOCOLS = 101; + private enum State + { + REQUEST, + RESPONSE, + FAILURE, + UPGRADE + } + private static final Logger LOG = Log.getLogger(UpgradeConnection.class); private final ByteBufferPool bufferPool; private final ConnectPromise connectPromise; private final HttpResponseHeaderParser parser; + private State state = State.REQUEST; private ClientUpgradeRequest request; + private ClientUpgradeResponse response; public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise) { @@ -147,6 +162,12 @@ public class UpgradeConnection extends AbstractConnection handshakeListener.onHandshakeResponse(response); } } + + @Override + public ByteBuffer onUpgradeFrom() + { + return connectPromise.getResponse().getRemainingBuffer(); + } @Override public void onFillable() @@ -157,20 +178,25 @@ public class UpgradeConnection extends AbstractConnection } ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false); BufferUtil.clear(buffer); - boolean readMore = false; try { - readMore = read(buffer); + read(buffer); } finally { bufferPool.release(buffer); } - if (readMore) + if (state == State.RESPONSE) { + // Continue Reading fillInterested(); } + else if (state == State.UPGRADE) + { + // Stop Reading, upgrade the connection now + upgradeConnection(response); + } } @Override @@ -179,7 +205,7 @@ public class UpgradeConnection extends AbstractConnection super.onOpen(); getExecutor().execute(new SendUpgradeRequest()); } - + @Override public void onClose() { @@ -189,7 +215,7 @@ public class UpgradeConnection extends AbstractConnection } super.onClose(); } - + @Override protected boolean onReadTimeout() { @@ -197,9 +223,9 @@ public class UpgradeConnection extends AbstractConnection { LOG.warn("Timeout on connection {}",this); } - + failUpgrade(new IOException("Timeout while performing WebSocket Upgrade")); - + return super.onReadTimeout(); } @@ -208,9 +234,8 @@ public class UpgradeConnection extends AbstractConnection * * @param buffer * the buffer to fill into from the endpoint - * @return true if there is more to read, false if reading should stop */ - private boolean read(ByteBuffer buffer) + private void read(ByteBuffer buffer) { EndPoint endPoint = getEndPoint(); try @@ -220,13 +245,14 @@ public class UpgradeConnection extends AbstractConnection int filled = endPoint.fill(buffer); if (filled == 0) { - return true; + return; } else if (filled < 0) { LOG.warn("read - EOF Reached"); + state = State.FAILURE; failUpgrade(new EOFException("Reading WebSocket Upgrade response")); - return false; + return; } else { @@ -234,34 +260,32 @@ public class UpgradeConnection extends AbstractConnection { LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer)); } - ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer); - if (resp != null) + response = (ClientUpgradeResponse)parser.parse(buffer); + if (response != null) { // Got a response! - validateResponse(resp); - notifyConnect(resp); - upgradeConnection(resp); - if (buffer.hasRemaining()) - { - LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining()); - } - return false; // do no more reading + validateResponse(response); + notifyConnect(response); + state = State.UPGRADE; + return; // do no more reading } } } } catch (IOException | ParseException e) { + LOG.ignore(e); + state = State.FAILURE; UpgradeException ue = new UpgradeException(request.getRequestURI(),e); connectPromise.failed(ue); disconnect(false); - return false; } catch (UpgradeException e) { + LOG.ignore(e); + state = State.FAILURE; connectPromise.failed(e); disconnect(false); - return false; } } @@ -269,7 +293,7 @@ public class UpgradeConnection extends AbstractConnection { EndPoint endp = getEndPoint(); Executor executor = getExecutor(); - + EventDriver websocket = connectPromise.getDriver(); WebSocketPolicy policy = websocket.getPolicy(); @@ -301,9 +325,7 @@ public class UpgradeConnection extends AbstractConnection connectPromise.getClient().addManaged(session); // Now swap out the connection - // TODO use endp.upgrade ??? - endp.setConnection(connection); - connection.onOpen(); + endp.upgrade(connection); } private void validateResponse(ClientUpgradeResponse response) diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java index 21d34a65b51..4ed122b2ff6 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java @@ -19,13 +19,10 @@ package org.eclipse.jetty.websocket.client.io; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WriteCallback; @@ -41,7 +38,6 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; */ public class WebSocketClientConnection extends AbstractWebSocketConnection { - private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class); private final ConnectPromise connectPromise; private final Masker masker; private final AtomicBoolean opened = new AtomicBoolean(false); @@ -84,14 +80,6 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager(); connectionManager.addSession(session); connectPromise.succeeded(session); - - ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer(); - setInitialBuffer(extraBuf); - if (extraBuf.hasRemaining()) - { - LOG.debug("Parsing extra remaining buffer from UpgradeConnection"); - getParser().parse(extraBuf); - } } super.onOpen(); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 873430a524e..e278a4c82ac 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -58,7 +59,7 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener; /** * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}. */ -public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable +public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable { private class Flusher extends FrameFlusher { @@ -428,18 +429,17 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp { if (LOG.isDebugEnabled()) { - LOG.debug("OPEN: has prefill - onFillable called"); + LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining()); } - onFillable(); + parser.parse(prefillBuffer); } - else + if (LOG.isDebugEnabled()) { - if (LOG.isDebugEnabled()) - { - LOG.debug("OPEN: normal fillInterested"); - } - fillInterested(); + LOG.debug("OPEN: normal fillInterested"); } + // TODO: investigate what happens if a failure occurs during prefill, and an attempt to write close fails, + // should a fill interested occur? or just a quick disconnect? + fillInterested(); break; case CLOSED: if (ioState.wasAbnormalClose()) @@ -624,31 +624,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp EndPoint endPoint = getEndPoint(); try { - // Process any prefill first - while (BufferUtil.hasContent(prefillBuffer)) - { - if (BufferUtil.hasContent(prefillBuffer)) - { - int pos = BufferUtil.flipToFill(buffer); - int size = BufferUtil.put(prefillBuffer,buffer); - BufferUtil.flipToFlush(buffer,pos); - if (LOG.isDebugEnabled()) - { - LOG.debug("Filled {} bytes of Upgrade prefill buffer for parse ({} remaining)",size,prefillBuffer.remaining()); - } - - if (!prefillBuffer.hasRemaining()) - { - prefillBuffer = null; - } - } - - if (buffer.hasRemaining()) - { - parser.parse(buffer); - } - } - // Process the content from the Endpoint next while(true) // TODO: should this honor the LogicalConnection.suspend() ? { @@ -762,4 +737,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp return String.format("%s{f=%s,g=%s,p=%s}",super.toString(),flusher,generator,parser); } + /** + * Extra bytes from the initial HTTP upgrade that need to + * be processed by the websocket parser before starting + * to read bytes from the connection + */ + @Override + public void onUpgradeTo(ByteBuffer prefilled) + { + setInitialBuffer(prefilled); + } + } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/http/HttpResponseHeaderParser.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/http/HttpResponseHeaderParser.java index d7c78479c2c..2b72738ba47 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/http/HttpResponseHeaderParser.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/http/HttpResponseHeaderParser.java @@ -80,7 +80,9 @@ public class HttpResponseHeaderParser { if (parseHeader(line)) { - // Finished parsing entire header + // Now finished with parsing the entire response header + // Save the remaining bytes for WebSocket to process. + ByteBuffer copy = ByteBuffer.allocate(buf.remaining()); BufferUtil.put(buf,copy); BufferUtil.flipToFlush(copy,0); diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java index 577a109a04a..b3ffe52eb78 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerConnection.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.websocket.server; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,17 +55,6 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection imple return getEndPoint().getRemoteAddress(); } - /** - * Extra bytes from the initial HTTP upgrade that need to - * be processed by the websocket parser before starting - * to read bytes from the connection - */ - @Override - public void onUpgradeTo(ByteBuffer prefilled) - { - setInitialBuffer(prefilled); - } - @Override public void onOpen() { diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index 9c90cc81a6a..78ad6571843 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -40,6 +40,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -65,6 +66,7 @@ import org.eclipse.jetty.websocket.common.events.EventDriver; import org.eclipse.jetty.websocket.common.events.EventDriverFactory; import org.eclipse.jetty.websocket.common.extensions.ExtensionStack; import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory; +import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection; import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; @@ -601,11 +603,12 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc // Get original HTTP connection EndPoint endp = http.getEndPoint(); - Executor executor = http.getConnector().getExecutor(); - ByteBufferPool bufferPool = http.getConnector().getByteBufferPool(); + Connector connector = http.getConnector(); + Executor executor = connector.getExecutor(); + ByteBufferPool bufferPool = connector.getByteBufferPool(); // Setup websocket connection - WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool); + AbstractWebSocketConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool); extensionStack.setPolicy(driver.getPolicy()); extensionStack.configure(wsConnection.getParser()); @@ -659,7 +662,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc if (LOG.isDebugEnabled()) LOG.debug("Handshake Response: {}", handshaker); - + // Process (version specific) handshake response handshaker.doHandshakeResponse(request, response);