Fixing websocket-client issues around fillInterested on physical connection

This commit is contained in:
Joakim Erdfelt 2012-08-20 15:05:47 -07:00
parent 9080882900
commit 4ed72ad585
10 changed files with 508 additions and 45 deletions

View File

@ -19,6 +19,10 @@
package org.eclipse.jetty.websocket.client;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -30,6 +34,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.internal.ConnectionManager;
@ -37,6 +42,8 @@ import org.eclipse.jetty.websocket.client.internal.IWebSocketClient;
import org.eclipse.jetty.websocket.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.extensions.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.io.WebSocketSession;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public class WebSocketClientFactory extends AggregateLifeCycle
{
@ -47,9 +54,10 @@ public class WebSocketClientFactory extends AggregateLifeCycle
private final ScheduledExecutorService scheduler;
private final EventMethodsCache methodsCache;
private final WebSocketPolicy policy;
private final ExtensionRegistry extensionRegistry;
private final WebSocketExtensionRegistry extensionRegistry;
private SocketAddress bindAddress;
private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
private ConnectionManager connectionManager;
public WebSocketClientFactory()
@ -69,6 +77,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public WebSocketClientFactory(Executor executor, ScheduledExecutorService scheduler, SslContextFactory sslContextFactory)
{
LOG.debug("new WebSocketClientFactory()");
if (executor == null)
{
throw new IllegalArgumentException("Executor is required");
@ -101,6 +110,20 @@ public class WebSocketClientFactory extends AggregateLifeCycle
this(new QueuedThreadPool(),Executors.newSingleThreadScheduledExecutor(),sslContextFactory);
}
@Override
protected void doStart() throws Exception
{
super.doStart();
LOG.debug("doStart()");
}
@Override
protected void doStop() throws Exception
{
super.doStop();
LOG.debug("doStop()");
}
/**
* The address to bind local physical (outgoing) TCP Sockets to.
*
@ -142,6 +165,26 @@ public class WebSocketClientFactory extends AggregateLifeCycle
return scheduler;
}
public List<Extension> initExtensions(List<ExtensionConfig> requested)
{
List<Extension> extensions = new ArrayList<Extension>();
for (ExtensionConfig cfg : requested)
{
Extension extension = extensionRegistry.newInstance(cfg);
if (extension == null)
{
continue;
}
LOG.debug("added {}",extension);
extensions.add(extension);
}
LOG.debug("extensions={}",extensions);
return extensions;
}
public WebSocketClient newWebSocketClient(Object websocketPojo)
{
LOG.debug("Creating new WebSocket for {}",websocketPojo);
@ -149,6 +192,33 @@ public class WebSocketClientFactory extends AggregateLifeCycle
return new IWebSocketClient(this,websocket);
}
public boolean sessionClosed(WebSocketSession session)
{
return isRunning() && sessions.remove(session);
}
public boolean sessionOpened(WebSocketSession session)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Session Opened: {}",session);
}
// FIXME: what is going on?
// if (!isRunning())
// {
// LOG.debug("Factory.isRunning: {}",this.isRunning());
// LOG.debug("Factory.isStarted: {}",this.isStarted());
// LOG.debug("Factory.isStarting: {}",this.isStarting());
// LOG.debug("Factory.isStopped: {}",this.isStopped());
// LOG.debug("Factory.isStopping: {}",this.isStopping());
// LOG.warn("Factory is not running");
// return false;
// }
boolean ret = sessions.offer(session);
session.onConnect();
return ret;
}
/**
* @param bindAddress
* the address to bind the socket channel to

View File

@ -57,6 +57,8 @@ public class IWebSocketClient extends FutureCallback<UpgradeResponse> implements
public IWebSocketClient(WebSocketClientFactory factory, WebSocketEventDriver websocket)
{
this.factory = factory;
LOG.debug("factory.isRunning(): {}",factory.isRunning());
LOG.debug("factory.isStarted(): {}",factory.isStarted());
this.policy = factory.getPolicy();
this.websocket = websocket;
this.upgradeRequest = new ClientUpgradeRequest();
@ -210,9 +212,8 @@ public class IWebSocketClient extends FutureCallback<UpgradeResponse> implements
return websocketUri;
}
public void setUpgradeResponse(UpgradeResponse response)
public void setUpgradeResponse(ClientUpgradeResponse response)
{
// TODO Auto-generated method stub
this.upgradeResponse = response;
}
}

View File

@ -25,7 +25,6 @@ import java.util.regex.Pattern;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8LineParser;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.internal.ClientUpgradeResponse;
/**
@ -58,7 +57,7 @@ public class HttpResponseHeaderParser
return (state == State.END);
}
public UpgradeResponse parse(ByteBuffer buf) throws UpgradeException
public ClientUpgradeResponse parse(ByteBuffer buf) throws UpgradeException
{
while (!isDone() && (buf.remaining() > 0))
{

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
@ -36,10 +37,17 @@ import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.internal.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.internal.ClientUpgradeResponse;
import org.eclipse.jetty.websocket.client.internal.IWebSocketClient;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.OutgoingFrames;
import org.eclipse.jetty.websocket.io.WebSocketSession;
import org.eclipse.jetty.websocket.protocol.AcceptHash;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
@ -176,14 +184,14 @@ public class UpgradeConnection extends AbstractConnection
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
UpgradeResponse resp = parser.parse(buffer);
ClientUpgradeResponse resp = parser.parse(buffer);
if (resp != null)
{
// Got a response!
client.setUpgradeResponse(resp);
validateResponse(resp);
notifyConnect();
upgradeConnection();
upgradeConnection(resp);
return false; // do no more reading
}
}
@ -205,12 +213,72 @@ public class UpgradeConnection extends AbstractConnection
}
}
private void upgradeConnection()
private void upgradeConnection(ClientUpgradeResponse response)
{
EndPoint endp = getEndPoint();
Executor executor = getExecutor();
WebSocketClientConnection conn = new WebSocketClientConnection(endp,executor,client);
endp.setConnection(conn);
WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,client);
// Initialize / Negotiate Extensions
WebSocketEventDriver websocket = client.getWebSocket();
WebSocketPolicy policy = client.getPolicy();
String acceptedSubProtocol = response.getAcceptedSubProtocol();
WebSocketSession session = new WebSocketSession(websocket,connection,policy,acceptedSubProtocol);
connection.setSession(session);
List<Extension> extensions = client.getFactory().initExtensions(response.getExtensions());
// Start with default routing.
IncomingFrames incoming = session;
OutgoingFrames outgoing = connection;
// Connect extensions
if (extensions != null)
{
Iterator<Extension> extIter;
// Connect outgoings
extIter = extensions.iterator();
while (extIter.hasNext())
{
Extension ext = extIter.next();
ext.setNextOutgoingFrames(outgoing);
outgoing = ext;
// Handle RSV reservations
if (ext.useRsv1())
{
connection.getGenerator().setRsv1InUse(true);
connection.getParser().setRsv1InUse(true);
}
if (ext.useRsv2())
{
connection.getGenerator().setRsv2InUse(true);
connection.getParser().setRsv2InUse(true);
}
if (ext.useRsv3())
{
connection.getGenerator().setRsv3InUse(true);
connection.getParser().setRsv3InUse(true);
}
}
// Connect incomings
Collections.reverse(extensions);
extIter = extensions.iterator();
while (extIter.hasNext())
{
Extension ext = extIter.next();
ext.setNextIncomingFrames(incoming);
incoming = ext;
}
}
// configure session for outgoing flows
session.setOutgoing(outgoing);
// configure connection for incoming flows
connection.getParser().setIncomingFramesHandler(incoming);
// Now swap out the connection
endp.setConnection(connection);
}
private void validateResponse(UpgradeResponse response)

View File

@ -21,17 +21,25 @@ package org.eclipse.jetty.websocket.client.internal.io;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.client.internal.IWebSocketClient;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
/**
* Client side WebSocket physical connection.
*/
public class WebSocketClientConnection extends AbstractWebSocketConnection
{
private final WebSocketClientFactory factory;
private final IWebSocketClient client;
private boolean connected;
public WebSocketClientConnection(EndPoint endp, Executor executor, IWebSocketClient client)
{
super(endp,executor,client.getFactory().getScheduler(),client.getPolicy(),client.getFactory().getBufferPool());
this.client = client;
this.factory = client.getFactory();
this.connected = false;
}
public IWebSocketClient getClient()
@ -39,9 +47,21 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
return client;
}
@Override
public void onClose()
{
super.onClose();
factory.sessionClosed(getSession());
}
@Override
public void onOpen()
{
if (!connected)
{
factory.sessionOpened(getSession());
connected = true;
}
super.onOpen();
}
}

View File

@ -22,33 +22,39 @@ import static org.hamcrest.Matchers.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.junit.Assert;
/**
* Testing Socket used on client side WebSocket testing.
*/
public class TrackingSocket extends WebSocketAdapter
{
public AtomicBoolean open = new AtomicBoolean(false);
public AtomicInteger close = new AtomicInteger(-1);
private static final Logger LOG = Log.getLogger(TrackingSocket.class);
public int closeCode = -1;
public StringBuilder closeMessage = new StringBuilder();
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
public BlockingQueue<String> messageQueue = new BlockingArrayQueue<String>();
public void assertClose(int expectedStatusCode, String expectedReason)
public void assertClose(int expectedStatusCode, String expectedReason) throws InterruptedException
{
assertCloseCode(expectedStatusCode);
assertCloseReason(expectedReason);
}
public void assertCloseCode(int expectedCode)
public void assertCloseCode(int expectedCode) throws InterruptedException
{
Assert.assertThat("Close Code",close.get(),is(expectedCode));
Assert.assertThat("Was Closed",closeLatch.await(500,TimeUnit.MILLISECONDS),is(true));
Assert.assertThat("Close Code",closeCode,is(expectedCode));
}
private void assertCloseReason(String expectedReason)
@ -56,36 +62,37 @@ public class TrackingSocket extends WebSocketAdapter
Assert.assertThat("Close Reaosn",closeMessage.toString(),is(expectedReason));
}
public void assertIsOpen()
public void assertIsOpen() throws InterruptedException
{
assertWasOpened();
assertNotClosed();
}
public void assertMessage(String string)
public void assertMessage(String expected)
{
// TODO Auto-generated method stub
String actual = messageQueue.poll();
Assert.assertEquals("Message",expected,actual);
}
public void assertNotClosed()
{
Assert.assertThat("Close Code",close.get(),is(-1));
Assert.assertThat("Closed Latch",closeLatch.getCount(),greaterThanOrEqualTo(1L));
}
public void assertNotOpened()
{
Assert.assertThat("Opened State",open.get(),is(false));
Assert.assertThat("Open Latch",openLatch.getCount(),greaterThanOrEqualTo(1L));
}
public void assertWasOpened()
public void assertWasOpened() throws InterruptedException
{
Assert.assertThat("Opened State",open.get(),is(true));
Assert.assertThat("Was Opened",openLatch.await(500,TimeUnit.MILLISECONDS),is(true));
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
{
LOG.debug("onWebSocketBinary()");
dataLatch.countDown();
}
@ -93,7 +100,7 @@ public class TrackingSocket extends WebSocketAdapter
public void onWebSocketClose(int statusCode, String reason)
{
super.onWebSocketClose(statusCode,reason);
close.set(statusCode);
closeCode = statusCode;
closeMessage.append(reason);
closeLatch.countDown();
}
@ -102,20 +109,19 @@ public class TrackingSocket extends WebSocketAdapter
public void onWebSocketConnect(WebSocketConnection connection)
{
super.onWebSocketConnect(connection);
open.set(true);
openLatch.countDown();
}
@Override
public void onWebSocketText(String message)
{
dataLatch.countDown();
LOG.debug("onWebSocketText({})",message);
messageQueue.add(message);
dataLatch.countDown();
}
public void waitForResponseMessage()
public void waitForMessage(TimeUnit timeoutUnit, int timeoutDuration) throws InterruptedException
{
// TODO Auto-generated method stub
Assert.assertThat("Message Received",dataLatch.await(timeoutDuration,timeoutUnit),is(true));
}
}

View File

@ -49,7 +49,7 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("work in progress")
@Ignore("Work in Progress")
public class WebSocketClientTest
{
private BlockheadServer server;
@ -149,9 +149,9 @@ public class WebSocketClientTest
final ServerConnection srvSock = server.accept();
srvSock.upgrade();
UpgradeResponse resp = future.get(250,TimeUnit.MILLISECONDS);
UpgradeResponse resp = future.get(500,TimeUnit.MILLISECONDS);
Assert.assertThat("Response",resp,notNullValue());
Assert.assertEquals("Response.success",resp.isSuccess(),is(true));
Assert.assertThat("Response.success",resp.isSuccess(),is(true));
cliSock.assertWasOpened();
cliSock.assertNotClosed();
@ -159,9 +159,9 @@ public class WebSocketClientTest
Assert.assertThat("Factory.sockets.size",factory.getConnectionManager().getClients().size(),is(1));
cliSock.getConnection().write(null,new FutureCallback<Void>(),"Hello World!");
srvSock.echoMessage();
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
// wait for response from server
cliSock.waitForResponseMessage();
cliSock.waitForMessage(TimeUnit.MILLISECONDS,500);
cliSock.assertMessage("Hello World!");
}
@ -177,6 +177,11 @@ public class WebSocketClientTest
final ServerConnection srvSock = server.accept();
srvSock.upgrade();
// Validate connect
UpgradeResponse resp = future.get(500,TimeUnit.MILLISECONDS);
Assert.assertThat("Response",resp,notNullValue());
Assert.assertThat("Response.success",resp.isSuccess(),is(true));
// Have server send initial message
srvSock.write(WebSocketFrame.text("Hello World"));

View File

@ -31,11 +31,26 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.io.OutgoingFrames;
import org.eclipse.jetty.websocket.protocol.AcceptHash;
import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
@ -46,15 +61,33 @@ import org.junit.Assert;
*/
public class BlockheadServer
{
public static class ServerConnection
public static class ServerConnection implements IncomingFrames, OutgoingFrames
{
private final Socket socket;
private final ByteBufferPool bufferPool;
private final WebSocketPolicy policy;
private final IncomingFramesCapture incomingFrames;
private final Parser parser;
private final Generator generator;
private final AtomicInteger parseCount;
/** Set to true to disable timeouts (for debugging reasons) */
private boolean debug = false;
private OutputStream out;
private InputStream in;
private IncomingFrames incoming = this;
private OutgoingFrames outgoing = this;
public ServerConnection(Socket socket)
{
this.socket = socket;
this.incomingFrames = new IncomingFramesCapture();
this.policy = WebSocketPolicy.newServerPolicy();
this.bufferPool = new StandardByteBufferPool(policy.getBufferSize());
this.parser = new Parser(policy);
this.parseCount = new AtomicInteger(0);
this.generator = new Generator(policy,bufferPool);
}
public void close() throws IOException
@ -62,10 +95,33 @@ public class BlockheadServer
this.socket.close();
}
public void echoMessage()
public void disconnect()
{
// TODO Auto-generated method stub
LOG.debug("disconnect");
IO.close(in);
IO.close(out);
if (socket != null)
{
try
{
socket.close();
}
catch (IOException ignore)
{
/* ignore */
}
}
}
public void echoMessage(int expectedFrames, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
{
LOG.debug("Echo Frames [expecting {}]",expectedFrames);
IncomingFramesCapture cap = readFrames(expectedFrames,timeoutUnit,timeoutDuration);
// now echo them back.
for (WebSocketFrame frame : cap.getFrames())
{
write(frame);
}
}
public void flush() throws IOException
@ -91,6 +147,95 @@ public class BlockheadServer
return out;
}
@Override
public void incoming(WebSocketException e)
{
// TODO Auto-generated method stub
}
@Override
public void incoming(WebSocketFrame frame)
{
// TODO Auto-generated method stub
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame) throws IOException
{
ByteBuffer buf = generator.generate(frame);
if (LOG.isDebugEnabled())
{
LOG.debug("writing out: {}",BufferUtil.toDetailString(buf));
}
BufferUtil.writeTo(buf,out);
out.flush();
if (frame.getOpCode() == OpCode.CLOSE)
{
disconnect();
}
}
public int read(ByteBuffer buf) throws IOException
{
int len = 0;
while ((in.available() > 0) && (buf.remaining() > 0))
{
buf.put((byte)in.read());
len++;
}
return len;
}
public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
{
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
int startCount = incomingFrames.size();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(buf);
try
{
long msDur = TimeUnit.MILLISECONDS.convert(timeoutDuration,timeoutUnit);
long now = System.currentTimeMillis();
long expireOn = now + msDur;
LOG.debug("Now: {} - expireOn: {} ({} ms)",now,expireOn,msDur);
int len = 0;
while (incomingFrames.size() < (startCount + expectedCount))
{
BufferUtil.clearToFill(buf);
len = read(buf);
if (len > 0)
{
LOG.debug("Read {} bytes",len);
BufferUtil.flipToFlush(buf,0);
parser.parse(buf);
}
try
{
TimeUnit.MILLISECONDS.sleep(20);
}
catch (InterruptedException gnore)
{
/* ignore */
}
if (!debug && (System.currentTimeMillis() > expireOn))
{
incomingFrames.dump();
throw new TimeoutException(String.format("Timeout reading all %d expected frames. (managed to only read %d frame(s))",expectedCount,
incomingFrames.size()));
}
}
}
finally
{
bufferPool.release(buf);
}
return incomingFrames;
}
public String readRequest() throws IOException
{
LOG.debug("Reading client request");
@ -138,14 +283,21 @@ public class BlockheadServer
}
}
// TODO: parse extensions
// TODO: setup extensions
StringBuilder resp = new StringBuilder();
resp.append("HTTP/1.1 101 Upgrade\r\n");
resp.append("Sec-WebSocket-Accept: ");
resp.append(AcceptHash.hashKey(key));
resp.append("\r\n");
resp.append(AcceptHash.hashKey(key)).append("\r\n");
// TODO: respond to used extensions
resp.append("\r\n");
write(resp.toString().getBytes());
// Configure Parser
parser.setIncomingFramesHandler(incomingFrames);
}
private void write(byte[] bytes) throws IOException
@ -163,10 +315,10 @@ public class BlockheadServer
getOutputStream().write(b);
}
public void write(WebSocketFrame frame)
public void write(WebSocketFrame frame) throws IOException
{
// TODO Auto-generated method stub
LOG.debug("write(Frame->{}) to {}",frame,outgoing);
outgoing.output(null,null,frame);
}
}

View File

@ -0,0 +1,140 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client.blockhead;
import static org.hamcrest.Matchers.*;
import java.util.LinkedList;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
public class IncomingFramesCapture implements IncomingFrames
{
private static final Logger LOG = Log.getLogger(IncomingFramesCapture.class);
private LinkedList<WebSocketFrame> frames = new LinkedList<>();
private LinkedList<WebSocketException> errors = new LinkedList<>();
public void assertErrorCount(int expectedCount)
{
Assert.assertThat("Captured error count",errors.size(),is(expectedCount));
}
public void assertFrameCount(int expectedCount)
{
Assert.assertThat("Captured frame count",frames.size(),is(expectedCount));
}
public void assertHasErrors(Class<? extends WebSocketException> errorType, int expectedCount)
{
Assert.assertThat(errorType.getSimpleName(),getErrorCount(errorType),is(expectedCount));
}
public void assertHasFrame(byte op)
{
Assert.assertThat(OpCode.name(op),getFrameCount(op),greaterThanOrEqualTo(1));
}
public void assertHasFrame(byte op, int expectedCount)
{
Assert.assertThat(OpCode.name(op),getFrameCount(op),is(expectedCount));
}
public void assertHasNoFrames()
{
Assert.assertThat("Has no frames",frames.size(),is(0));
}
public void assertNoErrors()
{
Assert.assertThat("Has no errors",errors.size(),is(0));
}
public void dump()
{
System.err.printf("Captured %d incoming frames%n",frames.size());
for (int i = 0; i < frames.size(); i++)
{
WebSocketFrame frame = frames.get(i);
System.err.printf("[%3d] %s%n",i,frame);
System.err.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
}
}
public int getErrorCount(Class<? extends WebSocketException> errorType)
{
int count = 0;
for (WebSocketException error : errors)
{
if (errorType.isInstance(error))
{
count++;
}
}
return count;
}
public LinkedList<WebSocketException> getErrors()
{
return errors;
}
public int getFrameCount(byte op)
{
int count = 0;
for (WebSocketFrame frame : frames)
{
if (frame.getOpCode() == op)
{
count++;
}
}
return count;
}
public LinkedList<WebSocketFrame> getFrames()
{
return frames;
}
@Override
public void incoming(WebSocketException e)
{
LOG.debug(e);
errors.add(e);
}
@Override
public void incoming(WebSocketFrame frame)
{
WebSocketFrame copy = new WebSocketFrame(frame);
frames.add(copy);
}
public int size()
{
return frames.size();
}
}

View File

@ -224,6 +224,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void onFillable()
{
LOG.debug("{} onFillable()",policy.getBehavior());
ByteBuffer buffer = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clear(buffer);
boolean readMore = false;
@ -251,6 +252,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onOpen()
{
super.onOpen();
LOG.debug("fillInterested");
fillInterested();
}