401427 - WebSocket messages sent from onConnect fail to be read by jetty websocket-client

+ Adding carryover of bytes remaining from UpgradeConnection to
   AbstractWebSocketConnection.parser
This commit is contained in:
Joakim Erdfelt 2013-02-21 15:30:56 -07:00
parent 8418acdae4
commit 61470cde2e
7 changed files with 86 additions and 20 deletions

View File

@ -19,19 +19,32 @@
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
public class ClientUpgradeResponse extends UpgradeResponse
{
private ByteBuffer remainingBuffer;
public ClientUpgradeResponse()
{
super();
}
public ByteBuffer getRemainingBuffer()
{
return remainingBuffer;
}
@Override
public void sendForbidden(String message) throws IOException
{
throw new UnsupportedOperationException("Not supported on client implementation");
}
public void setRemainingBuffer(ByteBuffer remainingBuffer)
{
this.remainingBuffer = remainingBuffer;
}
}

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8LineParser;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
@ -79,6 +80,11 @@ public class HttpResponseHeaderParser
{
if (parseHeader(line))
{
// Finished parsing entire header
ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
BufferUtil.put(buf,copy);
BufferUtil.flipToFlush(copy,0);
this.response.setRemainingBuffer(copy);
return this.response;
}
}

View File

@ -180,6 +180,10 @@ public class UpgradeConnection extends AbstractConnection
validateResponse(resp);
notifyConnect(resp);
upgradeConnection(resp);
if (buffer.hasRemaining())
{
LOG.debug("Has remaining client bytebuffer of {}",buffer.remaining());
}
return false; // do no more reading
}
}

View File

@ -19,7 +19,9 @@
package org.eclipse.jetty.websocket.client.io;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
@ -41,13 +43,12 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class);
private final ConnectPromise connectPromise;
private final Masker masker;
private boolean connected;
private final AtomicBoolean opened = new AtomicBoolean(false);
public WebSocketClientConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{
super(endp,executor,connectPromise.getClient().getScheduler(),connectPromise.getClient().getPolicy(),connectPromise.getClient().getBufferPool());
this.connectPromise = connectPromise;
this.connected = false;
this.masker = connectPromise.getMasker();
assert (this.masker != null);
}
@ -75,13 +76,20 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
@Override
public void onOpen()
{
if (!connected)
boolean beenOpened = opened.getAndSet(true);
if (!beenOpened)
{
WebSocketSession session = getSession();
ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager();
connectionManager.addSession(session);
connectPromise.succeeded(session);
connected = true;
ByteBuffer extraBuf = connectPromise.getResponse().getRemainingBuffer();
if (extraBuf.hasRemaining())
{
LOG.debug("Parsing extra remaining buffer from UpgradeConnection");
getParser().parse(extraBuf);
}
}
super.onOpen();
}

View File

@ -6,15 +6,5 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.client.TrackingSocket.LEVEL=DEBUG
# Hide the stacktraces during testing
org.eclipse.jetty.websocket.client.internal.io.UpgradeConnection.STACKS=false
# See the read/write traffic
# org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.WebSocketAsyncConnection.LEVEL=DEBUG
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
# org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection$DataFrameBytes.LEVEL=WARN
# org.eclipse.jetty.websocket.io.ControlFrameBytes.LEVEL=DEBUG
# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG
# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO
# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.payload.LEVEL=DEBUG

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
@ -121,6 +122,24 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
public static class Stats {
private AtomicLong countFillInterestedEvents = new AtomicLong(0);
private AtomicLong countOnFillableEvents = new AtomicLong(0);
private AtomicLong countFillableErrors = new AtomicLong(0);
public long getFillableErrorCount() {
return countFillableErrors.get();
}
public long getFillInterestedCount() {
return countFillInterestedEvents.get();
}
public long getOnFillableCount() {
return countOnFillableEvents.get();
}
}
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
/**
@ -141,11 +160,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private boolean flushing;
private boolean isFilling;
private IOState ioState;
private Stats stats = new Stats();
public AbstractWebSocketConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool)
{
super(endp,executor,EXECUTE_ONFILLABLE); // TODO review if this is best. Specifically with MUX
endp.setIdleTimeout(policy.getIdleTimeout());
this.policy = policy;
this.bufferPool = bufferPool;
this.generator = new Generator(policy,bufferPool);
@ -246,6 +265,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
public void fillInterested()
{
stats.countFillInterestedEvents.incrementAndGet();
super.fillInterested();
}
public void flush()
{
ByteBuffer buffer = null;
@ -349,6 +375,11 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return session;
}
public Stats getStats()
{
return stats;
}
@Override
public boolean isOpen()
{
@ -372,6 +403,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onFillable()
{
LOG.debug("{} onFillable()",policy.getBehavior());
stats.countOnFillableEvents.incrementAndGet();
ByteBuffer buffer = bufferPool.acquire(getInputBufferSize(),false);
BufferUtil.clear(buffer);
boolean readMore = false;
@ -395,6 +427,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
@Override
protected void onFillInterestedFailed(Throwable cause)
{
LOG.ignore(cause);
stats.countFillInterestedEvents.incrementAndGet();
super.onFillInterestedFailed(cause);
}
@Override
public void onOpen()
{
@ -409,7 +449,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
LOG.warn("Read Timeout");
if ((ioState.getState() == ConnectionState.CLOSING) || (ioState.getState() == ConnectionState.CLOSED))
IOState state = getIOState();
if ((state.getState() == ConnectionState.CLOSING) || (state.getState() == ConnectionState.CLOSED))
{
// close already initiated, extra timeouts not relevant
// allow udnerlying connection and endpoint to disconnect on its own

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.websocket.server;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
@ -31,14 +32,17 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
public class WebSocketServerConnection extends AbstractWebSocketConnection
{
private final WebSocketServerFactory factory;
private boolean connected;
private final AtomicBoolean opened = new AtomicBoolean(false);
public WebSocketServerConnection(EndPoint endp, Executor executor, Scheduler scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool,
WebSocketServerFactory factory)
{
super(endp,executor,scheduler,policy,bufferPool);
if (policy.getIdleTimeout() > 0)
{
endp.setIdleTimeout(policy.getIdleTimeout());
}
this.factory = factory;
this.connected = false;
}
@Override
@ -63,10 +67,10 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
@Override
public void onOpen()
{
if (!connected)
boolean beenOpened = opened.getAndSet(true);
if (!beenOpened)
{
factory.sessionOpened(getSession());
connected = true;
}
super.onOpen();
}