467036 - WebSocketClient fails to process immediate frames from server

+ Using Connection.UpgradeFrom and Connection.UpgradeTo with
  client connections and endpoints too.
This commit is contained in:
Joakim Erdfelt 2015-05-11 13:28:58 -07:00
parent 79a841da4b
commit d0251349c5
7 changed files with 92 additions and 95 deletions

View File

@ -21,11 +21,15 @@ package org.eclipse.jetty.websocket.client;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParseListener; import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParseListener;
public class ClientUpgradeResponse extends UpgradeResponse implements HttpResponseHeaderParseListener public class ClientUpgradeResponse extends UpgradeResponse implements HttpResponseHeaderParseListener
{ {
private static final Logger LOG = Log.getLogger(ClientUpgradeResponse.class);
private ByteBuffer remainingBuffer; private ByteBuffer remainingBuffer;
public ClientUpgradeResponse() public ClientUpgradeResponse()
@ -47,6 +51,10 @@ public class ClientUpgradeResponse extends UpgradeResponse implements HttpRespon
@Override @Override
public void setRemainingBuffer(ByteBuffer remainingBuffer) public void setRemainingBuffer(ByteBuffer remainingBuffer)
{ {
if (LOG.isDebugEnabled())
{
LOG.debug("Saving remaining header: {}",BufferUtil.toDetailString(remainingBuffer));
}
this.remainingBuffer = remainingBuffer; this.remainingBuffer = remainingBuffer;
} }
} }

View File

@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FutureCallback;
@ -49,11 +50,13 @@ import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException; import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser.ParseException;
/** /**
* This is the initial connection handling that exists immediately after physical connection is established to destination server. * This is the initial connection handling that exists immediately after physical connection is established to
* destination server.
* <p> * <p>
* Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler. * Eventually, upon successful Upgrade request/response, this connection swaps itself out for the
* WebSocektClientConnection handler.
*/ */
public class UpgradeConnection extends AbstractConnection public class UpgradeConnection extends AbstractConnection implements Connection.UpgradeFrom
{ {
public class SendUpgradeRequest extends FutureCallback implements Runnable public class SendUpgradeRequest extends FutureCallback implements Runnable
{ {
@ -71,7 +74,7 @@ public class UpgradeConnection extends AbstractConnection
String rawRequest = request.generate(); String rawRequest = request.generate();
ByteBuffer buf = BufferUtil.toBuffer(rawRequest, StandardCharsets.UTF_8); ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StandardCharsets.UTF_8);
getEndPoint().write(this,buf); getEndPoint().write(this,buf);
} }
@ -81,6 +84,7 @@ public class UpgradeConnection extends AbstractConnection
LOG.debug("Upgrade Request Write Success"); LOG.debug("Upgrade Request Write Success");
// Writing the request header is complete. // Writing the request header is complete.
super.succeeded(); super.succeeded();
state = State.RESPONSE;
// start the interest in fill // start the interest in fill
fillInterested(); fillInterested();
} }
@ -88,8 +92,9 @@ public class UpgradeConnection extends AbstractConnection
@Override @Override
public void failed(Throwable cause) public void failed(Throwable cause)
{ {
LOG.warn("Upgrade Request Write Failure", cause); LOG.warn("Upgrade Request Write Failure",cause);
super.failed(cause); super.failed(cause);
state = State.FAILURE;
// Fail the connect promise when a fundamental exception during connect occurs. // Fail the connect promise when a fundamental exception during connect occurs.
connectPromise.failed(cause); connectPromise.failed(cause);
} }
@ -98,11 +103,21 @@ public class UpgradeConnection extends AbstractConnection
/** HTTP Response Code: 101 Switching Protocols */ /** HTTP Response Code: 101 Switching Protocols */
private static final int SWITCHING_PROTOCOLS = 101; private static final int SWITCHING_PROTOCOLS = 101;
private enum State
{
REQUEST,
RESPONSE,
FAILURE,
UPGRADE
}
private static final Logger LOG = Log.getLogger(UpgradeConnection.class); private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final ConnectPromise connectPromise; private final ConnectPromise connectPromise;
private final HttpResponseHeaderParser parser; private final HttpResponseHeaderParser parser;
private State state = State.REQUEST;
private ClientUpgradeRequest request; private ClientUpgradeRequest request;
private ClientUpgradeResponse response;
public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise) public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{ {
@ -147,6 +162,12 @@ public class UpgradeConnection extends AbstractConnection
handshakeListener.onHandshakeResponse(response); handshakeListener.onHandshakeResponse(response);
} }
} }
@Override
public ByteBuffer onUpgradeFrom()
{
return connectPromise.getResponse().getRemainingBuffer();
}
@Override @Override
public void onFillable() public void onFillable()
@ -157,20 +178,25 @@ public class UpgradeConnection extends AbstractConnection
} }
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false); ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
BufferUtil.clear(buffer); BufferUtil.clear(buffer);
boolean readMore = false;
try try
{ {
readMore = read(buffer); read(buffer);
} }
finally finally
{ {
bufferPool.release(buffer); bufferPool.release(buffer);
} }
if (readMore) if (state == State.RESPONSE)
{ {
// Continue Reading
fillInterested(); fillInterested();
} }
else if (state == State.UPGRADE)
{
// Stop Reading, upgrade the connection now
upgradeConnection(response);
}
} }
@Override @Override
@ -179,7 +205,7 @@ public class UpgradeConnection extends AbstractConnection
super.onOpen(); super.onOpen();
getExecutor().execute(new SendUpgradeRequest()); getExecutor().execute(new SendUpgradeRequest());
} }
@Override @Override
public void onClose() public void onClose()
{ {
@ -189,7 +215,7 @@ public class UpgradeConnection extends AbstractConnection
} }
super.onClose(); super.onClose();
} }
@Override @Override
protected boolean onReadTimeout() protected boolean onReadTimeout()
{ {
@ -197,9 +223,9 @@ public class UpgradeConnection extends AbstractConnection
{ {
LOG.warn("Timeout on connection {}",this); LOG.warn("Timeout on connection {}",this);
} }
failUpgrade(new IOException("Timeout while performing WebSocket Upgrade")); failUpgrade(new IOException("Timeout while performing WebSocket Upgrade"));
return super.onReadTimeout(); return super.onReadTimeout();
} }
@ -208,9 +234,8 @@ public class UpgradeConnection extends AbstractConnection
* *
* @param buffer * @param buffer
* the buffer to fill into from the endpoint * the buffer to fill into from the endpoint
* @return true if there is more to read, false if reading should stop
*/ */
private boolean read(ByteBuffer buffer) private void read(ByteBuffer buffer)
{ {
EndPoint endPoint = getEndPoint(); EndPoint endPoint = getEndPoint();
try try
@ -220,13 +245,14 @@ public class UpgradeConnection extends AbstractConnection
int filled = endPoint.fill(buffer); int filled = endPoint.fill(buffer);
if (filled == 0) if (filled == 0)
{ {
return true; return;
} }
else if (filled < 0) else if (filled < 0)
{ {
LOG.warn("read - EOF Reached"); LOG.warn("read - EOF Reached");
state = State.FAILURE;
failUpgrade(new EOFException("Reading WebSocket Upgrade response")); failUpgrade(new EOFException("Reading WebSocket Upgrade response"));
return false; return;
} }
else else
{ {
@ -234,34 +260,32 @@ public class UpgradeConnection extends AbstractConnection
{ {
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer)); LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
} }
ClientUpgradeResponse resp = (ClientUpgradeResponse)parser.parse(buffer); response = (ClientUpgradeResponse)parser.parse(buffer);
if (resp != null) if (response != null)
{ {
// Got a response! // Got a response!
validateResponse(resp); validateResponse(response);
notifyConnect(resp); notifyConnect(response);
upgradeConnection(resp); state = State.UPGRADE;
if (buffer.hasRemaining()) return; // do no more reading
{
LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
}
return false; // do no more reading
} }
} }
} }
} }
catch (IOException | ParseException e) catch (IOException | ParseException e)
{ {
LOG.ignore(e);
state = State.FAILURE;
UpgradeException ue = new UpgradeException(request.getRequestURI(),e); UpgradeException ue = new UpgradeException(request.getRequestURI(),e);
connectPromise.failed(ue); connectPromise.failed(ue);
disconnect(false); disconnect(false);
return false;
} }
catch (UpgradeException e) catch (UpgradeException e)
{ {
LOG.ignore(e);
state = State.FAILURE;
connectPromise.failed(e); connectPromise.failed(e);
disconnect(false); disconnect(false);
return false;
} }
} }
@ -269,7 +293,7 @@ public class UpgradeConnection extends AbstractConnection
{ {
EndPoint endp = getEndPoint(); EndPoint endp = getEndPoint();
Executor executor = getExecutor(); Executor executor = getExecutor();
EventDriver websocket = connectPromise.getDriver(); EventDriver websocket = connectPromise.getDriver();
WebSocketPolicy policy = websocket.getPolicy(); WebSocketPolicy policy = websocket.getPolicy();
@ -301,9 +325,7 @@ public class UpgradeConnection extends AbstractConnection
connectPromise.getClient().addManaged(session); connectPromise.getClient().addManaged(session);
// Now swap out the connection // Now swap out the connection
// TODO use endp.upgrade ??? endp.upgrade(connection);
endp.setConnection(connection);
connection.onOpen();
} }
private void validateResponse(ClientUpgradeResponse response) private void validateResponse(ClientUpgradeResponse response)

View File

@ -19,13 +19,10 @@
package org.eclipse.jetty.websocket.client.io; package org.eclipse.jetty.websocket.client.io;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.WriteCallback;
@ -41,7 +38,6 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
*/ */
public class WebSocketClientConnection extends AbstractWebSocketConnection public class WebSocketClientConnection extends AbstractWebSocketConnection
{ {
private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class);
private final ConnectPromise connectPromise; private final ConnectPromise connectPromise;
private final Masker masker; private final Masker masker;
private final AtomicBoolean opened = new AtomicBoolean(false); private final AtomicBoolean opened = new AtomicBoolean(false);
@ -84,14 +80,6 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager(); ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager();
connectionManager.addSession(session); connectionManager.addSession(session);
connectPromise.succeeded(session); connectPromise.succeeded(session);
ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer();
setInitialBuffer(extraBuf);
if (extraBuf.hasRemaining())
{
LOG.debug("Parsing extra remaining buffer from UpgradeConnection");
getParser().parse(extraBuf);
}
} }
super.onOpen(); super.onOpen();
} }

View File

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -58,7 +59,7 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/** /**
* Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}. * Provides the implementation of {@link LogicalConnection} within the framework of the new {@link org.eclipse.jetty.io.Connection} framework of {@code jetty-io}.
*/ */
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener, Dumpable public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
{ {
private class Flusher extends FrameFlusher private class Flusher extends FrameFlusher
{ {
@ -428,18 +429,17 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ {
LOG.debug("OPEN: has prefill - onFillable called"); LOG.debug("Parsing Upgrade prefill buffer ({} remaining)",prefillBuffer.remaining());
} }
onFillable(); parser.parse(prefillBuffer);
} }
else if (LOG.isDebugEnabled())
{ {
if (LOG.isDebugEnabled()) LOG.debug("OPEN: normal fillInterested");
{
LOG.debug("OPEN: normal fillInterested");
}
fillInterested();
} }
// TODO: investigate what happens if a failure occurs during prefill, and an attempt to write close fails,
// should a fill interested occur? or just a quick disconnect?
fillInterested();
break; break;
case CLOSED: case CLOSED:
if (ioState.wasAbnormalClose()) if (ioState.wasAbnormalClose())
@ -624,31 +624,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
EndPoint endPoint = getEndPoint(); EndPoint endPoint = getEndPoint();
try try
{ {
// Process any prefill first
while (BufferUtil.hasContent(prefillBuffer))
{
if (BufferUtil.hasContent(prefillBuffer))
{
int pos = BufferUtil.flipToFill(buffer);
int size = BufferUtil.put(prefillBuffer,buffer);
BufferUtil.flipToFlush(buffer,pos);
if (LOG.isDebugEnabled())
{
LOG.debug("Filled {} bytes of Upgrade prefill buffer for parse ({} remaining)",size,prefillBuffer.remaining());
}
if (!prefillBuffer.hasRemaining())
{
prefillBuffer = null;
}
}
if (buffer.hasRemaining())
{
parser.parse(buffer);
}
}
// Process the content from the Endpoint next // Process the content from the Endpoint next
while(true) // TODO: should this honor the LogicalConnection.suspend() ? while(true) // TODO: should this honor the LogicalConnection.suspend() ?
{ {
@ -762,4 +737,15 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return String.format("%s{f=%s,g=%s,p=%s}",super.toString(),flusher,generator,parser); return String.format("%s{f=%s,g=%s,p=%s}",super.toString(),flusher,generator,parser);
} }
/**
* Extra bytes from the initial HTTP upgrade that need to
* be processed by the websocket parser before starting
* to read bytes from the connection
*/
@Override
public void onUpgradeTo(ByteBuffer prefilled)
{
setInitialBuffer(prefilled);
}
} }

View File

@ -80,7 +80,9 @@ public class HttpResponseHeaderParser
{ {
if (parseHeader(line)) if (parseHeader(line))
{ {
// Finished parsing entire header // Now finished with parsing the entire response header
// Save the remaining bytes for WebSocket to process.
ByteBuffer copy = ByteBuffer.allocate(buf.remaining()); ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
BufferUtil.put(buf,copy); BufferUtil.put(buf,copy);
BufferUtil.flipToFlush(copy,0); BufferUtil.flipToFlush(copy,0);

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.websocket.server; package org.eclipse.jetty.websocket.server;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -56,17 +55,6 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection imple
return getEndPoint().getRemoteAddress(); return getEndPoint().getRemoteAddress();
} }
/**
* Extra bytes from the initial HTTP upgrade that need to
* be processed by the websocket parser before starting
* to read bytes from the connection
*/
@Override
public void onUpgradeTo(ByteBuffer prefilled)
{
setInitialBuffer(prefilled);
}
@Override @Override
public void onOpen() public void onOpen()
{ {

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletContextHandler;
@ -65,6 +66,7 @@ import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory; import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack; import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory; import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope; import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@ -601,11 +603,12 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
// Get original HTTP connection // Get original HTTP connection
EndPoint endp = http.getEndPoint(); EndPoint endp = http.getEndPoint();
Executor executor = http.getConnector().getExecutor(); Connector connector = http.getConnector();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool(); Executor executor = connector.getExecutor();
ByteBufferPool bufferPool = connector.getByteBufferPool();
// Setup websocket connection // Setup websocket connection
WebSocketServerConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool); AbstractWebSocketConnection wsConnection = new WebSocketServerConnection(endp, executor, scheduler, driver.getPolicy(), bufferPool);
extensionStack.setPolicy(driver.getPolicy()); extensionStack.setPolicy(driver.getPolicy());
extensionStack.configure(wsConnection.getParser()); extensionStack.configure(wsConnection.getParser());
@ -659,7 +662,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Handshake Response: {}", handshaker); LOG.debug("Handshake Response: {}", handshaker);
// Process (version specific) handshake response // Process (version specific) handshake response
handshaker.doHandshakeResponse(request, response); handshaker.doHandshakeResponse(request, response);