test websocket client

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1114 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2009-12-02 00:02:03 +00:00
parent 5416ec325c
commit d7f7382bf5
6 changed files with 355 additions and 13 deletions

View File

@ -56,16 +56,25 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
_manager = selectSet.getManager();
_selectSet = selectSet;
_connection = _manager.newConnection(channel,this);
_dispatched = false;
_redispatched = false;
_open=true;
_manager.endPointOpened(this);
_open=true;
_key = key;
_connection = _manager.newConnection(channel,this);
_manager.endPointOpened(this);
scheduleIdle();
}
/* ------------------------------------------------------------ */
public SelectionKey getSelectionKey()
{
synchronized (this)
{
return _key;
}
}
/* ------------------------------------------------------------ */
public SelectorManager getSelectManager()

View File

@ -5,12 +5,13 @@ import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.util.log.Log;
public class WebSocketConnection implements Connection, WebSocket.Outbound
{
final Connector _connector;
final IdleCheck _idle;
final EndPoint _endp;
final WebSocketParser _parser;
final WebSocketGenerator _generator;
@ -18,9 +19,8 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
final WebSocket _websocket;
final int _maxIdleTimeMs=30000;
public WebSocketConnection(Connector connector, WebSocketBuffers buffers, EndPoint endpoint, long timestamp, WebSocket websocket)
public WebSocketConnection(WebSocketBuffers buffers, EndPoint endpoint, long timestamp, WebSocket websocket)
{
_connector=connector;
_endp = endpoint;
_timestamp = timestamp;
_websocket = websocket;
@ -61,6 +61,23 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
}
}
});
_idle = (_endp instanceof SelectChannelEndPoint) ?
new IdleCheck()
{
public void access(EndPoint endp)
{
((SelectChannelEndPoint)_endp).scheduleIdle();
}
}
:new IdleCheck()
{
public void access(EndPoint endp)
{
}
};
}
public void handle() throws IOException
@ -94,7 +111,7 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
finally
{
if (_endp.isOpen())
_connector.persist(_endp);
_idle.access(_endp);
else
// TODO - not really the best way
_websocket.onDisconnect();
@ -125,21 +142,21 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{
_generator.addFrame(frame,content,_maxIdleTimeMs);
_generator.flush();
_connector.persist(_endp);
_idle.access(_endp);
}
public void sendMessage(byte frame, byte[] content) throws IOException
{
_generator.addFrame(frame,content,_maxIdleTimeMs);
_generator.flush();
_connector.persist(_endp);
_idle.access(_endp);
}
public void sendMessage(byte frame, byte[] content, int offset, int length) throws IOException
{
_generator.addFrame(frame,content,offset,length,_maxIdleTimeMs);
_generator.flush();
_connector.persist(_endp);
_idle.access(_endp);
}
public void disconnect()
@ -159,4 +176,9 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
{
_parser.fill(buffer);
}
private interface IdleCheck
{
void access(EndPoint endp);
}
}

View File

@ -73,7 +73,7 @@ public abstract class WebSocketHandler extends HandlerWrapper
{
HttpConnection http = HttpConnection.getCurrentConnection();
ConnectedEndPoint endp = (ConnectedEndPoint)http.getEndPoint();
WebSocketConnection connection = new WebSocketConnection(http.getConnector(),_buffers,endp,http.getTimeStamp(),websocket);
WebSocketConnection connection = new WebSocketConnection(_buffers,endp,http.getTimeStamp(),websocket);
String uri=request.getRequestURI();
String host=request.getHeader("Host");

View File

@ -56,7 +56,7 @@ public abstract class WebSocketServlet extends HttpServlet
{
HttpConnection http = HttpConnection.getCurrentConnection();
ConnectedEndPoint endp = (ConnectedEndPoint)http.getEndPoint();
WebSocketConnection connection = new WebSocketConnection(http.getConnector(),_buffers,endp,http.getTimeStamp(),websocket);
WebSocketConnection connection = new WebSocketConnection(_buffers,endp,http.getTimeStamp(),websocket);
String uri=request.getRequestURI();
String host=request.getHeader("Host");

View File

@ -0,0 +1,206 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
public class WebSocketTestClient extends AbstractLifeCycle
{
final ThreadPool _threadpool = new QueuedThreadPool();
final WebSocketBuffers _buffers = new WebSocketBuffers(4*4096);
SelectorManager _manager = new SelectorManager()
{
@Override
protected SocketChannel acceptChannel(SelectionKey key) throws IOException
{
throw new IllegalStateException();
}
@Override
public boolean dispatch(Runnable task)
{
return _threadpool.dispatch(task);
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
WebSocket ws=(WebSocket)endpoint.getSelectionKey().attachment();
WebSocketConnection connection = new WebSocketConnection(_buffers,endpoint,System.currentTimeMillis(),ws);
// TODO Blocking upgrade code. Should be async
ByteArrayBuffer upgrade=new ByteArrayBuffer(
"GET / HTTP/1.1\r\n"+
"Host: localhost:8080\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"\r\n");
try
{
while (upgrade.length()>0 && endpoint.isOpen())
{
int l = endpoint.flush(upgrade);
if (l>0)
upgrade.skip(l);
Thread.sleep(10);
}
IndirectNIOBuffer upgraded = new IndirectNIOBuffer(2048);
String up;
do
{
endpoint.fill(upgraded);
up=upgraded.toString();
}
while(endpoint.isOpen() && !up.contains("\r\n\r\n"));
}
catch(Exception e)
{
Log.warn(e);
}
ws.onConnect(connection);
return connection;
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
{
SelectChannelEndPoint ep=new SelectChannelEndPoint(channel,selectSet,key);
return ep;
}
};
protected void doStart()
throws Exception
{
((LifeCycle)_threadpool).start();
_manager.start();
_threadpool.dispatch(new Runnable()
{
public void run()
{
while (isRunning())
{
try
{
_manager.doSelect(0);
}
catch (Exception e)
{
Log.warn(e.toString());
Log.debug(e);
Thread.yield();
}
}
}
});
}
public void open(SocketAddress addr, WebSocket websocket)
throws IOException
{
SocketChannel channel = SocketChannel.open();
channel.configureBlocking( false );
channel.connect(addr);
_manager.register( channel, websocket);
}
public static void main(String[] args)
throws Exception
{
WebSocketTestClient client = new WebSocketTestClient();
client.start();
TestWebSocket[] ws=new TestWebSocket[100];
for (int i=0;i<ws.length;i++)
ws[i]=new TestWebSocket(i);
for (TestWebSocket w : ws)
client.open(new InetSocketAddress("localhost",8080),w);
Thread.sleep(5000);
for (TestWebSocket w : ws)
if (w._out!=null)
w._out.sendMessage(WebSocket.SENTINEL_FRAME,"hello world from "+w._id);
client._threadpool.join();
}
static class TestWebSocket implements WebSocket
{
int _id;
Outbound _out;
TestWebSocket(int i)
{
_id=i;
}
public void onConnect(Outbound outbound)
{
_out=outbound;
System.err.println("onConnect");
}
public void onDisconnect()
{
System.err.println("onDisconnect");
}
public void onMessage(byte frame, String data)
{
System.err.println("onMessage "+data);
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
}
}
}

View File

@ -0,0 +1,105 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocketTest.TestWebSocket;
public class WebSocketTestServer extends Server
{
TestWebSocket _websocket;
SelectChannelConnector _connector;
WebSocketHandler _handler;
ConcurrentLinkedQueue<TestWebSocket> _webSockets= new ConcurrentLinkedQueue<TestWebSocket>();
public WebSocketTestServer()
{
_connector = new SelectChannelConnector();
_connector.setPort(8080);
addConnector(_connector);
_handler= new WebSocketHandler()
{
@Override
protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
_websocket = new TestWebSocket();
return _websocket;
}
};
setHandler(_handler);
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket
{
Outbound _outbound;
public void onConnect(Outbound outbound)
{
System.err.println("onConnect");
_outbound=outbound;
_webSockets.add(this);
}
public void onMessage(byte frame, byte[] data,int offset, int length)
{
}
public void onMessage(final byte frame, final String data)
{
System.err.println("onMessage "+data);
for (TestWebSocket ws : _webSockets)
{
if (ws!=this)
{
try
{
if (ws._outbound.isOpen())
ws._outbound.sendMessage(frame,data);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
public void onDisconnect()
{
_webSockets.remove(this);
}
}
public static void main(String[] args)
{
try
{
WebSocketTestServer server = new WebSocketTestServer();
server.start();
server.join();
}
catch(Exception e)
{
Log.warn(e);
}
}
}