Fixing WebSocketAsyncConnection close/read logic to prevent unstoppable QTP threads

This commit is contained in:
Joakim Erdfelt 2012-07-30 09:54:45 -07:00
parent 7246b279e2
commit 9999db9c94
2 changed files with 40 additions and 22 deletions

View File

@ -50,17 +50,6 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection implements RawConnection, OutgoingFrames
{
static final Logger LOG = Log.getLogger(WebSocketAsyncConnection.class);
private static final ThreadLocal<WebSocketAsyncConnection> CURRENT_CONNECTION = new ThreadLocal<WebSocketAsyncConnection>();
public static WebSocketAsyncConnection getCurrentConnection()
{
return CURRENT_CONNECTION.get();
}
protected static void setCurrentConnection(WebSocketAsyncConnection connection)
{
CURRENT_CONNECTION.set(connection);
}
private final ByteBufferPool bufferPool;
private final ScheduledExecutorService scheduler;
@ -217,19 +206,21 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i
@Override
public void onFillable()
{
setCurrentConnection(this);
ByteBuffer buffer = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clear(buffer);
boolean readMore = false;
try
{
read(buffer);
readMore = read(buffer) == 0;
}
finally
{
fillInterested();
setCurrentConnection(null);
bufferPool.release(buffer);
}
if (readMore)
{
fillInterested();
}
}
/**
@ -256,34 +247,44 @@ public abstract class WebSocketAsyncConnection extends AbstractAsyncConnection i
flush();
}
private void read(ByteBuffer buffer)
private int read(ByteBuffer buffer)
{
AsyncEndPoint endPoint = getEndPoint();
try
{
while (true)
{
int filled = getEndPoint().fill(buffer);
int filled = endPoint.fill(buffer);
if (filled == 0)
{
break;
return 0;
}
if (LOG.isDebugEnabled() && (filled > 0))
else if (filled < 0)
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
disconnect(false);
return -1;
}
else
{
if (LOG.isDebugEnabled())
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
parser.parse(buffer);
}
parser.parse(buffer);
}
}
catch (IOException e)
{
LOG.warn(e);
terminateConnection(StatusCode.PROTOCOL,e.getMessage());
return -1;
}
catch (CloseException e)
{
LOG.warn(e);
terminateConnection(e.getStatusCode(),e.getMessage());
return -1;
}
}

View File

@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.*;
import java.util.LinkedList;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
@ -66,6 +67,17 @@ public class IncomingFramesCapture implements IncomingFrames
Assert.assertThat("Has no errors",errors.size(),is(0));
}
public void dump()
{
System.out.printf("Captured %d incoming frames%n",frames.size());
for (int i = 0; i < frames.size(); i++)
{
WebSocketFrame frame = frames.get(i);
System.out.printf("[%3d] %s%n",i,frame);
System.out.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
}
}
public int getErrorCount(Class<? extends WebSocketException> errorType)
{
int count = 0;
@ -112,4 +124,9 @@ public class IncomingFramesCapture implements IncomingFrames
{
frames.add(frame);
}
public int size()
{
return frames.size();
}
}