Merge from trunk

This commit is contained in:
Joakim Erdfelt 2011-08-26 10:12:45 -07:00
commit 79db394552
31 changed files with 783 additions and 595 deletions

View File

@ -1,4 +1,6 @@
jetty-7.5.0-SNAPSHOT
+ 353623 Added new methods to HttpExchange
+ 353624 HttpURI accepts java.net.URI object in constructor
+ 354080 ServletContextHandler allows to replace any subordinate handler when restarted
jetty-7.5.0.RC1 - 19 August 2011

View File

@ -678,7 +678,7 @@ public class HttpDestination implements Dumpable
setMethod(HttpMethods.CONNECT);
setVersion(exchange.getVersion());
String serverHostAndPort = serverAddress.toString();
setURI(serverHostAndPort);
setRequestURI(serverHostAndPort);
addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.security.SecurityListener;
@ -38,8 +39,8 @@ import org.eclipse.jetty.util.thread.Timeout;
*
* This object encapsulates:
* <ul>
* <li>The HTTP server address, see {@link #setAddress(Address)} or {@link #setURL(String)})
* <li>The HTTP request method, URI and HTTP version (see {@link #setMethod(String)}, {@link #setURI(String)}, and {@link #setVersion(int)}
* <li>The HTTP server address, see {@link #setAddress(Address)}, or {@link #setURI(URI)}, or {@link #setURL(String)})
* <li>The HTTP request method, URI and HTTP version (see {@link #setMethod(String)}, {@link #setRequestURI(String)}, and {@link #setVersion(int)})
* <li>The request headers (see {@link #addRequestHeader(String, String)} or {@link #setRequestHeader(String, String)})
* <li>The request content (see {@link #setRequestContent(Buffer)} or {@link #setRequestContentSource(InputStream)})
* <li>The status of the exchange (see {@link #getStatus()})
@ -394,33 +395,11 @@ public class HttpExchange
}
/**
* @param url Including protocol, host and port
* @param url an absolute URL (for example 'http://localhost/foo/bar?a=1')
*/
public void setURL(String url)
{
HttpURI uri = new HttpURI(url);
String scheme = uri.getScheme();
if (scheme != null)
{
if (HttpSchemes.HTTP.equalsIgnoreCase(scheme))
setScheme(HttpSchemes.HTTP_BUFFER);
else if (HttpSchemes.HTTPS.equalsIgnoreCase(scheme))
setScheme(HttpSchemes.HTTPS_BUFFER);
else
setScheme(new ByteArrayBuffer(scheme));
}
int port = uri.getPort();
if (port <= 0)
port = "https".equalsIgnoreCase(scheme)?443:80;
setAddress(new Address(uri.getHost(),port));
String completePath = uri.getCompletePath();
if (completePath == null)
completePath = "/";
setURI(completePath);
setURI(URI.create(url));
}
/**
@ -459,6 +438,22 @@ public class HttpExchange
{
_scheme = scheme;
}
/**
* @param scheme the scheme of the URL (for example 'http')
*/
public void setScheme(String scheme)
{
if (scheme != null)
{
if (HttpSchemes.HTTP.equalsIgnoreCase(scheme))
setScheme(HttpSchemes.HTTP_BUFFER);
else if (HttpSchemes.HTTPS.equalsIgnoreCase(scheme))
setScheme(HttpSchemes.HTTPS_BUFFER);
else
setScheme(new ByteArrayBuffer(scheme));
}
}
/**
* @return the scheme of the URL
@ -514,20 +509,81 @@ public class HttpExchange
}
/**
* @return the path of the URL
* @return request URI
* @see #getRequestURI()
* @deprecated
*/
@Deprecated
public String getURI()
{
return getRequestURI();
}
/**
* @return request URI
*/
public String getRequestURI()
{
return _uri;
}
/**
* @param uri the path of the URL (for example '/foo/bar?a=1')
* Set the request URI
*
* @param uri new request URI
* @see #setRequestURI(String)
* @deprecated
*/
@Deprecated
public void setURI(String uri)
{
setRequestURI(uri);
}
/**
* Set the request URI
*
* Per RFC 2616 sec5, Request-URI = "*" | absoluteURI | abs_path | authority<br/>
* where:<br/><br/>
* "*" - request applies to server itself<br/>
* absoluteURI - required for proxy requests, e.g. http://localhost:8080/context<br/>
* (this form is generated automatically by HttpClient)<br/>
* abs_path - used for most methods, e.g. /context<br/>
* authority - used for CONNECT method only, e.g. localhost:8080<br/>
* <br/>
* For complete definition of URI components, see RFC 2396 sec3.<br/>
*
* @param uri new request URI
*/
public void setRequestURI(String uri)
{
_uri = uri;
}
/* ------------------------------------------------------------ */
/**
* @param uri an absolute URI (for example 'http://localhost/foo/bar?a=1')
*/
public void setURI(URI uri)
{
if (!uri.isAbsolute())
throw new IllegalArgumentException("!Absolute URI: "+uri);
if (uri.isOpaque())
throw new IllegalArgumentException("Opaque URI: "+uri);
String scheme = uri.getScheme();
int port = uri.getPort();
if (port <= 0)
port = "https".equalsIgnoreCase(scheme)?443:80;
setScheme(scheme);
setAddress(new Address(uri.getHost(),port));
HttpURI httpUri = new HttpURI(uri);
String completePath = httpUri.getCompletePath();
setRequestURI(completePath==null ? "/" : completePath);
}
/**
* Adds the specified request header

View File

@ -112,7 +112,7 @@ public class RedirectListener extends HttpEventListenerWrapper
if (_location.indexOf("://")>0)
_exchange.setURL(_location);
else
_exchange.setURI(_location);
_exchange.setRequestURI(_location);
// destination may have changed
HttpDestination destination=_destination.getHttpClient().getDestination(_exchange.getAddress(),HttpSchemes.HTTPS.equals(String.valueOf(_exchange.getScheme())));

View File

@ -256,7 +256,7 @@ public class WebdavListener extends HttpEventListenerWrapper
propfindExchange.setScheme( _exchange.getScheme() );
propfindExchange.setEventListener( new SecurityListener( _destination, propfindExchange ) );
propfindExchange.setConfigureListeners( false );
propfindExchange.setURI( uri );
propfindExchange.setRequestURI( uri );
_destination.send( propfindExchange );
@ -281,7 +281,7 @@ public class WebdavListener extends HttpEventListenerWrapper
mkcolExchange.setScheme( _exchange.getScheme() );
mkcolExchange.setEventListener( new SecurityListener( _destination, mkcolExchange ) );
mkcolExchange.setConfigureListeners( false );
mkcolExchange.setURI( uri );
mkcolExchange.setRequestURI( uri );
_destination.send( mkcolExchange );
@ -307,7 +307,7 @@ public class WebdavListener extends HttpEventListenerWrapper
supportedExchange.setScheme( _exchange.getScheme() );
supportedExchange.setEventListener( new SecurityListener( _destination, supportedExchange ) );
supportedExchange.setConfigureListeners( false );
supportedExchange.setURI( _exchange.getURI() );
supportedExchange.setRequestURI( _exchange.getURI() );
_destination.send( supportedExchange );

View File

@ -53,7 +53,7 @@ public abstract class AbstractConnectionTest
CountDownLatch latch = new CountDownLatch(1);
HttpExchange exchange = new ConnectionExchange(latch);
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
exchange.setRequestURI("/");
httpClient.send(exchange);
Socket remote = serverSocket.accept();
@ -109,7 +109,7 @@ public abstract class AbstractConnectionTest
CountDownLatch latch = new CountDownLatch(1);
HttpExchange exchange = new ConnectionExchange(latch);
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
exchange.setRequestURI("/");
httpClient.send(exchange);
boolean passed = latch.await(4000, TimeUnit.MILLISECONDS);
@ -158,7 +158,7 @@ public abstract class AbstractConnectionTest
}
};
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
exchange.setRequestURI("/");
exchanges[i] = exchange;
}
@ -189,7 +189,7 @@ public abstract class AbstractConnectionTest
HttpExchange exchange = new ConnectionExchange(latch);
// Using a IP address has a different behavior than using a host name
exchange.setAddress(new Address("127.0.0.1", 1));
exchange.setURI("/");
exchange.setRequestURI("/");
httpClient.send(exchange);
boolean passed = latch.await(connectTimeout * 2L, TimeUnit.MILLISECONDS);
@ -218,7 +218,7 @@ public abstract class AbstractConnectionTest
{
HttpExchange exchange = new ConnectionExchange();
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
exchange.setRequestURI("/");
HttpDestination dest = httpClient.getDestination(new Address("localhost", port),false);
httpClient.send(exchange);
@ -236,7 +236,7 @@ public abstract class AbstractConnectionTest
exchange = new ConnectionExchange();
exchange.setAddress(new Address("localhost", port));
exchange.setURI("/");
exchange.setRequestURI("/");
httpClient.send(exchange);
s.getInputStream().read(buf);

View File

@ -87,7 +87,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
exchange.setRequestURI("/");
getHttpClient().send(exchange);
// Cancelling here is wrong and makes the test fail spuriously
@ -125,7 +125,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
exchange.setRequestURI("/");
getHttpClient().send(exchange);
// Cancelling here is wrong and makes the test fail spuriously
@ -155,7 +155,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
exchange.setRequestURI("/");
getHttpClient().send(exchange);
@ -180,7 +180,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
exchange.setRequestURI("/");
getHttpClient().send(exchange);
@ -205,7 +205,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
exchange.setRequestURI("/");
getHttpClient().send(exchange);
@ -230,7 +230,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
exchange.setRequestURI("/");
getHttpClient().send(exchange);
@ -255,7 +255,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
exchange.setRequestURI("/");
getHttpClient().send(exchange);
@ -280,7 +280,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/?action=body");
exchange.setRequestURI("/?action=body");
getHttpClient().send(exchange);
@ -305,7 +305,7 @@ public abstract class AbstractHttpExchangeCancelTest
}
};
exchange.setAddress(newAddress());
exchange.setURI("/");
exchange.setRequestURI("/");
getHttpClient().send(exchange);
@ -325,7 +325,7 @@ public abstract class AbstractHttpExchangeCancelTest
((StdErrLog)Log.getLog()).setHideStacks(!LOG.isDebugEnabled());
TestHttpExchange exchange = new TestHttpExchange();
exchange.setAddress(newAddress());
exchange.setURI("/?action=throw");
exchange.setRequestURI("/?action=throw");
getHttpClient().send(exchange);
@ -352,7 +352,7 @@ public abstract class AbstractHttpExchangeCancelTest
TestHttpExchange exchange = new TestHttpExchange();
exchange.setAddress(newAddress());
exchange.setURI("/?action=wait5000");
exchange.setRequestURI("/?action=wait5000");
long start = System.currentTimeMillis();
httpClient.send(exchange);

View File

@ -18,7 +18,7 @@ public class AsyncSslHttpExchangeTest extends SslHttpExchangeTest
@Override
public void setUp() throws Exception
{
_scheme="https://";
_scheme="https";
startServer();
_httpClient=new HttpClient();
_httpClient.setIdleTimeout(2000);

View File

@ -23,7 +23,7 @@ public class ExternalKeyStoreAsyncSslHttpExchangeTest extends SslHttpExchangeTes
@Override
public void setUp() throws Exception
{
_scheme = "https://";
_scheme = "https";
startServer();
_httpClient = new HttpClient();
_httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);

View File

@ -16,12 +16,15 @@ package org.eclipse.jetty.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.matchers.JUnitMatchers.containsString;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -32,8 +35,10 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.security.ProxyAuthorization;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeaders;
import org.eclipse.jetty.http.HttpMethods;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
@ -61,7 +66,7 @@ public class HttpExchangeTest
private static final Logger LOG = Log.getLogger(HttpExchangeTest.class);
protected int _maxConnectionsPerAddress = 2;
protected String _scheme = "http://";
protected String _scheme = "http";
protected Server _server;
protected int _port;
protected HttpClient _httpClient;
@ -236,7 +241,7 @@ public class HttpExchangeTest
}
};
httpExchange[n].setURL(_scheme+"localhost:"+_port+"/"+n);
httpExchange[n].setURL(_scheme+"://localhost:"+_port+"/"+n);
httpExchange[n].addRequestHeader("arbitrary","value");
if (close)
httpExchange[n].setRequestHeader("Connection","close");
@ -264,8 +269,7 @@ public class HttpExchangeTest
for (int i=0;i<20;i++)
{
ContentExchange httpExchange=new ContentExchange();
//httpExchange.setURL(_scheme+"localhost:"+_port+"/");
httpExchange.setURL(_scheme+"localhost:"+_port);
httpExchange.setURI(new URI(_scheme, null, "localhost", _port, null, null, null));
httpExchange.setMethod(HttpMethods.POST);
httpExchange.setRequestContent(new ByteArrayBuffer("<hello />"));
_httpClient.send(httpExchange);
@ -284,7 +288,7 @@ public class HttpExchangeTest
for (int i=0;i<10;i++)
{
ContentExchange httpExchange=new ContentExchange();
httpExchange.setURL(_scheme+"localhost:"+_port+"/?i="+i);
httpExchange.setURI(new URI(_scheme, null, "localhost", _port, "/", "i="+i, null));
httpExchange.setMethod(HttpMethods.GET);
_httpClient.send(httpExchange);
int status = httpExchange.waitForDone();
@ -304,7 +308,7 @@ public class HttpExchangeTest
for (int i=0;i<10;i++)
{
ContentExchange httpExchange=new ContentExchange();
httpExchange.setURL(_scheme+"localhost:"+_port+"/?i="+i);
httpExchange.setURL(_scheme+"://localhost:"+_port+"/?i="+i);
httpExchange.setMethod(HttpMethods.GET);
_httpClient.send(httpExchange);
int status = httpExchange.waitForDone();
@ -351,7 +355,7 @@ public class HttpExchangeTest
throwable.set(x);
}
};
httpExchange.setURL(_scheme+"localhost:"+_port+"/");
httpExchange.setURL(_scheme+"://localhost:"+_port+"/");
httpExchange.setMethod("SLEEP");
_httpClient.send(httpExchange);
new Thread()
@ -390,7 +394,7 @@ public class HttpExchangeTest
niobuf.put(bytes);
}
httpExchange.setURL(_scheme+"localhost:"+_port+"/");
httpExchange.setURL(_scheme+"://localhost:"+_port+"/");
httpExchange.setMethod(HttpMethods.POST);
httpExchange.setRequestContentType("application/data");
httpExchange.setRequestContent(babuf);
@ -403,7 +407,7 @@ public class HttpExchangeTest
assertEquals(babuf.length(),result.length());
httpExchange.reset();
httpExchange.setURL(_scheme+"localhost:"+_port+"/");
httpExchange.setURL(_scheme+"://localhost:"+_port+"/");
httpExchange.setMethod(HttpMethods.POST);
httpExchange.setRequestContentType("application/data");
httpExchange.setRequestContent(niobuf);
@ -422,8 +426,7 @@ public class HttpExchangeTest
{
};
//httpExchange.setURL(_scheme+"localhost:"+_port+"/");
httpExchange.setURL(_scheme+"localhost:"+_port);
httpExchange.setURL(_scheme+"://localhost:"+_port);
httpExchange.setMethod(HttpMethods.POST);
final String data="012345678901234567890123456789012345678901234567890123456789";
@ -481,7 +484,7 @@ public class HttpExchangeTest
@Test
public void testProxy() throws Exception
{
if (_scheme.equals("https://"))
if (_scheme.equals("https"))
return;
try
{
@ -491,7 +494,7 @@ public class HttpExchangeTest
ContentExchange httpExchange=new ContentExchange();
httpExchange.setAddress(new Address("jetty.eclipse.org",8080));
httpExchange.setMethod(HttpMethods.GET);
httpExchange.setURI("/jetty-6");
httpExchange.setRequestURI("/jetty-6");
_httpClient.send(httpExchange);
int status = httpExchange.waitForDone();
//httpExchange.waitForStatus(HttpExchange.STATUS_COMPLETED);
@ -512,14 +515,14 @@ public class HttpExchangeTest
@Test
public void testReserveConnections () throws Exception
{
final HttpDestination destination = _httpClient.getDestination (new Address("localhost", _port), _scheme.equalsIgnoreCase("https://"));
final HttpDestination destination = _httpClient.getDestination (new Address("localhost", _port), _scheme.equalsIgnoreCase("https"));
final org.eclipse.jetty.client.HttpConnection[] connections = new org.eclipse.jetty.client.HttpConnection[_maxConnectionsPerAddress];
for (int i=0; i < _maxConnectionsPerAddress; i++)
{
connections[i] = destination.reserveConnection(200);
assertNotNull(connections[i]);
HttpExchange ex = new ContentExchange();
ex.setURL(_scheme+"localhost:"+_port+"/?i="+i);
ex.setURL(_scheme+"://localhost:"+_port+"/?i="+i);
ex.setMethod(HttpMethods.GET);
connections[i].send(ex);
}
@ -536,6 +539,35 @@ public class HttpExchangeTest
c = destination.reserveConnection(500);
assertNotNull(c);
}
@Test
public void testOptionsWithExchange() throws Exception
{
ContentExchange httpExchange = new ContentExchange(true);
httpExchange.setURL(_scheme+"://localhost:"+_port);
httpExchange.setRequestURI("*");
httpExchange.setMethod(HttpMethods.OPTIONS);
httpExchange.setRequestHeader("Connection","close");
_httpClient.send(httpExchange);
int state = httpExchange.waitForDone();
assertEquals(HttpExchange.STATUS_COMPLETED, state);
assertEquals(HttpStatus.OK_200,httpExchange.getResponseStatus());
HttpFields headers = httpExchange.getResponseFields();
assertTrue("Response contains Allow header", headers.containsKey("Allow"));
String allow = headers.getStringField("Allow");
String expectedMethods[] =
{ "GET", "HEAD", "POST", "PUT", "DELETE", "MOVE", "OPTIONS", "TRACE" };
for (String expectedMethod : expectedMethods)
{
assertThat(allow,containsString(expectedMethod));
}
assertTrue("Response contains Content-Length header", headers.containsKey("Content-Length"));
assertEquals("Content-Length header value", 0, headers.getLongField("Content-Length"));
}
/* ------------------------------------------------------------ */
public static void copyStream(InputStream in, OutputStream out)
@ -603,6 +635,14 @@ public class HttpExchangeTest
}
response.getOutputStream().println("</hello>");
}
else if (request.getMethod().equalsIgnoreCase("OPTIONS"))
{
if ("*".equals(target))
{
response.setContentLength(0);
response.setHeader("Allow","GET,HEAD,POST,PUT,DELETE,MOVE,OPTIONS,TRACE");
}
}
else if (request.getMethod().equalsIgnoreCase("SLEEP"))
{
Thread.sleep(10000);

View File

@ -40,7 +40,7 @@ public class SslHttpExchangeTest extends HttpExchangeTest
@Override
public void setUp() throws Exception
{
_scheme="https://";
_scheme="https";
startServer();
_httpClient=new HttpClient();
_httpClient.setIdleTimeout(2000);

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import org.eclipse.jetty.util.MultiMap;
import org.eclipse.jetty.util.StringUtil;
@ -96,6 +97,11 @@ public class HttpURI
{
parse2(raw,offset,length);
}
public HttpURI(URI uri)
{
parse(uri.toASCIIString());
}
public void parse(String raw)
{

View File

@ -0,0 +1,62 @@
// ========================================================================
// Copyright (c) Webtide LLC
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.http;
import static org.junit.Assert.assertEquals;
import java.net.URI;
import org.junit.Test;
/* ------------------------------------------------------------ */
public class HttpURITest
{
public static final String __input = "http://example.com:8080/path/to/context?parameter=%22value%22#fragment";
public static final String __scheme = "http";
public static final String __host = "example.com";
public static final int __port = 8080;
public static final String __path = "/path/to/context";
public static final String __query = "parameter=%22value%22";
public static final String __fragment = "fragment";
/* ------------------------------------------------------------ */
@Test
public void testFromString() throws Exception
{
HttpURI uri = new HttpURI(__input);
assertEquals(__scheme, uri.getScheme());
assertEquals(__host,uri.getHost());
assertEquals(__port,uri.getPort());
assertEquals(__path,uri.getPath());
assertEquals(__query,uri.getQuery());
assertEquals(__fragment,uri.getFragment());
}
/* ------------------------------------------------------------ */
@Test
public void testFromURI() throws Exception
{
HttpURI uri = new HttpURI(new URI(__input));
assertEquals(__scheme, uri.getScheme());
assertEquals(__host,uri.getHost());
assertEquals(__port,uri.getPort());
assertEquals(__path,uri.getPath());
assertEquals(__query,uri.getQuery());
assertEquals(__fragment,uri.getFragment());
}
}

View File

@ -50,12 +50,25 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
private volatile int _capacity;
private Object[] _elements;
private int _head;
private int _tail;
private final ReentrantLock _headLock = new ReentrantLock();
private final Condition _notEmpty = _headLock.newCondition();
private int _head;
// spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
// TODO verify this has benefits
private long _space0;
private long _space1;
private long _space2;
private long _space3;
private long _space4;
private long _space5;
private long _space6;
private long _space7;
private final ReentrantLock _tailLock = new ReentrantLock();
private int _tail;
/* ------------------------------------------------------------ */
/** Create a growing partially blocking Queue
@ -675,4 +688,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
_tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
long sumOfSpace()
{
// this method exists to stop clever optimisers removing the spacers
return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++;
}
}

View File

@ -79,15 +79,29 @@ public class JettyWebXmlConfiguration extends AbstractConfiguration
try
{
context.setServerClasses(null);
if(LOG.isDebugEnabled())
if(LOG.isDebugEnabled()) {
LOG.debug("Configure: "+jetty);
}
XmlConfiguration jetty_config = (XmlConfiguration)context.getAttribute(XML_CONFIGURATION);
if (jetty_config==null)
{
jetty_config=new XmlConfiguration(jetty.getURL());
}
else
{
context.removeAttribute(XML_CONFIGURATION);
}
setupXmlConfiguration(context,jetty_config, web_inf);
jetty_config.configure(context);
try
{
jetty_config.configure(context);
}
catch (ClassNotFoundException e)
{
LOG.warn("Unable to process jetty-web.xml", e);
}
}
finally
{
@ -118,5 +132,4 @@ public class JettyWebXmlConfiguration extends AbstractConfiguration
Map<String,String> props = jetty_config.getProperties();
props.put(PROPERTY_THIS_WEB_INF_URL, String.valueOf(web_inf.getURL()));
}
}

View File

@ -0,0 +1,24 @@
package org.eclipse.jetty.websocket;
public class FixedMaskGen implements MaskGen
{
final byte[] _mask;
public FixedMaskGen()
{
_mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
}
public FixedMaskGen(byte[] mask)
{
_mask=mask;
}
public void genMask(byte[] mask)
{
mask[0]=_mask[0];
mask[1]=_mask[1];
mask[2]=_mask[2];
mask[3]=_mask[3];
}
}

View File

@ -0,0 +1,6 @@
package org.eclipse.jetty.websocket;
public interface MaskGen
{
void genMask(byte[] mask);
}

View File

@ -0,0 +1,23 @@
package org.eclipse.jetty.websocket;
import java.util.Random;
public class RandomMaskGen implements MaskGen
{
final Random _random;
public RandomMaskGen()
{
_random=new Random();
}
public RandomMaskGen(Random random)
{
_random=random;
}
public void genMask(byte[] mask)
{
_random.nextBytes(mask);
}
}

View File

@ -22,7 +22,7 @@ import org.eclipse.jetty.util.TypeUtil;
*/
public class TestClient implements WebSocket.OnFrame
{
private static WebSocketClient __client = new WebSocketClient();
private static WebSocketClientFactory __clientFactory = new WebSocketClientFactory();
private static boolean _verbose=false;
private static final Random __random = new Random();
@ -119,7 +119,7 @@ public class TestClient implements WebSocket.OnFrame
private void open() throws Exception
{
WebSocketClient client = new WebSocketClient(__client);
WebSocketClient client = new WebSocketClient(__clientFactory);
client.setProtocol(_protocol);
client.setMaxIdleTime(_timeout);
client.open(new URI("ws://"+_host+":"+_port+"/"),this).get(10,TimeUnit.SECONDS);
@ -179,7 +179,7 @@ public class TestClient implements WebSocket.OnFrame
public static void main(String[] args) throws Exception
{
__client.start();
__clientFactory.start();
String host="localhost";
int port=8080;
@ -277,7 +277,7 @@ public class TestClient implements WebSocket.OnFrame
"time "+duration+"ms "+ (1000L*__messagesReceived.get()/duration)+" req/s");
System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",__minDuration.get()/1000000.0,__messagesReceived.get()==0?0.0:(__totalTime.get()/__messagesReceived.get()/1000000.0),__maxDuration.get()/1000000.0);
__client.stop();
__clientFactory.stop();
}
}

View File

@ -1,12 +1,10 @@
package org.eclipse.jetty.websocket;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
@ -19,24 +17,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.websocket.WebSocketGeneratorD12.MaskGen;
/* ------------------------------------------------------------ */
@ -44,8 +26,9 @@ import org.eclipse.jetty.websocket.WebSocketGeneratorD12.MaskGen;
* <p>This WebSocket Client class can create multiple websocket connections to multiple destinations.
* It uses the same {@link WebSocket} endpoint API as the server.
* Simple usage is as follows: <pre>
* WebSocketClient client = new WebSocketClient();
* client.setMaxIdleTime(500);
* WebSocketClientFactory factory = new WebSocketClientFactory();
* factory.start();
* WebSocketClient client = factory.newClient();
* client.start();
*
* WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"),new WebSocket.OnTextMessage()
@ -69,84 +52,38 @@ import org.eclipse.jetty.websocket.WebSocketGeneratorD12.MaskGen;
* connection.sendMessage("Hello World");
* </pre>
*/
public class WebSocketClient extends AggregateLifeCycle
public class WebSocketClient
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
private final static Random __random = new Random();
private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
private final WebSocketClient _root;
private final WebSocketClient _parent;
private final ThreadPool _threadPool;
private final WebSocketClientSelector _selector;
private final WebSocketClientFactory _factory;
private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
private final List<String> _extensions=new CopyOnWriteArrayList<String>();
private int _bufferSize=64*1024;
private String _origin;
private String _protocol;
private int _maxIdleTime=-1;
private WebSocketBuffers _buffers;
private boolean _maskingEnabled = true;
private MaskGen _maskGen;
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with default configuration.
/** Create a WebSocket Client with private factory.
* <p>Creates a WebSocketClient from a private WebSocketClientFactory. This can be wasteful of resources if many clients are created.
*/
public WebSocketClient()
public WebSocketClient() throws Exception
{
this(new QueuedThreadPool());
_factory=new WebSocketClientFactory();
_factory.start();
_maskGen=_factory.getMaskGen();
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with shared threadpool.
/** Create a WebSocket Client with shared factory.
* @param threadpool
*/
public WebSocketClient(ThreadPool threadpool)
public WebSocketClient(WebSocketClientFactory factory)
{
_root=this;
_parent=null;
_threadPool=threadpool;
_selector=new WebSocketClientSelector();
addBean(_selector);
addBean(_threadPool);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client from another.
* <p>If multiple clients are required so that connections created may have different
* configurations, then it is more efficient to create a client based on another, so
* that the thread pool and IO infrastructure may be shared.
*/
public WebSocketClient(WebSocketClient parent)
{
_root=parent._root;
_parent=parent;
_threadPool=parent._threadPool;
_selector=parent._selector;
_parent.addBean(this);
}
/* ------------------------------------------------------------ */
/**
* Get the selectorManager. Used to configure the manager.
* @return The {@link SelectorManager} instance.
*/
public SelectorManager getSelectorManager()
{
return _selector;
}
/* ------------------------------------------------------------ */
/** Get the ThreadPool.
* <p>Used to set/query the thread pool configuration.
* @return The {@link ThreadPool}
*/
public ThreadPool getThreadPool()
{
return _threadPool;
_factory=factory;
_maskGen=_factory.getMaskGen();
}
/* ------------------------------------------------------------ */
@ -167,26 +104,6 @@ public class WebSocketClient extends AggregateLifeCycle
_maxIdleTime=maxIdleTime;
}
/* ------------------------------------------------------------ */
/** Get the WebSocket Buffer size for connections opened by this client.
* @return the buffer size in bytes.
*/
public int getBufferSize()
{
return _bufferSize;
}
/* ------------------------------------------------------------ */
/** Set the WebSocket Buffer size for connections opened by this client.
* @param bufferSize the buffer size in bytes.
*/
public void setBufferSize(int bufferSize)
{
if (isRunning())
throw new IllegalStateException(getState());
_bufferSize = bufferSize;
}
/* ------------------------------------------------------------ */
/** Get the subprotocol string for connections opened by this client.
* @return The subprotocol
@ -235,23 +152,16 @@ public class WebSocketClient extends AggregateLifeCycle
return _extensions;
}
/* ------------------------------------------------------------ */
/**
* @return whether masking is enabled.
*/
public boolean isMaskingEnabled()
public MaskGen getMaskGen()
{
return _maskingEnabled;
return _maskGen;
}
/* ------------------------------------------------------------ */
/**
* @param maskingEnabled whether to enable masking
*/
public void setMaskingEnabled(boolean maskingEnabled)
public void setMaskGen(MaskGen maskGen)
{
_maskingEnabled=maskingEnabled;
_maskGen = maskGen;
}
/* ------------------------------------------------------------ */
@ -297,8 +207,8 @@ public class WebSocketClient extends AggregateLifeCycle
*/
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
{
if (!isStarted())
throw new IllegalStateException("!started");
if (!_factory.isStarted())
throw new IllegalStateException("Factory !started");
String scheme=uri.getScheme();
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
@ -309,293 +219,32 @@ public class WebSocketClient extends AggregateLifeCycle
channel.socket().setTcpNoDelay(true);
int maxIdleTime = getMaxIdleTime();
if (maxIdleTime<0)
maxIdleTime=(int)_selector.getMaxIdleTime();
maxIdleTime=(int)_factory.getSelectorManager().getMaxIdleTime();
if (maxIdleTime>0)
channel.socket().setSoTimeout(maxIdleTime);
InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,maxIdleTime,_cookies,_extensions,channel);
final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,_origin,_maskGen,maxIdleTime,_cookies,_extensions,channel);
channel.configureBlocking(false);
channel.connect(address);
_selector.register( channel, holder);
_factory.getSelectorManager().register( channel, holder);
return holder;
}
@Override
protected void doStart() throws Exception
{
if (_parent!=null && !_parent.isRunning())
throw new IllegalStateException("parent:"+getState());
_buffers = new WebSocketBuffers(_bufferSize);
super.doStart();
// Start a selector and timer if this is the root client
if (_parent==null)
{
for (int i=0;i<_selector.getSelectSets();i++)
{
final int id=i;
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
{
try
{
_selector.doSelect(id);
}
catch (IOException e)
{
__log.warn(e);
}
}
}
});
}
}
}
/* ------------------------------------------------------------ */
/** WebSocket Client Selector Manager
*/
class WebSocketClientSelector extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
{
return _threadPool.dispatch(task);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
{
return new SelectChannelEndPoint(channel,selectSet,sKey);
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
WebSocketFuture holder = (WebSocketFuture) endpoint.getSelectionKey().attachment();
return new HandshakeConnection(endpoint,holder);
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO expose on outer class
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
throw new IllegalStateException();
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
endpoint.getConnection().closed();
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (!(attachment instanceof WebSocketFuture))
super.connectionFailed(channel,ex,attachment);
else
{
__log.debug(ex);
WebSocketFuture holder = (WebSocketFuture)attachment;
holder.handshakeFailed(ex);
}
}
}
/* ------------------------------------------------------------ */
/** Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection
{
private final SelectChannelEndPoint _endp;
private final WebSocketFuture _holder;
private final String _key;
private final HttpParser _parser;
private String _accept;
private String _error;
public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketFuture holder)
{
super(endpoint,System.currentTimeMillis());
_endp=endpoint;
_holder=holder;
byte[] bytes=new byte[16];
__random.nextBytes(bytes);
_key=new String(B64Code.encode(bytes));
Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
_parser=new HttpParser(buffers,_endp,
new HttpParser.EventHandler()
{
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
if (status!=101)
{
_error="Bad response status "+status+" "+reason;
_endp.close();
}
}
@Override
public void parsedHeader(Buffer name, Buffer value) throws IOException
{
if (__ACCEPT.equals(name))
_accept=value.toString();
}
@Override
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
if (_error==null)
_error="Bad response: "+method+" "+url+" "+version;
_endp.close();
}
@Override
public void content(Buffer ref) throws IOException
{
if (_error==null)
_error="Bad response. "+ref.length()+"B of content?";
_endp.close();
}
});
String path=_holder.getURI().getPath();
if (path==null || path.length()==0)
path="/";
String request=
"GET "+path+" HTTP/1.1\r\n"+
"Host: "+holder.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+_key+"\r\n"+
(_origin==null?"":"Origin: "+_origin+"\r\n")+
"Sec-WebSocket-Version: "+WebSocketConnectionD12.VERSION+"\r\n";
if (holder.getProtocol()!=null)
request+="Sec-WebSocket-Protocol: "+holder.getProtocol()+"\r\n";
if (holder.getCookies()!=null && holder.getCookies().size()>0)
{
for (String cookie : holder.getCookies().keySet())
request+="Cookie: "+QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM)+
"="+
QuotedStringTokenizer.quoteIfNeeded(holder.getCookies().get(cookie),HttpFields.__COOKIE_DELIM)+
"\r\n";
}
request+="\r\n";
// TODO extensions
try
{
Buffer handshake = new ByteArrayBuffer(request,false);
int len=handshake.length();
if (len!=_endp.flush(handshake))
throw new IOException("incomplete");
}
catch(IOException e)
{
holder.handshakeFailed(e);
}
}
public Connection handle() throws IOException
{
while (_endp.isOpen() && !_parser.isComplete())
{
switch (_parser.parseAvailable())
{
case -1:
_holder.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
case 0:
return this;
default:
break;
}
}
if (_error==null)
{
if (_accept==null)
_error="No Sec-WebSocket-Accept";
else if (!WebSocketConnectionD12.hashKey(_key).equals(_accept))
_error="Bad Sec-WebSocket-Accept";
else
{
Buffer header=_parser.getHeaderBuffer();
MaskGen maskGen=_maskingEnabled?new WebSocketGeneratorD12.RandomMaskGen():new WebSocketGeneratorD12.NullMaskGen();
WebSocketConnectionD12 connection = new WebSocketConnectionD12(_holder.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_holder.getMaxIdleTime(),_holder.getProtocol(),null,10,maskGen);
if (header.hasContent())
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
_holder.onConnection(connection);
return connection;
}
}
_endp.close();
return this;
}
public boolean isIdle()
{
return false;
}
public boolean isSuspended()
{
return false;
}
public void closed()
{
if (_error!=null)
_holder.handshakeFailed(new ProtocolException(_error));
else
_holder.handshakeFailed(new EOFException());
}
}
/* ------------------------------------------------------------ */
/** The Future Websocket Connection.
*/
class WebSocketFuture implements Future<WebSocket.Connection>
static class WebSocketFuture implements Future<WebSocket.Connection>
{
final WebSocket _websocket;;
final URI _uri;
final String _protocol;
final String _origin;
final MaskGen _maskGen;
final int _maxIdleTime;
final Map<String,String> _cookies;
final List<String> _extensions;
@ -605,11 +254,13 @@ public class WebSocketClient extends AggregateLifeCycle
WebSocketConnection _connection;
Throwable _exception;
public WebSocketFuture(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
private WebSocketFuture(WebSocket websocket, URI uri, String protocol, String origin, MaskGen maskGen, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
{
_websocket=websocket;
_uri=uri;
_protocol=protocol;
_origin=origin;
_maskGen=maskGen;
_maxIdleTime=maxIdleTime;
_cookies=cookies;
_extensions=extensions;
@ -695,6 +346,16 @@ public class WebSocketClient extends AggregateLifeCycle
return _maxIdleTime;
}
public String getOrigin()
{
return _origin;
}
public MaskGen getMaskGen()
{
return _maskGen;
}
public String toString()
{
return "[" + _uri + ","+_websocket+"]@"+hashCode();
@ -804,6 +465,6 @@ public class WebSocketClient extends AggregateLifeCycle
__log.debug(e);
}
}
}
}
}

View File

@ -0,0 +1,380 @@
package org.eclipse.jetty.websocket;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
/* ------------------------------------------------------------ */
/** WebSocket Client Factory.
* The WebSocketClientFactory contains the common mechanisms for multiple WebSocketClient instances (eg threadpool, NIO selector).
* WebSocketClients with different configurations should share the same factory to avoid wasted resources.
* @see WebSocketClient
*/
public class WebSocketClientFactory extends AggregateLifeCycle
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
private final static Random __random = new Random();
private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
private final ThreadPool _threadPool;
private final WebSocketClientSelector _selector;
private final WebSocketBuffers _buffers;
private final MaskGen _maskGen;
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with default configuration.
*/
public WebSocketClientFactory()
{
this(new QueuedThreadPool(),new RandomMaskGen(),16*1024);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with ThreadPool .
*/
public WebSocketClientFactory(ThreadPool threadPool)
{
this(threadPool,new RandomMaskGen(),16*1024);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with shared threadpool.
* @param threadpool
*/
public WebSocketClientFactory(ThreadPool threadpool,MaskGen maskGen,int bufferSize)
{
_threadPool=threadpool;
_selector=new WebSocketClientSelector();
_buffers=new WebSocketBuffers(bufferSize);
_maskGen=maskGen;
addBean(_selector);
addBean(_threadPool);
}
/* ------------------------------------------------------------ */
/**
* Get the selectorManager. Used to configure the manager.
* @return The {@link SelectorManager} instance.
*/
public SelectorManager getSelectorManager()
{
return _selector;
}
/* ------------------------------------------------------------ */
/** Get the ThreadPool.
* <p>Used to set/query the thread pool configuration.
* @return The {@link ThreadPool}
*/
public ThreadPool getThreadPool()
{
return _threadPool;
}
/* ------------------------------------------------------------ */
public MaskGen getMaskGen()
{
return _maskGen;
}
/* ------------------------------------------------------------ */
public WebSocketClient newWebSocketClient()
{
return new WebSocketClient(this);
}
/* ------------------------------------------------------------ */
@Override
protected void doStart() throws Exception
{
super.doStart();
// Start a selector threads
for (int i=0;i<_selector.getSelectSets();i++)
{
final int id=i;
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
{
try
{
_selector.doSelect(id);
}
catch (IOException e)
{
__log.warn(e);
}
}
}
});
}
}
/* ------------------------------------------------------------ */
/** WebSocket Client Selector Manager
*/
class WebSocketClientSelector extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
{
return _threadPool.dispatch(task);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
{
return new SelectChannelEndPoint(channel,selectSet,sKey);
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) endpoint.getSelectionKey().attachment();
return new HandshakeConnection(endpoint,holder);
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO expose on outer class ??
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
throw new IllegalStateException();
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
endpoint.getConnection().closed();
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (!(attachment instanceof WebSocketClient.WebSocketFuture))
super.connectionFailed(channel,ex,attachment);
else
{
__log.debug(ex);
WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
future.handshakeFailed(ex);
}
}
}
/* ------------------------------------------------------------ */
/** Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection
{
private final SelectChannelEndPoint _endp;
private final WebSocketClient.WebSocketFuture _holder;
private final String _key;
private final HttpParser _parser;
private String _accept;
private String _error;
public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketClient.WebSocketFuture future)
{
super(endpoint,System.currentTimeMillis());
_endp=endpoint;
_holder=future;
byte[] bytes=new byte[16];
__random.nextBytes(bytes);
_key=new String(B64Code.encode(bytes));
Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
_parser=new HttpParser(buffers,_endp,
new HttpParser.EventHandler()
{
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
if (status!=101)
{
_error="Bad response status "+status+" "+reason;
_endp.close();
}
}
@Override
public void parsedHeader(Buffer name, Buffer value) throws IOException
{
if (__ACCEPT.equals(name))
_accept=value.toString();
}
@Override
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
if (_error==null)
_error="Bad response: "+method+" "+url+" "+version;
_endp.close();
}
@Override
public void content(Buffer ref) throws IOException
{
if (_error==null)
_error="Bad response. "+ref.length()+"B of content?";
_endp.close();
}
});
String path=_holder.getURI().getPath();
if (path==null || path.length()==0)
path="/";
String origin = future.getOrigin();
String request=
"GET "+path+" HTTP/1.1\r\n"+
"Host: "+future.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+_key+"\r\n"+
(origin==null?"":"Origin: "+origin+"\r\n")+
"Sec-WebSocket-Version: "+WebSocketConnectionD12.VERSION+"\r\n";
if (future.getProtocol()!=null)
request+="Sec-WebSocket-Protocol: "+future.getProtocol()+"\r\n";
if (future.getCookies()!=null && future.getCookies().size()>0)
{
for (String cookie : future.getCookies().keySet())
request+="Cookie: "+QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM)+
"="+
QuotedStringTokenizer.quoteIfNeeded(future.getCookies().get(cookie),HttpFields.__COOKIE_DELIM)+
"\r\n";
}
request+="\r\n";
// TODO extensions
try
{
Buffer handshake = new ByteArrayBuffer(request,false);
int len=handshake.length();
if (len!=_endp.flush(handshake))
throw new IOException("incomplete");
}
catch(IOException e)
{
future.handshakeFailed(e);
}
}
public Connection handle() throws IOException
{
while (_endp.isOpen() && !_parser.isComplete())
{
switch (_parser.parseAvailable())
{
case -1:
_holder.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
case 0:
return this;
default:
break;
}
}
if (_error==null)
{
if (_accept==null)
_error="No Sec-WebSocket-Accept";
else if (!WebSocketConnectionD12.hashKey(_key).equals(_accept))
_error="Bad Sec-WebSocket-Accept";
else
{
Buffer header=_parser.getHeaderBuffer();
MaskGen maskGen=_holder.getMaskGen();
WebSocketConnectionD12 connection = new WebSocketConnectionD12(_holder.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_holder.getMaxIdleTime(),_holder.getProtocol(),null,10,maskGen);
if (header.hasContent())
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
_holder.onConnection(connection);
return connection;
}
}
_endp.close();
return this;
}
public boolean isIdle()
{
return false;
}
public boolean isSuspended()
{
return false;
}
public void closed()
{
if (_error!=null)
_holder.handshakeFailed(new ProtocolException(_error));
else
_holder.handshakeFailed(new EOFException());
}
}
}

View File

@ -38,7 +38,6 @@ import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocketGeneratorD12.MaskGen;
public class WebSocketConnectionD12 extends AbstractConnection implements WebSocketConnection
{

View File

@ -38,61 +38,6 @@ public class WebSocketGeneratorD06 implements WebSocketGenerator
private int _m;
private boolean _opsent;
private final MaskGen _maskGen;
public interface MaskGen
{
void genMask(byte[] mask);
}
public static class NullMaskGen implements MaskGen
{
public void genMask(byte[] mask)
{
mask[0]=mask[1]=mask[2]=mask[3]=0;
}
}
public static class FixedMaskGen implements MaskGen
{
final byte[] _mask;
public FixedMaskGen()
{
_mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
}
public FixedMaskGen(byte[] mask)
{
_mask=mask;
}
public void genMask(byte[] mask)
{
mask[0]=_mask[0];
mask[1]=_mask[1];
mask[2]=_mask[2];
mask[3]=_mask[3];
}
}
public static class RandomMaskGen implements MaskGen
{
final Random _random;
public RandomMaskGen()
{
_random=new SecureRandom();
}
public RandomMaskGen(Random random)
{
_random=random;
}
public void genMask(byte[] mask)
{
_random.nextBytes(mask);
}
}
public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp)
{

View File

@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.util.Random;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
@ -38,61 +37,6 @@ public class WebSocketGeneratorD12 implements WebSocketGenerator
private boolean _opsent;
private final MaskGen _maskGen;
public interface MaskGen
{
void genMask(byte[] mask);
}
public static class NullMaskGen implements MaskGen
{
public void genMask(byte[] mask)
{
mask[0]=mask[1]=mask[2]=mask[3]=0;
}
}
public static class FixedMaskGen implements MaskGen
{
final byte[] _mask;
public FixedMaskGen()
{
_mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
}
public FixedMaskGen(byte[] mask)
{
_mask=mask;
}
public void genMask(byte[] mask)
{
mask[0]=_mask[0];
mask[1]=_mask[1];
mask[2]=_mask[2];
mask[3]=_mask[3];
}
}
public static class RandomMaskGen implements MaskGen
{
final Random _random;
public RandomMaskGen()
{
_random=new Random();
}
public RandomMaskGen(Random random)
{
_random=random;
}
public void genMask(byte[] mask)
{
_random.nextBytes(mask);
}
}
public WebSocketGeneratorD12(WebSocketBuffers buffers, EndPoint endp)
{
_buffers=buffers;

View File

@ -0,0 +1,10 @@
package org.eclipse.jetty.websocket;
public class ZeroMaskGen implements MaskGen
{
public void genMask(byte[] mask)
{
mask[0]=mask[1]=mask[2]=mask[3]=0;
}
}

View File

@ -27,28 +27,32 @@ import org.junit.Test;
public class WebSocketClientTest
{
private WebSocketClientFactory _factory = new WebSocketClientFactory();
private ServerSocket _server;
private int _serverPort;
@Before
public void startServer() throws IOException {
public void startServer() throws Exception
{
_server = new ServerSocket();
_server.bind(null);
_serverPort = _server.getLocalPort();
_factory.start();
}
@After
public void stopServer() throws IOException {
public void stopServer() throws Exception
{
if(_server != null) {
_server.close();
}
_factory.stop();
}
@Test
public void testBadURL() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
@ -79,8 +83,7 @@ public class WebSocketClientTest
@Test
public void testAsyncConnectionRefused() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -120,8 +123,7 @@ public class WebSocketClientTest
@Test
public void testConnectionNotAccepted() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -159,8 +161,7 @@ public class WebSocketClientTest
@Test
public void testConnectionTimeout() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -200,8 +201,7 @@ public class WebSocketClientTest
@Test
public void testBadHandshake() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -242,8 +242,7 @@ public class WebSocketClientTest
@Test
public void testBadUpgrade() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -285,8 +284,7 @@ public class WebSocketClientTest
@Test
public void testUpgradeThenTCPClose() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -323,9 +321,8 @@ public class WebSocketClientTest
@Test
public void testIdle() throws Exception
{
WebSocketClient client = new WebSocketClient();
WebSocketClient client = new WebSocketClient(_factory);
client.setMaxIdleTime(500);
client.start();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -362,9 +359,8 @@ public class WebSocketClientTest
@Test
public void testNotIdle() throws Exception
{
WebSocketClient client = new WebSocketClient();
WebSocketClient client = new WebSocketClient(_factory);
client.setMaxIdleTime(500);
client.start();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();

View File

@ -20,7 +20,7 @@ public class WebSocketGeneratorD06Test
byte[] _mask = new byte[4];
int _m;
public WebSocketGeneratorD06.MaskGen _maskGen = new WebSocketGeneratorD06.FixedMaskGen(
public MaskGen _maskGen = new FixedMaskGen(
new byte[]{(byte)0x00,(byte)0x00,(byte)0x0f,(byte)0xff});
@Before

View File

@ -20,7 +20,7 @@ public class WebSocketGeneratorD12Test
byte[] _mask = new byte[4];
int _m;
public WebSocketGeneratorD12.MaskGen _maskGen = new WebSocketGeneratorD12.FixedMaskGen(
public MaskGen _maskGen = new FixedMaskGen(
new byte[]{(byte)0x00,(byte)0x00,(byte)0x0f,(byte)0xff});
@Before

View File

@ -167,7 +167,7 @@ public class WebSocketLoadD12Test
this.iterations = iterations;
_endp=new SocketEndPoint(socket);
_generator = new WebSocketGeneratorD12(new WebSocketBuffers(32*1024),_endp,new WebSocketGeneratorD12.FixedMaskGen());
_generator = new WebSocketGeneratorD12(new WebSocketBuffers(32*1024),_endp,new FixedMaskGen());
_parser = new WebSocketParserD12(new WebSocketBuffers(32*1024),_endp,_handler,false);
}

View File

@ -662,7 +662,7 @@ public class WebSocketMessageD06Test
final AtomicReference<String> received = new AtomicReference<String>();
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096);
WebSocketGeneratorD06.MaskGen maskGen = new WebSocketGeneratorD06.RandomMaskGen();
MaskGen maskGen = new RandomMaskGen();
WebSocketGeneratorD06 gen = new WebSocketGeneratorD06(new WebSocketBuffers(8096),endp,maskGen);
byte[] data = message.getBytes(StringUtil.__UTF8);

View File

@ -861,7 +861,7 @@ public class WebSocketMessageD12Test
final AtomicReference<String> received = new AtomicReference<String>();
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096);
WebSocketGeneratorD12.MaskGen maskGen = new WebSocketGeneratorD12.RandomMaskGen();
MaskGen maskGen = new RandomMaskGen();
WebSocketGeneratorD12 gen = new WebSocketGeneratorD12(new WebSocketBuffers(8096),endp,maskGen);
byte[] data = message.getBytes(StringUtil.__UTF8);