diff --git a/VERSION.txt b/VERSION.txt index f213d20ee44..64e30904c23 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1,5 +1,6 @@ jetty-7.1-SNAPSHOT + + 294563 Websocket client connection + 306349 ProxyServlet does not work unless deployed at / + 307847 Fixed combining mime type parameters + 307898 Handle large/async websocket messages diff --git a/jetty-client/pom.xml b/jetty-client/pom.xml index d6c9f7c03c2..e3029f1c4f6 100644 --- a/jetty-client/pom.xml +++ b/jetty-client/pom.xml @@ -85,5 +85,11 @@ test --> + + org.eclipse.jetty + jetty-websocket + ${project.version} + test + diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index 9633c7e9ec8..362fa6156de 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -333,16 +333,17 @@ public class HttpConnection implements Connection no_progress = 0; if (_exchange != null) { + HttpExchange exchange=_exchange; _exchange.disassociate(); _exchange = null; if (_status==HttpStatus.SWITCHING_PROTOCOLS_101) { - HttpConnection switched=_exchange.onSwitchProtocol(_endp); + Connection switched=exchange.onSwitchProtocol(_endp); if (switched!=null) { // switched protocol! - HttpExchange exchange = _pipeline; + exchange = _pipeline; _pipeline = null; if (exchange!=null) _destination.send(exchange); @@ -351,7 +352,6 @@ public class HttpConnection implements Connection } } - if (_pipeline == null) { if (!isReserved()) @@ -364,13 +364,13 @@ public class HttpConnection implements Connection if (!isReserved()) _destination.returnConnection(this,close); - HttpExchange exchange = _pipeline; + exchange = _pipeline; _pipeline = null; _destination.send(exchange); } else { - HttpExchange exchange = _pipeline; + exchange = _pipeline; _pipeline = null; send(exchange); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index eae8542cdcb..20b880fa2de 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.http.HttpSchemes; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersions; import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.BufferCache.CachedBuffer; import org.eclipse.jetty.io.ByteArrayBuffer; @@ -659,7 +660,7 @@ public class HttpExchange /** */ - protected HttpConnection onSwitchProtocol(EndPoint enpd) throws IOException + protected Connection onSwitchProtocol(EndPoint endp) throws IOException { return null; } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpExchangeTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpExchangeTest.java index a0dc69d08f1..edefa203fb5 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpExchangeTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpExchangeTest.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.util.IO; /** * Functional testing for HttpExchange. @@ -377,7 +378,7 @@ public class HttpExchangeTest extends TestCase } - public static void copyStream(InputStream in, OutputStream out) + public static void copyStrxeam(InputStream in, OutputStream out) { try { @@ -443,7 +444,7 @@ public class HttpExchangeTest extends TestCase else { // System.err.println("HANDLING "+request.getMethod()); - copyStream(request.getInputStream(),response.getOutputStream()); + IO.copy(request.getInputStream(),response.getOutputStream()); } } catch(IOException e) diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java new file mode 100644 index 00000000000..026e6a2b625 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java @@ -0,0 +1,245 @@ +// ======================================================================== +// Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import javax.servlet.http.HttpServletRequest; + +import junit.framework.TestCase; + +import org.eclipse.jetty.http.HttpMethods; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketConnection; +import org.eclipse.jetty.websocket.WebSocketHandler; + +/** + * Functional testing for HttpExchange. + */ +public class WebSocketUpgradeTest extends TestCase +{ + protected Server _server; + protected int _port; + protected HttpClient _httpClient; + protected Connector _connector; + protected ConcurrentLinkedQueue _webSockets= new ConcurrentLinkedQueue(); + protected WebSocketHandler _handler; + protected TestWebSocket _websocket; + final BlockingQueue _results = new ArrayBlockingQueue(100); + + @Override + protected void setUp() throws Exception + { + startServer(); + _httpClient=new HttpClient(); + _httpClient.setIdleTimeout(2000); + _httpClient.setTimeout(2500); + _httpClient.setConnectTimeout(1000); + _httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); + _httpClient.setMaxConnectionsPerAddress(10); + _httpClient.start(); + } + + @Override + protected void tearDown() throws Exception + { + _httpClient.stop(); + Thread.sleep(500); + stopServer(); + } + + + public void testGetWithContentExchange() throws Exception + { + final WebSocket clientWS = new WebSocket() + { + Outbound _outbound; + + public void onConnect(Outbound outbound) + { + _outbound=outbound; + _results.add("clientWS.onConnect"); + _results.add(_outbound); + } + + public void onDisconnect() + { + } + + public void onMessage(byte frame, String data) + { + _results.add("clientWS.onMessage"); + _results.add(data); + } + + public void onMessage(byte frame, byte[] data, int offset, int length) + { + } + }; + + + HttpExchange httpExchange=new HttpExchange() + { + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.client.HttpExchange#onResponseStatus(org.eclipse.jetty.io.Buffer, int, org.eclipse.jetty.io.Buffer) + */ + @Override + protected void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException + { + _results.add(new Integer(status)); + super.onResponseStatus(version,status,reason); + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.client.HttpExchange#onSwitchProtocol(org.eclipse.jetty.io.EndPoint) + */ + @Override + protected Connection onSwitchProtocol(EndPoint endp) throws IOException + { + WebSocketConnection connection = new WebSocketConnection(clientWS,endp); + // wait for 101. + try + { + int c=10; + while(_results.size()==0 && c-->0) + Thread.sleep(100); + } + catch(InterruptedException e) + { + e.printStackTrace(); + } + _results.add("onSwitchProtocol"); + _results.add(connection); + clientWS.onConnect(connection); + return connection; + } + }; + + httpExchange.setURL("http://localhost:"+_port+"/"); + httpExchange.setMethod(HttpMethods.GET); + + httpExchange.addRequestHeader("Upgrade","WebSocket"); + httpExchange.addRequestHeader("Connection","Upgrade"); + + _httpClient.send(httpExchange); + int status = httpExchange.waitForDone(); + assertEquals(HttpExchange.STATUS_COMPLETED, status); + + System.err.println("results="+_results); + + assertEquals("serverWS.onConnect", _results.poll(1,TimeUnit.SECONDS)); + TestWebSocket serverWS = (TestWebSocket)_results.poll(1,TimeUnit.SECONDS); + + assertEquals(new Integer(101), _results.poll(1,TimeUnit.SECONDS)); + + assertEquals("onSwitchProtocol", _results.poll(1,TimeUnit.SECONDS)); + WebSocketConnection client_conn=(WebSocketConnection)_results.poll(1,TimeUnit.SECONDS); + + assertEquals("clientWS.onConnect", _results.poll(1,TimeUnit.SECONDS)); + assertEquals(client_conn, _results.poll(1,TimeUnit.SECONDS)); + + client_conn.sendMessage("hello world"); + + assertEquals("serverWS.onMessage", _results.poll(1,TimeUnit.SECONDS)); + assertEquals("hello world", _results.poll(1,TimeUnit.SECONDS)); + + serverWS.sendMessage("buongiorno"); + + assertEquals("clientWS.onMessage", _results.poll(1,TimeUnit.SECONDS)); + assertEquals("buongiorno", _results.poll(1,TimeUnit.SECONDS)); + + } + + protected void newServer() throws Exception + { + _server=new Server(); + _server.setGracefulShutdown(500); + _connector=new SelectChannelConnector(); + + _connector.setPort(0); + _server.setConnectors(new Connector[] { _connector }); + } + + protected void startServer() throws Exception + { + newServer(); + _handler= new WebSocketHandler() + { + @Override + protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) + { + _websocket = new TestWebSocket(); + return _websocket; + } + }; + + _server.setHandler(_handler); + _server.start(); + _port=_connector.getLocalPort(); + } + + private void stopServer() throws Exception + { + _server.stop(); + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + class TestWebSocket implements WebSocket + { + Outbound _outbound; + + public void onConnect(Outbound outbound) + { + _outbound=outbound; + _webSockets.add(this); + _results.add("serverWS.onConnect"); + _results.add(this); + } + + public void onMessage(byte frame, byte[] data,int offset, int length) + { + } + + public void onMessage(final byte frame, final String data) + { + _results.add("serverWS.onMessage"); + _results.add(data); + } + + public void onDisconnect() + { + _results.add("onDisconnect"); + _webSockets.remove(this); + } + + public void sendMessage(String msg) throws IOException + { + _outbound.sendMessage(msg); + } + } +} diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java index 5903481976f..e11a6976ab0 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java @@ -19,6 +19,11 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound final WebSocket _websocket; final int _maxIdleTimeMs=300000; + public WebSocketConnection(WebSocket websocket, EndPoint endpoint) + { + this(websocket,endpoint,new WebSocketBuffers(8192),System.currentTimeMillis(),300000); + } + public WebSocketConnection(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, long maxIdleTime) { _endp = endpoint;