Merge branch 'master' into release-9

This commit is contained in:
Jesse McConnell 2013-03-06 09:55:41 -06:00
commit 6d83da729b
65 changed files with 927 additions and 397 deletions

View File

@ -29,7 +29,6 @@ import java.net.URI;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@ -55,6 +54,7 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
@ -476,7 +476,7 @@ public class HttpClient extends ContainerLifeCycle
protected void send(final Request request, List<Response.ResponseListener> listeners)
{
String scheme = request.getScheme().toLowerCase(Locale.ENGLISH);
if (!Arrays.asList("http", "https").contains(scheme))
if (!HttpScheme.HTTP.is(scheme) && !HttpScheme.HTTPS.is(scheme))
throw new IllegalArgumentException("Invalid protocol " + scheme);
HttpDestination destination = destinationFor(scheme, request.getHost(), request.getPort());
@ -903,8 +903,12 @@ public class HttpClient extends ContainerLifeCycle
protected int normalizePort(String scheme, int port)
{
return port > 0 ? port :
"https".equalsIgnoreCase(scheme) ? 443 : 80;
return port > 0 ? port : HttpScheme.HTTPS.is(scheme) ? 443 : 80;
}
protected boolean isDefaultPort(String scheme, int port)
{
return HttpScheme.HTTPS.is(scheme) ? port == 443 : port == 80;
}
protected HttpConnection newHttpConnection(HttpClient httpClient, EndPoint endPoint, HttpDestination destination)
@ -949,7 +953,7 @@ public class HttpClient extends ContainerLifeCycle
HttpDestination destination = callback.destination;
SslContextFactory sslContextFactory = getSslContextFactory();
if ("https".equals(destination.getScheme()))
if (HttpScheme.HTTPS.is(destination.getScheme()))
{
if (sslContextFactory == null)
{

View File

@ -205,7 +205,7 @@ public class HttpConnection extends AbstractConnection implements Connection
}
case POST:
{
request.header(HttpHeader.CONTENT_TYPE.asString(), MimeTypes.Type.FORM_ENCODED.asString());
request.header(HttpHeader.CONTENT_TYPE, MimeTypes.Type.FORM_ENCODED.asString());
request.content(new StringContentProvider(params.toString()));
break;
}

View File

@ -85,7 +85,9 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
proxyAddress = proxyConfig != null && proxyConfig.matches(host, port) ?
new Address(proxyConfig.getHost(), proxyConfig.getPort()) : null;
hostField = new HttpField(HttpHeader.HOST, host + ":" + port);
if (!client.isDefaultPort(scheme, port))
host += ":" + port;
hostField = new HttpField(HttpHeader.HOST, host);
}
protected BlockingQueue<Connection> getIdleConnections()
@ -462,7 +464,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
public void succeeded(Connection connection)
{
boolean tunnel = isProxied() &&
"https".equalsIgnoreCase(getScheme()) &&
HttpScheme.HTTPS.is(getScheme()) &&
client.getSslContextFactory() != null;
if (tunnel)
tunnel(connection);
@ -483,7 +485,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
.scheme(HttpScheme.HTTP.asString())
.method(HttpMethod.CONNECT)
.path(target)
.header(HttpHeader.HOST.asString(), target)
.header(HttpHeader.HOST, target)
.timeout(client.getConnectTimeout(), TimeUnit.MILLISECONDS);
connection.send(connect, new Response.CompleteListener()
{

View File

@ -127,6 +127,14 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
}
}
@Override
public int getHeaderCacheSize()
{
// TODO get from configuration
return 256;
}
@Override
public boolean startResponse(HttpVersion version, int status, String reason)
{
@ -203,11 +211,15 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
private void storeCookie(URI uri, HttpField field)
{
try
{
String value = field.getValue();
if (value != null)
{
Map<String, List<String>> header = new HashMap<>(1);
header.put(field.getHeader().asString(), Collections.singletonList(field.getValue()));
header.put(field.getHeader().asString(), Collections.singletonList(value));
connection.getHttpClient().getCookieManager().put(uri, header);
}
}
catch (IOException x)
{
LOG.debug(x);

View File

@ -223,6 +223,16 @@ public class HttpRequest implements Request
return this;
}
@Override
public Request header(HttpHeader header, String value)
{
if (value == null)
headers.remove(header);
else
headers.add(header, value);
return this;
}
@Override
public Request attribute(String name, Object value)
{
@ -371,7 +381,7 @@ public class HttpRequest implements Request
public Request content(ContentProvider content, String contentType)
{
if (contentType != null)
header(HttpHeader.CONTENT_TYPE.asString(), contentType);
header(HttpHeader.CONTENT_TYPE, contentType);
this.content = content;
return this;
}
@ -386,7 +396,7 @@ public class HttpRequest implements Request
public Request file(Path file, String contentType) throws IOException
{
if (contentType != null)
header(HttpHeader.CONTENT_TYPE.asString(), contentType);
header(HttpHeader.CONTENT_TYPE, contentType);
return content(new PathContentProvider(file));
}

View File

@ -28,17 +28,20 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class RedirectProtocolHandler extends Response.Listener.Empty implements ProtocolHandler
{
private static String SCHEME_REGEXP = "(^https?)";
private static String AUTHORITY_REGEXP = "([^/\\?#]+)";
private static final Logger LOG = Log.getLogger(RedirectProtocolHandler.class);
private static final String SCHEME_REGEXP = "(^https?)";
private static final String AUTHORITY_REGEXP = "([^/\\?#]+)";
// The location may be relative so the scheme://authority part may be missing
private static String DESTINATION_REGEXP = "(" + SCHEME_REGEXP + "://" + AUTHORITY_REGEXP + ")?";
private static String PATH_REGEXP = "([^\\?#]*)";
private static String QUERY_REGEXP = "([^#]*)";
private static String FRAGMENT_REGEXP = "(.*)";
private static Pattern URI_PATTERN = Pattern.compile(DESTINATION_REGEXP + PATH_REGEXP + QUERY_REGEXP + FRAGMENT_REGEXP);
private static final String DESTINATION_REGEXP = "(" + SCHEME_REGEXP + "://" + AUTHORITY_REGEXP + ")?";
private static final String PATH_REGEXP = "([^\\?#]*)";
private static final String QUERY_REGEXP = "([^#]*)";
private static final String FRAGMENT_REGEXP = "(.*)";
private static final Pattern URI_PATTERN = Pattern.compile(DESTINATION_REGEXP + PATH_REGEXP + QUERY_REGEXP + FRAGMENT_REGEXP);
private static final String ATTRIBUTE = RedirectProtocolHandler.class.getName() + ".redirects";
private final HttpClient client;
@ -81,6 +84,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
if (location != null)
{
URI newURI = sanitize(location);
LOG.debug("Redirecting to {} (Location: {})", newURI, location);
if (newURI != null)
{
if (!newURI.isAbsolute())

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.Fields;
@ -135,6 +136,13 @@ public interface Request
*/
Request header(String name, String value);
/**
* @param header the header name
* @param value the value of the header
* @return this request object
*/
Request header(HttpHeader header, String value);
/**
* @param name the name of the attribute
* @param value the value of the attribute

View File

@ -99,7 +99,7 @@ public class BasicAuthentication implements Authentication
public void apply(Request request)
{
if (request.getURI().toString().startsWith(uri.toString()))
request.header(HttpHeader.AUTHORIZATION.asString(), value);
request.header(HttpHeader.AUTHORIZATION, value);
}
@Override

View File

@ -262,7 +262,7 @@ public class DigestAuthentication implements Authentication
}
value.append(", response=\"").append(hashA3).append("\"");
request.header(HttpHeader.AUTHORIZATION.asString(), value.toString());
request.header(HttpHeader.AUTHORIZATION, value.toString());
}
private String nextNonceCount()

View File

@ -24,8 +24,10 @@ import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.After;
@ -45,7 +47,7 @@ public class ExternalSiteTest
@Before
public void prepare() throws Exception
{
client = new HttpClient();
client = new HttpClient(new SslContextFactory());
client.start();
}
@ -64,7 +66,7 @@ public class ExternalSiteTest
// Verify that we have connectivity
try
{
new Socket(host, port);
new Socket(host, port).close();
}
catch (IOException x)
{
@ -113,7 +115,7 @@ public class ExternalSiteTest
// Verify that we have connectivity
try
{
new Socket(host, port);
new Socket(host, port).close();
}
catch (IOException x)
{
@ -142,7 +144,7 @@ public class ExternalSiteTest
// Verify that we have connectivity
try
{
new Socket(host, port);
new Socket(host, port).close();
}
catch (IOException x)
{
@ -179,4 +181,27 @@ public class ExternalSiteTest
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
}
}
@Test
public void testExternalSiteRedirect() throws Exception
{
String host = "twitter.com";
int port = 443;
// Verify that we have connectivity
try
{
new Socket(host, port).close();
}
catch (IOException x)
{
Assume.assumeNoException(x);
}
ContentResponse response = client.newRequest(host, port)
.scheme(HttpScheme.HTTPS.asString())
.path("/twitter")
.send();
Assert.assertEquals(200, response.getStatus());
}
}

View File

@ -84,7 +84,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(contents))
.timeout(5, TimeUnit.SECONDS)
.send();
@ -124,7 +124,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
byte[] content2 = new byte[16384];
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content1, content2)
{
@Override
@ -176,7 +176,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content1, content2))
.send(new BufferingResponseListener()
{
@ -227,7 +227,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
.scheme(scheme)
.method(HttpMethod.POST)
.path("/continue")
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@ -276,7 +276,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
.scheme(scheme)
.method(HttpMethod.POST)
.path("/redirect")
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@ -322,7 +322,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@ -369,7 +369,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@ -433,7 +433,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.send(new BufferingResponseListener()
{
@ -480,7 +480,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(content)
.send(new BufferingResponseListener()
{
@ -529,7 +529,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(chunk1));
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(content)
.send(new BufferingResponseListener()
{
@ -602,7 +602,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.onRequestHeaders(new org.eclipse.jetty.client.api.Request.HeadersListener()
{
@Override
@ -676,7 +676,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.header(HttpHeader.EXPECT.asString(), HttpHeaderValue.CONTINUE.asString())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(content)
.send(new BufferingResponseListener()
{

View File

@ -29,7 +29,6 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -39,7 +38,9 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.toolchain.test.annotation.Stress;
@ -128,11 +129,11 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST;
request.method(method);
boolean ssl = "https".equalsIgnoreCase(scheme);
boolean ssl = HttpScheme.HTTPS.is(scheme);
// Choose randomly whether to close the connection on the client or on the server
if (!ssl && random.nextBoolean())
request.header("Connection", "close");
request.header(HttpHeader.CONNECTION, "close");
else if (!ssl && random.nextBoolean())
request.header("X-Close", "true");

View File

@ -20,10 +20,10 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -184,7 +184,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
public void onBegin(Request request)
{
// Remove the host header, this will make the request invalid
request.header(HttpHeader.HOST.asString(), null);
request.header(HttpHeader.HOST, null);
}
@Override
@ -244,7 +244,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
public void onBegin(Request request)
{
// Remove the host header, this will make the request invalid
request.header(HttpHeader.HOST.asString(), null);
request.header(HttpHeader.HOST, null);
}
@Override
@ -410,9 +410,11 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
Log.getLogger(HttpConnection.class).info("Expecting java.lang.IllegalStateException: HttpParser{s=CLOSED,...");
final CountDownLatch latch = new CountDownLatch(1);
ByteBuffer buffer = ByteBuffer.allocate(16 * 1024 * 1024);
Arrays.fill(buffer.array(),(byte)'x');
client.newRequest(host, port)
.scheme(scheme)
.content(new ByteBufferContentProvider(ByteBuffer.allocate(16 * 1024 * 1024)))
.content(new ByteBufferContentProvider(buffer))
.send(new Response.Listener.Empty()
{
@Override
@ -428,6 +430,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(0, activeConnections.size());
server.stop();
}
finally
{

View File

@ -22,12 +22,12 @@ import java.io.IOException;
import java.net.HttpCookie;
import java.net.URI;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
@ -101,4 +101,24 @@ public class HttpCookieTest extends AbstractHttpClientServerTest
Response response = client.GET(scheme + "://" + host + ":" + port + path);
Assert.assertEquals(200, response.getStatus());
}
@Test
public void test_CookieWithoutValue() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.addHeader("Set-Cookie", "");
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
Assert.assertTrue(client.getCookieStore().getCookies().isEmpty());
}
}

View File

@ -161,21 +161,21 @@ etc/jetty-http.xml
#
# SPDY requires the NPN jar iwhich must be separately downloaded:
#
# http://repo1.maven.org/maven2/org/mortbay/jetty/npn/npn-boot/1.1.1.v20121030/npn-boot-1.1.1.v20121030.jar
# http://repo1.maven.org/maven2/org/mortbay/jetty/npn/npn-boot/1.1.2.v20130305/npn-boot-1.1.2.v20130305.jar
#
# Which should be saved in lib/npn-boot-1.1.1.v20121030.jar
# Which should be saved in lib/npn-boot-1.1.2.v20130305.jar
#
# To include the NPN jar on the boot path, you must either:
#
# a) enable --exec above and uncomment the -Xbootclass line
# below
#
# b) Add -Xbootclasspath/p:lib/npn-boot-1.1.1.v20121030.jar
# b) Add -Xbootclasspath/p:lib/npn-boot-1.1.2.v20130305.jar
# to the command line when running jetty.
#
#-----------------------------------------------------------
# OPTIONS=spdy
# -Xbootclasspath/p:lib/npn-boot-1.1.1.v20121030.jar
# -Xbootclasspath/p:lib/npn-boot-1.1.2.v20130305.jar
# etc/jetty-spdy.xml
#===========================================================

View File

@ -23,10 +23,8 @@ import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpTokens.EndOfContent;
import org.eclipse.jetty.util.ArrayTernaryTrie;
import org.eclipse.jetty.util.ArrayTrie;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TreeTrie;
import org.eclipse.jetty.util.Trie;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -34,6 +32,7 @@ import org.eclipse.jetty.util.log.Logger;
public class HttpParser
{
public static final Logger LOG = Log.getLogger(HttpParser.class);
static final int INITIAL_URI_LENGTH=256;
// States
public enum State
@ -80,7 +79,7 @@ public class HttpParser
private HttpMethod _method;
private String _methodString;
private HttpVersion _version;
private ByteBuffer _uri=ByteBuffer.allocate(256); // Tune?
private ByteBuffer _uri=ByteBuffer.allocate(INITIAL_URI_LENGTH); // Tune?
private byte _eol;
private EndOfContent _endOfContent;
private long _contentLength;
@ -89,7 +88,7 @@ public class HttpParser
private int _chunkPosition;
private boolean _headResponse;
private ByteBuffer _contentChunk;
private final Trie<HttpField> _connectionFields=new ArrayTernaryTrie<>(256);
private Trie<HttpField> _connectionFields;
private int _length;
private final StringBuilder _string=new StringBuilder();
@ -381,7 +380,7 @@ public class HttpParser
badMessage(buffer,HttpStatus.REQUEST_URI_TOO_LONG_414,null);
return true;
}
if (_uri.remaining()<len)
if (_uri.remaining()<=len)
{
ByteBuffer uri = ByteBuffer.allocate(_uri.capacity()+2*len);
_uri.flip();
@ -513,6 +512,14 @@ public class HttpParser
return true;
}
// Should we try to cache header fields?
if (_version.getVersion()>=HttpVersion.HTTP_1_1.getVersion())
{
int header_cache = _handler.getHeaderCacheSize();
if (header_cache>0)
_connectionFields=new ArrayTernaryTrie<>(header_cache);
}
_eol=ch;
setState(State.HEADER);
_uri.flip();
@ -592,7 +599,7 @@ public class HttpParser
break;
case HOST:
add_to_connection_trie=_field==null;
add_to_connection_trie=_connectionFields!=null && _field==null;
_host=true;
String host=_valueString;
int port=0;
@ -630,6 +637,12 @@ public class HttpParser
break;
case CONNECTION:
// Don't cache if not persistent
if (_valueString!=null && _valueString.indexOf("close")>=0)
_connectionFields=null;
break;
case AUTHORIZATION:
case ACCEPT:
case ACCEPT_CHARSET:
@ -638,7 +651,7 @@ public class HttpParser
case COOKIE:
case CACHE_CONTROL:
case USER_AGENT:
add_to_connection_trie=_field==null;
add_to_connection_trie=_connectionFields!=null && _field==null;
}
if (add_to_connection_trie && !_connectionFields.isFull() && _header!=null && _valueString!=null)
@ -786,7 +799,7 @@ public class HttpParser
if (buffer.remaining()>6)
{
// Try a look ahead for the known header name and value.
_field=_connectionFields.getBest(buffer,-1,buffer.remaining());
_field=_connectionFields==null?null:_connectionFields.getBest(buffer,-1,buffer.remaining());
if (_field==null)
_field=HttpField.CACHE.getBest(buffer,-1,buffer.remaining());
@ -1266,7 +1279,6 @@ public class HttpParser
}
LOG.warn("badMessage: "+e.toString()+" for "+_handler);
e.printStackTrace();
LOG.debug(e);
badMessage(buffer,HttpStatus.BAD_REQUEST_400,null);
return true;
@ -1396,6 +1408,11 @@ public class HttpParser
public boolean earlyEOF();
public void badMessage(int status, String reason);
/* ------------------------------------------------------------ */
/** @return the size in bytes of the per parser header cache
*/
public int getHeaderCacheSize();
}
public interface RequestHandler<T> extends HttpHandler<T>

View File

@ -229,6 +229,12 @@ public class HttpTester
}
abstract public HttpGenerator.Info getInfo();
@Override
public int getHeaderCacheSize()
{
return 0;
}
}
public static class Request extends Message implements HttpParser.RequestHandler<ByteBuffer>

View File

@ -52,7 +52,7 @@ jnlp=application/x-java-jnlp-file
jpe=image/jpeg
jpeg=image/jpeg
jpg=image/jpeg
js=application/x-javascript
js=application/javascript
jsp=text/html
kar=audio/midi
latex=application/x-latex

View File

@ -91,6 +91,12 @@ public class HttpGeneratorServerTest
{
throw new IllegalStateException(reason);
}
@Override
public int getHeaderCacheSize()
{
return 256;
}
}
private static class TR

View File

@ -131,6 +131,20 @@ public class HttpParserTest
assertEquals(-1, _h);
}
@Test
public void testLongURLParse() throws Exception
{
ByteBuffer buffer= BufferUtil.toBuffer("POST /123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/ HTTP/1.0\015\012" + "\015\012");
Handler handler = new Handler();
HttpParser parser= new HttpParser((HttpParser.RequestHandler)handler);
parseAll(parser,buffer);
assertEquals("POST", _methodOrVersion);
assertEquals("/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/123456789abcdef/", _uriOrStatus);
assertEquals("HTTP/1.0", _versionOrReason);
assertEquals(-1, _h);
}
@Test
public void testConnect() throws Exception
{
@ -926,5 +940,11 @@ public class HttpParserTest
{
return true;
}
@Override
public int getHeaderCacheSize()
{
return 512;
}
}
}

View File

@ -26,6 +26,7 @@ import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@ -725,7 +726,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
channel.register(_selector, SelectionKey.OP_CONNECT, this);
}
catch (ClosedChannelException x)
catch (ClosedSelectorException | ClosedChannelException x)
{
LOG.debug(x);
}

View File

@ -58,6 +58,7 @@ public class DataSourceCloser implements Destroyable
_shutdown=shutdownSQL;
}
@Override
public void destroy()
{
try

View File

@ -0,0 +1,31 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-project</artifactId>
<version>9.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jetty-osgi-npn</artifactId>
<name>Jetty :: OSGi NPN Fragment</name>
<packaging>jar</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Bundle-ManifestVersion>2</Bundle-ManifestVersion>
<Bundle-SymbolicName>org.eclipse.jetty.osgi.npn.fragment;singleton:=true</Bundle-SymbolicName>
<Bundle-Name>Jetty OSGi NPN Fragment</Bundle-Name>
<Bundle-Version>9.0.0</Bundle-Version>
<Export-Package>org.eclipse.jetty.npn;version="1.1.2"</Export-Package>
<Fragment-Host>system.bundle;extension:=framework</Fragment-Host>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -21,6 +21,7 @@
<module>jetty-osgi-boot</module>
<module>jetty-osgi-boot-jsp</module>
<module>jetty-osgi-boot-warurl</module>
<module>jetty-osgi-npn</module>
<module>jetty-osgi-httpservice</module>
<module>test-jetty-osgi-webapp</module>
<module>test-jetty-osgi-context</module>

View File

@ -19,7 +19,7 @@
<felixversion>4.0.3</felixversion>
<injection.bundle.version>1.0</injection.bundle.version>
<runner.version>1.7.6</runner.version>
<npn-version>1.1.0.v20120525</npn-version>
<npn-version>1.1.2.v20130305</npn-version>
</properties>
<dependencies>
<!-- Pax Exam Dependencies -->
@ -289,6 +289,13 @@
<version>${npn-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.osgi</groupId>
<artifactId>jetty-osgi-npn</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>

View File

@ -77,8 +77,9 @@ public class TestJettyOSGiBootSpdy
if (!checkNpnBoot.exists()) { throw new IllegalStateException("Unable to find the npn boot jar here: " + npnBoot); }
res.add(CoreOptions.vmOptions("-Xbootclasspath/p:" + npnBoot));
res.add(CoreOptions.bootDelegationPackages("org.eclipse.jetty.npn"));
// res.add(CoreOptions.bootDelegationPackages("org.eclipse.jetty.npn"));
res.add(mavenBundle().groupId("org.eclipse.jetty.osgi").artifactId("jetty-osgi-npn").versionAsInProject().noStart());
res.add(mavenBundle().groupId("org.eclipse.jetty.spdy").artifactId("spdy-core").versionAsInProject().noStart());
res.add(mavenBundle().groupId("org.eclipse.jetty.spdy").artifactId("spdy-server").versionAsInProject().noStart());
res.add(mavenBundle().groupId("org.eclipse.jetty.spdy").artifactId("spdy-http-server").versionAsInProject().noStart());

View File

@ -44,6 +44,7 @@ import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.handler.ContextHandler;
@ -397,14 +398,14 @@ public class ProxyServlet extends HttpServlet
// Force the Host header if configured
if (_hostHeader != null)
proxyRequest.header("Host", _hostHeader);
proxyRequest.header(HttpHeader.HOST, _hostHeader);
// Add proxy headers
proxyRequest.header("Via", "http/1.1 " + _viaHost);
proxyRequest.header("X-Forwarded-For", request.getRemoteAddr());
proxyRequest.header("X-Forwarded-Proto", request.getScheme());
proxyRequest.header("X-Forwarded-Host", request.getHeader("Host"));
proxyRequest.header("X-Forwarded-Server", request.getLocalName());
proxyRequest.header(HttpHeader.VIA, "http/1.1 " + _viaHost);
proxyRequest.header(HttpHeader.X_FORWARDED_FOR, request.getRemoteAddr());
proxyRequest.header(HttpHeader.X_FORWARDED_PROTO, request.getScheme());
proxyRequest.header(HttpHeader.X_FORWARDED_HOST, request.getHeader(HttpHeader.HOST.asString()));
proxyRequest.header(HttpHeader.X_FORWARDED_SERVER, request.getLocalName());
proxyRequest.content(new InputStreamContentProvider(request.getInputStream())
{

View File

@ -59,12 +59,15 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StdErrLog;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
@ -178,6 +181,9 @@ public class ProxyServletTest
@Test
public void testServerException() throws Exception
{
((StdErrLog)Log.getLogger(ServletHandler.class)).setHideStacks(true);
try
{
prepareProxy(new ProxyServlet());
prepareServer(new HttpServlet()
@ -185,7 +191,7 @@ public class ProxyServletTest
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
throw new ServletException();
throw new ServletException("Expected Test Exception");
}
});
@ -195,6 +201,11 @@ public class ProxyServletTest
Assert.assertEquals(500, response.getStatus());
}
finally
{
((StdErrLog)Log.getLogger(ServletHandler.class)).setHideStacks(false);
}
}
@Test
public void testProxyWithoutContent() throws Exception

View File

@ -82,6 +82,7 @@
<Set name="responseHeaderSize">8192</Set>
<Set name="sendServerVersion">true</Set>
<Set name="sendDateHeader">false</Set>
<Set name="headerCacheSize">512</Set>
<!-- Uncomment to enable handling of X-Forwarded- style headers
<Call name="addCustomizer">

View File

@ -167,6 +167,12 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
return _endPoint.getRemoteAddress();
}
@Override
public int getHeaderCacheSize()
{
return _configuration.getHeaderCacheSize();
}
/**
* If the associated response has the Expect header set to 100 Continue,
* then accessing the input stream indicates that the handler/servlet

View File

@ -44,6 +44,7 @@ public class HttpConfiguration
private int _outputBufferSize=32*1024;
private int _requestHeaderSize=8*1024;
private int _responseHeaderSize=8*1024;
private int _headerCacheSize=512;
private int _securePort;
private String _secureScheme = HttpScheme.HTTPS.asString();
private boolean _sendServerVersion = true; //send Server: header
@ -78,6 +79,7 @@ public class HttpConfiguration
_secureScheme=config._secureScheme;
_sendDateHeader=config._sendDateHeader;
_sendServerVersion=config._sendServerVersion;
_headerCacheSize=config._headerCacheSize;
}
/* ------------------------------------------------------------ */
@ -125,6 +127,12 @@ public class HttpConfiguration
return _responseHeaderSize;
}
@ManagedAttribute("The maximum allowed size in bytes for a HTTP header field cache")
public int getHeaderCacheSize()
{
return _headerCacheSize;
}
@ManagedAttribute("The port to which Integral or Confidential security constraints are redirected")
public int getSecurePort()
{
@ -210,6 +218,15 @@ public class HttpConfiguration
_responseHeaderSize = responseHeaderSize;
}
/* ------------------------------------------------------------ */
/** Set the header field cache size.
* @param headerCacheSize The size in bytes of the header field cache.
*/
public void setHeaderCacheSize(int headerCacheSize)
{
_headerCacheSize = headerCacheSize;
}
/* ------------------------------------------------------------ */
/** Set the TCP/IP port used for CONFIDENTIAL and INTEGRAL
* redirections.

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
@ -447,6 +448,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
throw new IOException(e);
}
catch (ClosedChannelException e)
{
throw new EofException(e);
}
}
@Override

View File

@ -702,7 +702,12 @@ public class Request implements HttpServletRequest
public String getLocalAddr()
{
InetSocketAddress local=_channel.getLocalAddress();
return local.getAddress().getHostAddress();
if (local==null)
return "";
InetAddress address = local.getAddress();
if (address==null)
return local.getHostString();
return address.getHostAddress();
}
/* ------------------------------------------------------------ */
@ -919,7 +924,14 @@ public class Request implements HttpServletRequest
if (remote==null)
remote=_channel.getRemoteAddress();
return remote==null?"":remote.getHostString();
if (remote==null)
return "";
InetAddress address = remote.getAddress();
if (address==null)
return remote.getHostString();
return address.getHostAddress();
}
/* ------------------------------------------------------------ */

View File

@ -157,7 +157,7 @@ public class ShutdownMonitor extends Thread
return;
}
while (true)
while (serverSocket != null)
{
Socket socket = null;
try
@ -190,7 +190,9 @@ public class ShutdownMonitor extends Thread
// Shutdown Monitor
debug("Shutting down monitor");
close(socket);
socket = null;
close(serverSocket);
serverSocket = null;
if (exitVm)
{

View File

@ -340,16 +340,19 @@ public abstract class AbstractSession implements AbstractSessionManager.SessionI
_manager.removeSession(this,true);
// Notify listeners and unbind values
boolean do_invalidate=false;
synchronized (this)
{
if (!_invalid)
{
if (_requests<=0)
doInvalidate();
do_invalidate=true;
else
_doInvalidate=true;
}
}
if (do_invalidate)
doInvalidate();
}
/* ------------------------------------------------------------- */

View File

@ -55,8 +55,6 @@ import org.junit.Test;
*/
public class HttpConnectionTest
{
private static final Logger LOG = Log.getLogger(HttpConnectionTest.class);
private Server server;
private LocalConnector connector;

View File

@ -318,7 +318,7 @@ public abstract class AbstractCompressedStream extends ServletOutputStream
}
/**
* @see org.eclipse.jetty.http.gzip.CompressedStream#getOutputStream()
* @see org.eclipse.jetty.http.gzip.CompressedStream#createOutputStream()
*/
public OutputStream getOutputStream()
{

View File

@ -99,7 +99,7 @@ public class IncludableGzipFilterMinSizeTest
tester.setGzipFilterClass(IncludableGzipFilter.class);
FilterHolder holder = tester.setContentServlet(testServlet);
holder.setInitParameter("mimeTypes","application/soap+xml,text/javascript,application/x-javascript");
holder.setInitParameter("mimeTypes","application/soap+xml,text/javascript,application/javascript");
holder.setInitParameter("minGzipSize", "2048");
holder.setInitParameter("uncheckedPrintWriter","true");

View File

@ -13,7 +13,8 @@
<name>Jetty :: SPDY :: Parent</name>
<properties>
<npn.version>1.1.0.v20120525</npn.version>
<npn.version>1.1.2.v20130305</npn.version>
<npn.api.version>1.1.0.v20120525</npn.api.version>
</properties>
<modules>

View File

@ -76,7 +76,7 @@
<dependency>
<groupId>org.eclipse.jetty.npn</groupId>
<artifactId>npn-api</artifactId>
<version>${npn.version}</version>
<version>${npn.api.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

View File

@ -1350,6 +1350,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
{
bufferPool.release(buffer);
IStream stream = getStream();
dataInfo.consume(size);
flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
if (dataInfo.available() > 0)
{

View File

@ -403,7 +403,7 @@ public class StandardStream implements IStream
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
}
// Cannot update the close state here, because the data that we send may
@ -481,7 +481,7 @@ public class StandardStream implements IStream
@Override
public String toString()
{
return String.format("stream=%d v%d windowSize=%db reset=%s prio=%d %s %s", getId(), session.getVersion(),
return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
getWindowSize(), isReset(), priority, openState, closeState);
}

View File

@ -97,13 +97,13 @@ public interface Stream
/**
* <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p> <p>Callers may pass a
* non-null completion callback to be notified of when the pushstream has been established.</p>
* non-null completion promise to be notified of when the pushstream has been established.</p>
*
* @param pushInfo the metadata to send on stream creation
* @param callback the completion callback that gets notified once the pushstream is established
* @param promise the completion promise that gets notified once the pushstream is established
* @see #push(PushInfo)
*/
public void push(PushInfo pushInfo, Promise<Stream> callback);
public void push(PushInfo pushInfo, Promise<Stream> promise);
/**
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p> <p>Callers may use the returned

View File

@ -99,7 +99,7 @@
<dependency>
<groupId>org.eclipse.jetty.npn</groupId>
<artifactId>npn-api</artifactId>
<version>${npn.version}</version>
<version>${npn.api.version}</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -150,6 +150,13 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
return false;
}
@Override
public int getHeaderCacheSize()
{
// TODO get from configuration
return 256;
}
@Override
public boolean earlyEOF()
{

View File

@ -58,7 +58,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{
private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamHandler";
private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
@ -113,9 +113,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
serverSession.syn(serverSynInfo, listener, handler);
StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
serverSession.syn(serverSynInfo, listener, promise);
return this;
}
@ -166,8 +166,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
};
StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
streamHandler.data(serverDataInfo);
StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
streamPromise.data(serverDataInfo);
}
private Session produceSession(String host, short version, InetSocketAddress address)
@ -219,37 +219,24 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
stream.getSession().rst(rstInfo, new Callback.Adapter());
}
private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
private class ProxyPushStreamFrameListener implements StreamFrameListener
{
private final Stream clientStream;
private volatile ReplyInfo replyInfo;
private PushStreamPromise pushStreamPromise;
public ProxyStreamFrameListener(Stream clientStream)
private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
{
this.clientStream = clientStream;
this.pushStreamPromise = pushStreamPromise;
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
LOG.debug("S -> P pushed {} on {}", pushInfo, stream);
LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
this.pushStreamPromise.push(newPushStreamPromise);
return new ProxyPushStreamFrameListener(newPushStreamPromise);
}
Fields headers = new Fields(pushInfo.getHeaders(), false);
addResponseProxyHeaders(stream, headers);
customizeResponseHeaders(stream, headers);
Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute
(CLIENT_STREAM_ATTRIBUTE);
convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(),
headers);
StreamHandler handler = new StreamHandler(clientStream, pushInfo);
stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers,
pushInfo.isClose()),
handler);
return new Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
@ -278,28 +265,55 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
pushStreamPromise.data(clientDataInfo);
}
};
}
private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
{
private final Stream receiverStream;
public ProxyStreamFrameListener(Stream receiverStream)
{
this.receiverStream = receiverStream;
}
@Override
public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
{
LOG.debug("S -> P {} on {}");
PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
receiverStream.push(newPushInfo, pushStreamPromise);
return new ProxyPushStreamFrameListener(pushStreamPromise);
}
@Override
public void onReply(final Stream stream, ReplyInfo replyInfo)
{
LOG.debug("S -> P {} on {}", replyInfo, stream);
final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
replyInfo.isClose());
reply(stream, clientReplyInfo);
}
short serverVersion = stream.getSession().getVersion();
Fields headers = new Fields(replyInfo.getHeaders(), false);
private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
{
receiverStream.reply(clientReplyInfo, new Callback()
{
@Override
public void succeeded()
{
LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
}
addResponseProxyHeaders(stream, headers);
customizeResponseHeaders(stream, headers);
short clientVersion = this.clientStream.getSession().getVersion();
convert(serverVersion, clientVersion, headers);
this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
if (replyInfo.isClose())
reply(stream);
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(receiverStream);
}
});
}
@Override
@ -313,101 +327,82 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
public void onData(final Stream stream, final DataInfo dataInfo)
{
LOG.debug("S -> P {} on {}", dataInfo, stream);
if (replyInfo != null)
{
if (dataInfo.isClose())
replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
reply(stream);
}
data(stream, dataInfo);
}
private void reply(final Stream stream)
private void data(final Stream stream, final DataInfo serverDataInfo)
{
final ReplyInfo replyInfo = this.replyInfo;
this.replyInfo = null;
clientStream.reply(replyInfo, new Callback()
final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
receiverStream.data(clientDataInfo, new Callback() //TODO: timeout???
{
@Override
public void succeeded()
{
LOG.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(clientStream);
}
});
}
private void data(final Stream stream, final DataInfo dataInfo)
{
clientStream.data(dataInfo, new Callback() //TODO: timeout???
{
@Override
public void succeeded()
{
dataInfo.consume(dataInfo.length());
LOG.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(clientStream);
rst(receiverStream);
}
});
}
}
/**
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p> <p>Instances
* of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the
* send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been
* fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the
* client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with
* flow control).</p>
* <p>{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice
* versa.</p> <p>Instances of this class buffer DATA frames sent by clients and send them to the server. The
* buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client
* before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the
* server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is
* a SPDY v3 server (and hence with flow control).</p>
*/
private class StreamHandler implements Promise<Stream>
private class StreamPromise implements Promise<Stream>
{
private final Queue<DataInfoHandler> queue = new LinkedList<>();
private final Stream clientStream;
private final Queue<DataInfoCallback> queue = new LinkedList<>();
private final Stream senderStream;
private final Info info;
private Stream serverStream;
private Stream receiverStream;
private StreamHandler(Stream clientStream, Info info)
private StreamPromise(Stream senderStream, Info info)
{
this.clientStream = clientStream;
this.senderStream = senderStream;
this.info = info;
}
@Override
public void succeeded(Stream serverStream)
public void succeeded(Stream stream)
{
LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream);
LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
DataInfoHandler dataInfoHandler;
DataInfoCallback dataInfoCallback;
synchronized (queue)
{
this.serverStream = serverStream;
dataInfoHandler = queue.peek();
if (dataInfoHandler != null)
this.receiverStream = stream;
dataInfoCallback = queue.peek();
if (dataInfoCallback != null)
{
if (dataInfoHandler.flushing)
if (dataInfoCallback.flushing)
{
LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
dataInfoHandler = null;
LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
dataInfoCallback = null;
}
else
{
dataInfoHandler.flushing = true;
dataInfoCallback.flushing = true;
LOG.debug("SYN completed, queue size {}", queue.size());
}
}
@ -416,37 +411,37 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("SYN completed, queue empty");
}
}
if (dataInfoHandler != null)
flush(serverStream, dataInfoHandler);
if (dataInfoCallback != null)
flush(stream, dataInfoCallback);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(clientStream);
rst(senderStream);
}
public void data(DataInfo dataInfo)
{
Stream serverStream;
DataInfoHandler dataInfoHandler = null;
DataInfoHandler item = new DataInfoHandler(dataInfo);
Stream receiverStream;
DataInfoCallback dataInfoCallbackToFlush = null;
DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
synchronized (queue)
{
queue.offer(item);
serverStream = this.serverStream;
if (serverStream != null)
queue.offer(dataInfoCallBackToQueue);
receiverStream = this.receiverStream;
if (receiverStream != null)
{
dataInfoHandler = queue.peek();
if (dataInfoHandler.flushing)
dataInfoCallbackToFlush = queue.peek();
if (dataInfoCallbackToFlush.flushing)
{
LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
serverStream = null;
LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
receiverStream = null;
}
else
{
dataInfoHandler.flushing = true;
dataInfoCallbackToFlush.flushing = true;
LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
}
}
@ -455,22 +450,22 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
}
}
if (serverStream != null)
flush(serverStream, dataInfoHandler);
if (receiverStream != null)
flush(receiverStream, dataInfoCallbackToFlush);
}
private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
{
LOG.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
serverStream.data(dataInfoHandler.dataInfo, dataInfoHandler); //TODO: timeout???
LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout???
}
private class DataInfoHandler implements Callback
private class DataInfoCallback implements Callback
{
private final DataInfo dataInfo;
private boolean flushing;
private DataInfoHandler(DataInfo dataInfo)
private DataInfoCallback(DataInfo dataInfo)
{
this.dataInfo = dataInfo;
}
@ -479,18 +474,18 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
public void succeeded()
{
Stream serverStream;
DataInfoHandler dataInfoHandler;
DataInfoCallback dataInfoCallback;
synchronized (queue)
{
serverStream = StreamHandler.this.serverStream;
serverStream = StreamPromise.this.receiverStream;
assert serverStream != null;
dataInfoHandler = queue.poll();
assert dataInfoHandler == this;
dataInfoHandler = queue.peek();
if (dataInfoHandler != null)
dataInfoCallback = queue.poll();
assert dataInfoCallback == this;
dataInfoCallback = queue.peek();
if (dataInfoCallback != null)
{
assert !dataInfoHandler.flushing;
dataInfoHandler.flushing = true;
assert !dataInfoCallback.flushing;
dataInfoCallback.flushing = true;
LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
}
else
@ -498,22 +493,71 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("Completed {}, queue empty", dataInfo);
}
}
if (dataInfoHandler != null)
flush(serverStream, dataInfoHandler);
if (dataInfoCallback != null)
flush(serverStream, dataInfoCallback);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(clientStream);
rst(senderStream);
}
}
public Stream getSenderStream()
{
return senderStream;
}
public Info getInfo()
{
return info;
}
public Stream getReceiverStream()
{
synchronized (queue)
{
return receiverStream;
}
}
}
private class PushStreamPromise extends StreamPromise
{
private volatile PushStreamPromise pushStreamPromise;
private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
{
super(senderStream, pushInfo);
}
@Override
public void succeeded(Stream receiverStream)
{
super.succeeded(receiverStream);
LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
PushStreamPromise promise = pushStreamPromise;
if (promise != null)
receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
}
public void push(PushStreamPromise pushStreamPromise)
{
Stream receiverStream = getReceiverStream();
if (receiverStream != null)
receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
else
this.pushStreamPromise = pushStreamPromise;
}
}
private class ProxySessionFrameListener extends SessionFrameListener.Adapter
{
@Override
public void onRst(Session serverSession, RstInfo serverRstInfo)
{
@ -536,4 +580,20 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
serverSessions.values().remove(serverSession);
}
}
private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
{
Fields headersToConvert = pushInfo.getHeaders();
Fields headers = convertHeaders(from, to, headersToConvert);
return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
}
private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
{
Fields headers = new Fields(headersToConvert, false);
addResponseProxyHeaders(from, headers);
customizeResponseHeaders(from, headers);
convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
return headers;
}
}

View File

@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -38,6 +37,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@ -153,7 +153,7 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
++result;
httpClient.newRequest("localhost", connector.getLocalPort())
.path(path)
.header("Referer", referrer)
.header(HttpHeader.REFERER, referrer)
.send(new TestListener());
}
for (int i = 0; i < jsResources.length; ++i)
@ -162,7 +162,7 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
++result;
httpClient.newRequest("localhost", connector.getLocalPort())
.path(path)
.header("Referer", referrer)
.header(HttpHeader.REFERER, referrer)
.send(new TestListener());
}
for (int i = 0; i < pngResources.length; ++i)
@ -171,7 +171,7 @@ public class PushStrategyBenchmarkTest extends AbstractHTTPSPDYTest
++result;
httpClient.newRequest("localhost", connector.getLocalPort())
.path(path)
.header("Referer", referrer)
.header(HttpHeader.REFERER, referrer)
.send(new TestListener());
}

View File

@ -566,7 +566,6 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(2);
Session session2 = startClient(version, address, null);
LOG.warn("REQUEST FOR PUSHED RESOURCES");
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override

View File

@ -311,6 +311,8 @@ public class ProxyHTTPToSPDYTest
Fields responseHeaders = new Fields();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
responseHeaders.put("content-length", String.valueOf(data.length));
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo, new Callback.Adapter());
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
@ -437,6 +439,7 @@ public class ProxyHTTPToSPDYTest
Fields responseHeaders = new Fields();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
responseHeaders.put("content-length", String.valueOf(data.length));
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo, new Callback.Adapter());
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());

View File

@ -328,6 +328,140 @@ public class ProxySPDYToSPDYTest
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
}
@Test
public void testSYNThenSPDYNestedPushIsReceived() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Fields responseHeaders = new Fields();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, false), new Callback.Adapter());
final Fields pushHeaders = new Fields();
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/push");
stream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)
{
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/nestedpush");
pushStream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)
{
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/anothernestedpush");
pushStream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)
{
pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
}
});
pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
}
});
pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
}
});
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
return null;
}
}));
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
final CountDownLatch pushSynLatch = new CountDownLatch(3);
final CountDownLatch pushDataLatch = new CountDownLatch(3);
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
Fields headers = new Fields();
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
// onPush for 1st push stream
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushSynLatch.countDown();
return new StreamFrameListener.Adapter()
{
// onPush for 2nd nested push stream
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushSynLatch.countDown();
return new Adapter()
{
// onPush for 3rd nested push stream
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushSynLatch.countDown();
return new Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyLatch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
dataLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
}
@Test
public void testPING() throws Exception
{

View File

@ -58,7 +58,7 @@
<configuration>
<instructions>
<Export-Package>org.eclipse.jetty.spdy.server;version="9.0"</Export-Package>
<Import-Package>!org.eclipse.jetty.npn,org.eclipse.jetty.*;version="[9.0,10.0)",*</Import-Package>
<Import-Package>org.eclipse.jetty.npn,org.eclipse.jetty.*;version="[9.0,10.0)",*</Import-Package>
<_nouses>true</_nouses>
</instructions>
</configuration>
@ -87,7 +87,7 @@
<dependency>
<groupId>org.eclipse.jetty.npn</groupId>
<artifactId>npn-api</artifactId>
<version>${npn.version}</version>
<version>${npn.api.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

View File

@ -139,7 +139,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
Collections.reverse(reverse);
for (Bean b : reverse)
{
if (b._bean instanceof Destroyable && b._managed==Managed.MANAGED)
if (b._bean instanceof Destroyable && (b._managed==Managed.MANAGED || b._managed==Managed.POJO))
{
Destroyable d = (Destroyable)b._bean;
d.destroy();
@ -447,6 +447,7 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
{
if (_beans.remove(bean))
{
boolean managed=bean.isManaged();
unmanage(bean);
for (Container.Listener l:_listeners)
@ -467,6 +468,19 @@ public class ContainerLifeCycle extends AbstractLifeCycle implements Container,
}
}
}
if (managed && bean._bean instanceof Destroyable)
{
try
{
((Destroyable)bean._bean).destroy();
}
catch(Exception e)
{
LOG.warn(e);
}
}
return true;
}
return false;

View File

@ -90,14 +90,6 @@ class BadResource extends URLResource
throw new FileNotFoundException(_message);
}
/* --------------------------------------------------------- */
@Override
public OutputStream getOutputStream()
throws java.io.IOException, SecurityException
{
throw new FileNotFoundException(_message);
}
/* --------------------------------------------------------- */
@Override
public boolean delete()

View File

@ -277,17 +277,6 @@ public class FileResource extends URLResource
return FileChannel.open(_file.toPath(),StandardOpenOption.READ);
}
/* --------------------------------------------------------- */
/**
* Returns an output stream to the resource
*/
@Override
public OutputStream getOutputStream()
throws java.io.IOException, SecurityException
{
return new FileOutputStream(_file);
}
/* --------------------------------------------------------- */
/**
* Deletes the given resource

View File

@ -396,13 +396,6 @@ public abstract class Resource implements ResourceFactory
public abstract ReadableByteChannel getReadableByteChannel()
throws java.io.IOException;
/* ------------------------------------------------------------ */
/**
* Returns an output stream to the resource
*/
public abstract OutputStream getOutputStream()
throws java.io.IOException, SecurityException;
/* ------------------------------------------------------------ */
/**
* Deletes the given resource

View File

@ -362,22 +362,6 @@ public class ResourceCollection extends Resource
return null;
}
/* ------------------------------------------------------------ */
@Override
public OutputStream getOutputStream() throws IOException, SecurityException
{
if(_resources==null)
throw new IllegalStateException("*resources* not set.");
for(Resource r : _resources)
{
OutputStream os = r.getOutputStream();
if(os!=null)
return os;
}
return null;
}
/* ------------------------------------------------------------ */
@Override
public URL getURL()

View File

@ -232,17 +232,6 @@ public class URLResource extends Resource
return null;
}
/* ------------------------------------------------------------ */
/**
* Returns an output stream to the resource
*/
@Override
public OutputStream getOutputStream()
throws java.io.IOException, SecurityException
{
throw new IOException( "Output not supported");
}
/* ------------------------------------------------------------ */
/**
* Deletes the given resource

View File

@ -18,10 +18,12 @@
package org.eclipse.jetty.util.thread;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -79,7 +81,7 @@ public class ShutdownThread extends Thread
catch(Exception e)
{
LOG.ignore(e);
LOG.info("shutdown already commenced");
LOG.debug("shutdown already commenced");
}
}
@ -131,6 +133,12 @@ public class ShutdownThread extends Thread
lifeCycle.stop();
LOG.debug("Stopped {}",lifeCycle);
}
if (lifeCycle instanceof Destroyable)
{
((Destroyable)lifeCycle).destroy();
LOG.debug("Destroyed {}",lifeCycle);
}
}
catch (Exception ex)
{

View File

@ -117,17 +117,17 @@ public class ContainerLifeCycleTest
a0.destroy();
Assert.assertEquals(3,started.get());
Assert.assertEquals(2,stopped.get());
Assert.assertEquals(1,destroyed.get());
Assert.assertEquals(2,destroyed.get());
a1.stop();
Assert.assertEquals(3,started.get());
Assert.assertEquals(3,stopped.get());
Assert.assertEquals(1,destroyed.get());
Assert.assertEquals(2,destroyed.get());
a1.destroy();
Assert.assertEquals(3,started.get());
Assert.assertEquals(3,stopped.get());
Assert.assertEquals(2,destroyed.get());
Assert.assertEquals(3,destroyed.get());
}

View File

@ -112,15 +112,6 @@ public class OrderingTest
return _name;
}
/**
* @see org.eclipse.jetty.util.resource.Resource#getOutputStream()
*/
@Override
public OutputStream getOutputStream() throws IOException, SecurityException
{
return null;
}
/**
* @see org.eclipse.jetty.util.resource.Resource#getURL()
*/

View File

@ -22,6 +22,26 @@ public class CloseStatus
{
private static final int MAX_CONTROL_PAYLOAD = 125;
private static final int MAX_REASON_PHRASE = MAX_CONTROL_PAYLOAD - 2;
/**
* Convenience method for trimming a long reason phrase at the maximum reason phrase length.
*
* @param reason
* the proposed reason phrase
* @return the reason phrase (trimmed if needed)
*/
public static String trimMaxReasonLength(String reason)
{
if (reason.length() > MAX_REASON_PHRASE)
{
return reason.substring(0,MAX_REASON_PHRASE);
}
else
{
return reason;
}
}
private int code;
private String phrase;

View File

@ -113,7 +113,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
public void close(int statusCode, String reason)
{
connection.close(statusCode,reason);
websocket.onClose(new CloseInfo(statusCode,reason));
notifyClose(statusCode,reason);
}
/**
@ -125,7 +125,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
connection.disconnect();
// notify of harsh disconnect
websocket.onClose(new CloseInfo(StatusCode.NO_CLOSE,"Harsh disconnect"));
notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
}
@Override
@ -153,6 +153,36 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
}
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
{
return true;
}
if (obj == null)
{
return false;
}
if (getClass() != obj.getClass())
{
return false;
}
WebSocketSession other = (WebSocketSession)obj;
if (connection == null)
{
if (other.connection != null)
{
return false;
}
}
else if (!connection.equals(other.connection))
{
return false;
}
return true;
}
public LogicalConnection getConnection()
{
return connection;
@ -236,6 +266,15 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return this.upgradeResponse;
}
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
result = (prime * result) + ((connection == null)?0:connection.hashCode());
return result;
}
/**
* Incoming Errors from Parser
*/
@ -288,6 +327,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return "wss".equalsIgnoreCase(requestURI.getScheme());
}
public void notifyClose(int statusCode, String reason)
{
websocket.onClose(new CloseInfo(statusCode,reason));
}
/**
* Open/Activate the session
*

View File

@ -249,6 +249,23 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
@Override
public String toString()
{
return String.format("ExtensionStack[extensions=%s]",extensions);
StringBuilder s = new StringBuilder();
s.append("ExtensionStack[");
s.append("extensions=[");
boolean delim = false;
for (Extension ext : extensions)
{
if (delim)
{
s.append(',');
}
s.append(ext.getName());
delim = true;
}
s.append("],incoming=").append(this.nextIncoming.getClass().getName());
s.append(",outgoing=").append(this.nextOutgoing.getClass().getName());
s.append("]");
return s.toString();
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -35,10 +36,12 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ForkInvoker;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketException;
@ -60,10 +63,40 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
private class FlushCallback implements Callback
{
/**
* The Endpoint.write() failure path
*/
@Override
public void failed(Throwable x)
{
LOG.debug("Write flush failure",x);
// Unable to write? can't notify other side of close, so disconnect.
// This is an ABNORMAL closure
String reason = "Websocket write failure";
if (x instanceof EOFException)
{
reason = "EOF";
Throwable cause = x.getCause();
if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
{
reason = "EOF: " + cause.getMessage();
}
}
else
{
if (StringUtil.isNotBlank(x.getMessage()))
{
reason = x.getMessage();
}
}
// Abnormal Close
reason = CloseStatus.trimMaxReasonLength(reason);
session.notifyClose(StatusCode.NO_CLOSE,reason);
disconnect(); // disconnect endpoint & connection
}
@Override
@ -122,20 +155,24 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
public static class Stats {
public static class Stats
{
private AtomicLong countFillInterestedEvents = new AtomicLong(0);
private AtomicLong countOnFillableEvents = new AtomicLong(0);
private AtomicLong countFillableErrors = new AtomicLong(0);
public long getFillableErrorCount() {
public long getFillableErrorCount()
{
return countFillableErrors.get();
}
public long getFillInterestedCount() {
public long getFillInterestedCount()
{
return countFillInterestedEvents.get();
}
public long getOnFillableCount() {
public long getOnFillableCount()
{
return countOnFillableEvents.get();
}
}
@ -278,12 +315,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
synchronized (writeBytes)
{
if (writeBytes.isFailed())
{
LOG.debug(".flush() - queue is in failed state");
return;
}
if (flushing)
{
return;
@ -398,11 +429,17 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return isFilling;
}
/**
* Physical connection disconnect.
* <p>
* Not related to WebSocket close handshake.
*/
@Override
public void onClose()
{
super.onClose();
this.getIOState().setState(ConnectionState.CLOSED);
writeBytes.close();
}
@Override
@ -459,7 +496,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if ((state.getState() == ConnectionState.CLOSING) || (state.getState() == ConnectionState.CLOSED))
{
// close already initiated, extra timeouts not relevant
// allow udnerlying connection and endpoint to disconnect on its own
// allow underlying connection and endpoint to disconnect on its own
return true;
}
@ -467,6 +504,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
// Note: it is not possible in 100% of cases during read timeout to send this close frame.
session.close(StatusCode.NORMAL,"Idle Timeout");
// Force closure of writeBytes
writeBytes.close();
return false;
}
@ -489,15 +529,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("outgoingFrame({}, {})",frame,callback);
}
if (!isOpen())
{
return;
}
synchronized (writeBytes)
{
writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
}
flush();
}
@ -516,7 +548,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
else if (filled < 0)
{
LOG.debug("read - EOF Reached");
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
return -1;
}
else
@ -568,7 +600,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void setInputBufferSize(int inputBufferSize)
{
if(inputBufferSize < MIN_BUFFER_SIZE) {
if (inputBufferSize < MIN_BUFFER_SIZE)
{
throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
}
super.setInputBufferSize(inputBufferSize);

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
@ -40,6 +41,7 @@ public class WriteBytesProvider implements Callback
{
private class FrameEntry
{
protected final AtomicBoolean failed = new AtomicBoolean(false);
protected final Frame frame;
protected final Callback callback;
@ -58,6 +60,14 @@ public class WriteBytesProvider implements Callback
}
return buffer;
}
public void notifyFailure(Throwable t)
{
if (failed.getAndSet(true) == false)
{
notifySafeFailure(callback,t);
}
}
}
private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
@ -72,7 +82,7 @@ public class WriteBytesProvider implements Callback
private int bufferSize = 2048;
/** Currently active frame */
private FrameEntry active;
/** Failure state for the entire WriteBytesProvider */
/** Tracking for failure */
private Throwable failure;
/** The last requested buffer */
private ByteBuffer buffer;
@ -97,6 +107,17 @@ public class WriteBytesProvider implements Callback
this.closed = new AtomicBoolean(false);
}
/**
* Force closure of write bytes
*/
public void close()
{
// Set queue closed, no new enqueue allowed.
this.closed.set(true);
// flush out backlog in queue
failAll(new EOFException("Connection has been disconnected"));
}
public void enqueue(Frame frame, Callback callback)
{
Objects.requireNonNull(frame);
@ -106,7 +127,7 @@ public class WriteBytesProvider implements Callback
if (closed.get())
{
// Closed for more frames.
LOG.debug("Write is closed: {}",frame,callback);
LOG.debug("Write is closed: {} {}",frame,callback);
if (callback != null)
{
callback.failed(new IOException("Write is closed"));
@ -114,10 +135,11 @@ public class WriteBytesProvider implements Callback
return;
}
if (isFailed())
if (failure != null)
{
// no changes when failed
notifyFailure(callback);
LOG.debug("Write is in failure: {} {}",frame,callback);
notifySafeFailure(callback,failure);
return;
}
@ -143,28 +165,31 @@ public class WriteBytesProvider implements Callback
{
synchronized (this)
{
if (isFailed())
// fail active (if set)
if (active != null)
{
// already failed.
return;
active.notifyFailure(t);
}
failure = t;
// fail others
for (FrameEntry fe : queue)
{
notifyFailure(fe.callback);
fe.notifyFailure(t);
}
queue.clear();
// notify flush callback
flushCallback.failed(failure);
flushCallback.failed(t);
}
}
/**
* Write of ByteBuffer failed.
* Callback failure.
* <p>
* Conditions: for Endpoint.write() failure.
*
* @param cause
* the cause of the failure
@ -211,11 +236,6 @@ public class WriteBytesProvider implements Callback
return buffer;
}
public Throwable getFailure()
{
return failure;
}
/**
* Used to test for the final frame possible to be enqueued, the CLOSE frame.
*
@ -229,24 +249,16 @@ public class WriteBytesProvider implements Callback
}
}
public boolean isFailed()
private void notifySafeFailure(Callback callback, Throwable t)
{
return (failure != null);
try
{
callback.failed(t);
}
/**
* Notify specific callback of failure.
*
* @param callback
* the callback to notify
*/
private void notifyFailure(Callback callback)
catch (Throwable e)
{
if (callback == null)
{
return;
LOG.warn("Uncaught exception",e);
}
callback.failed(failure);
}
/**
@ -268,6 +280,8 @@ public class WriteBytesProvider implements Callback
@Override
public void succeeded()
{
Callback successCallback = null;
synchronized (this)
{
// Release the active byte buffer first
@ -281,27 +295,28 @@ public class WriteBytesProvider implements Callback
if (active.frame.remaining() <= 0)
{
// All done with active FrameEntry
if (active.callback != null)
{
try
{
// TODO: should probably have callback invoked in new thread as part of scheduler
// notify of success
active.callback.succeeded();
}
catch (Throwable t)
{
LOG.warn("Callback failure",t);
}
}
// null it out
successCallback = active.callback;
// Forget active
active = null;
}
// notify flush callback
flushCallback.succeeded();
}
// Notify success (outside of synchronize lock)
if (successCallback != null)
{
try
{
// notify of success
successCallback.succeeded();
}
catch (Throwable t)
{
LOG.warn("Callback failure",t);
}
}
}
@Override
@ -310,10 +325,10 @@ public class WriteBytesProvider implements Callback
StringBuilder b = new StringBuilder();
b.append("WriteBytesProvider[");
b.append("flushCallback=").append(flushCallback);
if (isFailed())
if (failure != null)
{
b.append(",FAILURE=").append(failure.getClass().getName());
b.append(",").append(failure.getMessage());
b.append(",failure=").append(failure.getClass().getName());
b.append(":").append(failure.getMessage());
}
else
{