294563 Websocket client connection

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1476 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2010-04-07 11:09:17 +00:00
parent 7c4cc68017
commit ac50ff46f3
7 changed files with 267 additions and 8 deletions

View File

@ -1,5 +1,6 @@
jetty-7.1-SNAPSHOT
+ 294563 Websocket client connection
+ 306349 ProxyServlet does not work unless deployed at /
+ 307847 Fixed combining mime type parameters
+ 307898 Handle large/async websocket messages

View File

@ -85,5 +85,11 @@
<scope>test</scope>
</dependency>
-->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -333,16 +333,17 @@ public class HttpConnection implements Connection
no_progress = 0;
if (_exchange != null)
{
HttpExchange exchange=_exchange;
_exchange.disassociate();
_exchange = null;
if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
{
HttpConnection switched=_exchange.onSwitchProtocol(_endp);
Connection switched=exchange.onSwitchProtocol(_endp);
if (switched!=null)
{
// switched protocol!
HttpExchange exchange = _pipeline;
exchange = _pipeline;
_pipeline = null;
if (exchange!=null)
_destination.send(exchange);
@ -351,7 +352,6 @@ public class HttpConnection implements Connection
}
}
if (_pipeline == null)
{
if (!isReserved())
@ -364,13 +364,13 @@ public class HttpConnection implements Connection
if (!isReserved())
_destination.returnConnection(this,close);
HttpExchange exchange = _pipeline;
exchange = _pipeline;
_pipeline = null;
_destination.send(exchange);
}
else
{
HttpExchange exchange = _pipeline;
exchange = _pipeline;
_pipeline = null;
send(exchange);
}

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersions;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.BufferCache.CachedBuffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
@ -659,7 +660,7 @@ public class HttpExchange
/**
*/
protected HttpConnection onSwitchProtocol(EndPoint enpd) throws IOException
protected Connection onSwitchProtocol(EndPoint endp) throws IOException
{
return null;
}

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.IO;
/**
* Functional testing for HttpExchange.
@ -377,7 +378,7 @@ public class HttpExchangeTest extends TestCase
}
public static void copyStream(InputStream in, OutputStream out)
public static void copyStrxeam(InputStream in, OutputStream out)
{
try
{
@ -443,7 +444,7 @@ public class HttpExchangeTest extends TestCase
else
{
// System.err.println("HANDLING "+request.getMethod());
copyStream(request.getInputStream(),response.getOutputStream());
IO.copy(request.getInputStream(),response.getOutputStream());
}
}
catch(IOException e)

View File

@ -0,0 +1,245 @@
// ========================================================================
// Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import junit.framework.TestCase;
import org.eclipse.jetty.http.HttpMethods;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketConnection;
import org.eclipse.jetty.websocket.WebSocketHandler;
/**
* Functional testing for HttpExchange.
*/
public class WebSocketUpgradeTest extends TestCase
{
protected Server _server;
protected int _port;
protected HttpClient _httpClient;
protected Connector _connector;
protected ConcurrentLinkedQueue<TestWebSocket> _webSockets= new ConcurrentLinkedQueue<TestWebSocket>();
protected WebSocketHandler _handler;
protected TestWebSocket _websocket;
final BlockingQueue<Object> _results = new ArrayBlockingQueue<Object>(100);
@Override
protected void setUp() throws Exception
{
startServer();
_httpClient=new HttpClient();
_httpClient.setIdleTimeout(2000);
_httpClient.setTimeout(2500);
_httpClient.setConnectTimeout(1000);
_httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
_httpClient.setMaxConnectionsPerAddress(10);
_httpClient.start();
}
@Override
protected void tearDown() throws Exception
{
_httpClient.stop();
Thread.sleep(500);
stopServer();
}
public void testGetWithContentExchange() throws Exception
{
final WebSocket clientWS = new WebSocket()
{
Outbound _outbound;
public void onConnect(Outbound outbound)
{
_outbound=outbound;
_results.add("clientWS.onConnect");
_results.add(_outbound);
}
public void onDisconnect()
{
}
public void onMessage(byte frame, String data)
{
_results.add("clientWS.onMessage");
_results.add(data);
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
}
};
HttpExchange httpExchange=new HttpExchange()
{
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.client.HttpExchange#onResponseStatus(org.eclipse.jetty.io.Buffer, int, org.eclipse.jetty.io.Buffer)
*/
@Override
protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
{
_results.add(new Integer(status));
super.onResponseStatus(version,status,reason);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.client.HttpExchange#onSwitchProtocol(org.eclipse.jetty.io.EndPoint)
*/
@Override
protected Connection onSwitchProtocol(EndPoint endp) throws IOException
{
WebSocketConnection connection = new WebSocketConnection(clientWS,endp);
// wait for 101.
try
{
int c=10;
while(_results.size()==0 && c-->0)
Thread.sleep(100);
}
catch(InterruptedException e)
{
e.printStackTrace();
}
_results.add("onSwitchProtocol");
_results.add(connection);
clientWS.onConnect(connection);
return connection;
}
};
httpExchange.setURL("http://localhost:"+_port+"/");
httpExchange.setMethod(HttpMethods.GET);
httpExchange.addRequestHeader("Upgrade","WebSocket");
httpExchange.addRequestHeader("Connection","Upgrade");
_httpClient.send(httpExchange);
int status = httpExchange.waitForDone();
assertEquals(HttpExchange.STATUS_COMPLETED, status);
System.err.println("results="+_results);
assertEquals("serverWS.onConnect", _results.poll(1,TimeUnit.SECONDS));
TestWebSocket serverWS = (TestWebSocket)_results.poll(1,TimeUnit.SECONDS);
assertEquals(new Integer(101), _results.poll(1,TimeUnit.SECONDS));
assertEquals("onSwitchProtocol", _results.poll(1,TimeUnit.SECONDS));
WebSocketConnection client_conn=(WebSocketConnection)_results.poll(1,TimeUnit.SECONDS);
assertEquals("clientWS.onConnect", _results.poll(1,TimeUnit.SECONDS));
assertEquals(client_conn, _results.poll(1,TimeUnit.SECONDS));
client_conn.sendMessage("hello world");
assertEquals("serverWS.onMessage", _results.poll(1,TimeUnit.SECONDS));
assertEquals("hello world", _results.poll(1,TimeUnit.SECONDS));
serverWS.sendMessage("buongiorno");
assertEquals("clientWS.onMessage", _results.poll(1,TimeUnit.SECONDS));
assertEquals("buongiorno", _results.poll(1,TimeUnit.SECONDS));
}
protected void newServer() throws Exception
{
_server=new Server();
_server.setGracefulShutdown(500);
_connector=new SelectChannelConnector();
_connector.setPort(0);
_server.setConnectors(new Connector[] { _connector });
}
protected void startServer() throws Exception
{
newServer();
_handler= new WebSocketHandler()
{
@Override
protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
_websocket = new TestWebSocket();
return _websocket;
}
};
_server.setHandler(_handler);
_server.start();
_port=_connector.getLocalPort();
}
private void stopServer() throws Exception
{
_server.stop();
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket
{
Outbound _outbound;
public void onConnect(Outbound outbound)
{
_outbound=outbound;
_webSockets.add(this);
_results.add("serverWS.onConnect");
_results.add(this);
}
public void onMessage(byte frame, byte[] data,int offset, int length)
{
}
public void onMessage(final byte frame, final String data)
{
_results.add("serverWS.onMessage");
_results.add(data);
}
public void onDisconnect()
{
_results.add("onDisconnect");
_webSockets.remove(this);
}
public void sendMessage(String msg) throws IOException
{
_outbound.sendMessage(msg);
}
}
}

View File

@ -19,6 +19,11 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound
final WebSocket _websocket;
final int _maxIdleTimeMs=300000;
public WebSocketConnection(WebSocket websocket, EndPoint endpoint)
{
this(websocket,endpoint,new WebSocketBuffers(8192),System.currentTimeMillis(),300000);
}
public WebSocketConnection(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, long maxIdleTime)
{
_endp = endpoint;