Merge branch 'master' into gcloud-session-manager

This commit is contained in:
Jan Bartel 2015-10-08 11:41:49 +11:00
commit 71c2d79daa
21 changed files with 747 additions and 325 deletions

View File

@ -137,7 +137,18 @@ public class HTTP2Client extends ContainerLifeCycle
setByteBufferPool(new MappedByteBufferPool());
if (connectionFactory == null)
setClientConnectionFactory(new HTTP2ClientConnectionFactory());
{
HTTP2ClientConnectionFactory h2 = new HTTP2ClientConnectionFactory();
ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), h2, getProtocols());
setClientConnectionFactory((endPoint, context) ->
{
ClientConnectionFactory factory = h2;
SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
if (sslContextFactory != null)
factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
return factory.newConnection(endPoint, context);
});
}
if (sessions == null)
{
@ -356,17 +367,7 @@ public class HTTP2Client extends ContainerLifeCycle
context.put(HTTP2ClientConnectionFactory.BYTE_BUFFER_POOL_CONTEXT_KEY, getByteBufferPool());
context.put(HTTP2ClientConnectionFactory.EXECUTOR_CONTEXT_KEY, getExecutor());
context.put(HTTP2ClientConnectionFactory.SCHEDULER_CONTEXT_KEY, getScheduler());
ClientConnectionFactory factory = getClientConnectionFactory();
SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
if (sslContextFactory != null)
{
ALPNClientConnectionFactory alpn = new ALPNClientConnectionFactory(getExecutor(), factory, getProtocols());
factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), alpn);
}
return factory.newConnection(endpoint, context);
return getClientConnectionFactory().newConnection(endpoint, context);
}
@Override

View File

@ -15,6 +15,38 @@
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>generate-resources</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.mortbay.jetty.alpn</groupId>
<artifactId>alpn-boot</artifactId>
<version>${alpn.version}</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/alpn</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>-Xbootclasspath/p:${project.build.directory}/alpn/alpn-boot-${alpn.version}.jar</argLine>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>

View File

@ -22,19 +22,24 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.HTTP2ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ManagedObject("The HTTP/2 client transport")
public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements HttpClientTransport
@ -68,7 +73,7 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
addBean(client);
super.doStart();
this.connectionFactory = client.getClientConnectionFactory();
this.connectionFactory = new HTTP2ClientConnectionFactory();
client.setClientConnectionFactory((endPoint, context) ->
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@ -128,13 +133,21 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
}
};
client.connect(httpClient.getSslContextFactory(), address, listener, promise, context);
SslContextFactory sslContextFactory = null;
if (HttpScheme.HTTPS.is(destination.getScheme()))
sslContextFactory = httpClient.getSslContextFactory();
client.connect(sslContextFactory, address, listener, promise, context);
}
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
return connectionFactory.newConnection(endPoint, context);
ClientConnectionFactory factory = connectionFactory;
SslContextFactory sslContextFactory = (SslContextFactory)context.get(SslClientConnectionFactory.SSL_CONTEXT_FACTORY_CONTEXT_KEY);
if (sslContextFactory != null)
factory = new ALPNClientConnectionFactory(client.getExecutor(), factory, client.getProtocols());
return factory.newConnection(endPoint, context);
}
protected HttpConnectionOverHTTP2 newHttpConnection(HttpDestination destination, Session session)

View File

@ -21,9 +21,13 @@ package org.eclipse.jetty.http2.client.http;
import java.util.concurrent.Executor;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
public class HttpClientTransportOverHTTP2Test
@ -51,4 +55,24 @@ public class HttpClientTransportOverHTTP2Test
Assert.assertTrue(http2Client.isStopped());
}
@Ignore
@Test
public void testExternalServer() throws Exception
{
HTTP2Client http2Client = new HTTP2Client();
SslContextFactory sslContextFactory = new SslContextFactory();
HttpClient httpClient = new HttpClient(new HttpClientTransportOverHTTP2(http2Client), sslContextFactory);
Executor executor = new QueuedThreadPool();
httpClient.setExecutor(executor);
httpClient.start();
// ContentResponse response = httpClient.GET("https://http2.akamai.com/");
ContentResponse response = httpClient.GET("https://webtide.com/");
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
httpClient.stop();
}
}

View File

@ -0,0 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.client.LEVEL=DEBUG
org.eclipse.jetty.http2.hpack.LEVEL=INFO
#org.eclipse.jetty.http2.LEVEL=DEBUG
#org.eclipse.jetty.io.ssl.LEVEL=DEBUG

View File

@ -84,7 +84,19 @@ public abstract class AbstractConnection implements Connection
protected void failedCallback(final Callback callback, final Throwable x)
{
// TODO always dispatch failure ?
if (callback.isNonBlocking())
{
try
{
callback.failed(x);
}
catch (Exception e)
{
LOG.warn(e);
}
}
else
{
try
{
getExecutor().execute(new Runnable()
@ -109,6 +121,7 @@ public abstract class AbstractConnection implements Connection
callback.failed(x);
}
}
}
/**
* <p>Utility method to be called to register read interest.</p>

View File

@ -216,8 +216,8 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
isOpen()?"Open":"CLOSED",
isInputShutdown()?"ISHUT":"in",
isOutputShutdown()?"OSHUT":"out",
_fillInterest.isInterested()?"R":"-",
_writeFlusher.isInProgress()?"W":"-",
_fillInterest.toStateString(),
_writeFlusher.toStateString(),
getIdleFor(),
getIdleTimeout(),
getConnection()==null?null:getConnection().getClass().getSimpleName());

View File

@ -138,7 +138,13 @@ public abstract class FillInterest
@Override
public String toString()
{
return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get(), _interested.get());
return String.format("FillInterest@%x{%b,%s}", hashCode(), _interested.get()!=null, _interested.get());
}
public String toStateString()
{
return _interested.get()==null?"-":"FI";
}
/**

View File

@ -522,4 +522,23 @@ abstract public class WriteFlusher
{
return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
}
public String toStateString()
{
switch(_state.get().getType())
{
case WRITING:
return "W";
case PENDING:
return "P";
case COMPLETING:
return "C";
case IDLE:
return "-";
case FAILED:
return "F";
default:
return "?";
}
}
}

View File

@ -2435,25 +2435,6 @@ public class Request implements HttpServletRequest
@Override
public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException
{
if (getContext() == null)
throw new ServletException ("Unable to instantiate "+handlerClass);
try
{
//Instantiate an instance and inject it
T h = getContext().createInstance(handlerClass);
//TODO handle the rest of the upgrade process
return h;
}
catch (Exception e)
{
if (e instanceof ServletException)
throw (ServletException)e;
throw new ServletException(e);
throw new ServletException("HttpServletRequest.upgrade() not supported in Jetty");
}
}
}

View File

@ -73,6 +73,7 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
private static final Logger LOG = Log.getLogger(ServletHolder.class);
private int _initOrder = -1;
private boolean _initOnStartup=false;
private boolean _initialized = false;
private Map<String, String> _roleMap;
private String _forcedPath;
private String _runAsRole;
@ -81,7 +82,6 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
private ServletRegistration.Dynamic _registration;
private JspContainer _jspContainer;
private transient Servlet _servlet;
private transient Config _config;
private transient long _unavailable;
@ -396,6 +396,7 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
public void initialize ()
throws Exception
{
if(!_initialized){
super.initialize();
if (_extInstance || _initOnStartup)
{
@ -412,6 +413,8 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
}
}
}
_initialized = true;
}
/* ------------------------------------------------------------ */
@ -443,6 +446,7 @@ public class ServletHolder extends Holder<Servlet> implements UserIdentity.Scope
_servlet=null;
_config=null;
_initialized = false;
}
/* ------------------------------------------------------------ */

View File

@ -21,6 +21,8 @@ package org.eclipse.jetty.servlet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
@ -88,6 +90,7 @@ public class DispatcherForwardTest
// 2. assert query => a=1 one
// 1. assert query => a=1 one
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one";
servlet1 = new HttpServlet()
{
@ -100,6 +103,7 @@ public class DispatcherForwardTest
checkThat(req.getQueryString(),Matchers.equalTo(query1));
checkThat(req.getParameter("a"),Matchers.equalTo("1 one"));
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -120,6 +124,7 @@ public class DispatcherForwardTest
"Connection: close\r\n" +
"\r\n";
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}
@ -131,7 +136,8 @@ public class DispatcherForwardTest
// 2. assert query => a=2
// 1. assert query => a=1
final String query1 = "a=1$20one&b=2%20two";
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one&b=2%20two";
final String query2 = "a=3%20three";
final String query3 = "a=3%20three&b=2%20two";
servlet1 = new HttpServlet()
@ -146,6 +152,7 @@ public class DispatcherForwardTest
checkThat(req.getQueryString(), Matchers.equalTo(query1));
checkThat(req.getParameter("a"),Matchers.equalTo("1 one"));
checkThat(req.getParameter("b"),Matchers.equalTo("2 two"));
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -167,6 +174,7 @@ public class DispatcherForwardTest
"Connection: close\r\n" +
"\r\n";
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}
@ -178,6 +186,7 @@ public class DispatcherForwardTest
// 2. assert query => a=1&b=2
// 1. assert query => a=1
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one";
final String query2 = "b=2%20two";
final String query3 = "b=2%20two&a=1%20one";
@ -192,6 +201,7 @@ public class DispatcherForwardTest
checkThat(req.getQueryString(),Matchers.equalTo(query1));
checkThat(req.getParameter("a"),Matchers.equalTo("1 one"));
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -213,6 +223,7 @@ public class DispatcherForwardTest
"Connection: close\r\n" +
"\r\n";
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}
@ -224,6 +235,7 @@ public class DispatcherForwardTest
// 2. assert query => a=1 + params => a=1,2
// 1. assert query => a=1 + params => a=1,2
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one";
final String form = "a=2%20two";
servlet1 = new HttpServlet()
@ -240,6 +252,7 @@ public class DispatcherForwardTest
checkThat(values, Matchers.notNullValue());
checkThat(2, Matchers.equalTo(values.length));
checkThat(values, Matchers.arrayContainingInAnyOrder("1 one", "2 two"));
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -266,6 +279,7 @@ public class DispatcherForwardTest
"\r\n" +
form;
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}
@ -277,6 +291,7 @@ public class DispatcherForwardTest
// 2. assert query => a=3 + params => a=3,2,1
// 1. assert query => a=1 + params => a=1,2
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one";
final String query2 = "a=3%20three";
final String form = "a=2%20two";
@ -294,6 +309,7 @@ public class DispatcherForwardTest
checkThat(values, Matchers.notNullValue());
checkThat(2, Matchers.equalTo(values.length));
checkThat(values, Matchers.arrayContainingInAnyOrder("1 one", "2 two"));
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -320,6 +336,7 @@ public class DispatcherForwardTest
"\r\n" +
form;
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}
@ -331,6 +348,7 @@ public class DispatcherForwardTest
// 2. assert query => a=1&c=3 + params => a=1&b=2&c=3
// 1. assert query => a=1 + params => a=1&b=2
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one";
final String query2 = "c=3%20three";
final String query3 = "c=3%20three&a=1%20one";
@ -348,6 +366,7 @@ public class DispatcherForwardTest
checkThat(req.getParameter("a"),Matchers.equalTo("1 one"));
checkThat(req.getParameter("b"),Matchers.equalTo("2 two"));
checkThat(req.getParameter("c"), Matchers.nullValue());
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -373,6 +392,7 @@ public class DispatcherForwardTest
"\r\n" +
form;
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}
@ -385,6 +405,7 @@ public class DispatcherForwardTest
// 2. assert query => a=1&c=3 + params => a=1&b=2&c=3
// 1. assert query => a=1 + params => a=1&b=2
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one";
final String query2 = "c=3%20three";
final String query3 = "c=3%20three&a=1%20one";
@ -404,6 +425,7 @@ public class DispatcherForwardTest
checkThat(req.getParameter("a"),Matchers.equalTo("1 one"));
checkThat(req.getParameter("b"),Matchers.equalTo("2 two"));
checkThat(req.getParameter("c"), Matchers.nullValue());
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -429,12 +451,14 @@ public class DispatcherForwardTest
"\r\n" +
form;
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}
@Test
public void testContentCanBeReadViaInputStreamAfterForwardWithoutQuery() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one";
final String form = "c=3%20three";
servlet1 = new HttpServlet()
@ -448,6 +472,7 @@ public class DispatcherForwardTest
checkThat(req.getQueryString(),Matchers.equalTo(query1));
checkThat(req.getParameter("c"), Matchers.nullValue());
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -473,12 +498,14 @@ public class DispatcherForwardTest
"\r\n" +
form;
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}
@Test
public void testContentCanBeReadViaInputStreamAfterForwardWithQuery() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
final String query1 = "a=1%20one";
final String query2 = "b=2%20two";
final String query3 = "b=2%20two&a=1%20one";
@ -494,6 +521,7 @@ public class DispatcherForwardTest
checkThat(req.getQueryString(),Matchers.equalTo(query1));
checkThat(req.getParameter("c"), Matchers.nullValue());
latch.countDown();
}
};
servlet2 = new HttpServlet()
@ -520,6 +548,7 @@ public class DispatcherForwardTest
"\r\n" +
form;
String response = connector.getResponses(request);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(response, response.startsWith("HTTP/1.1 200"));
}

View File

@ -1,213 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlets;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class DefaultServletStarvationTest
{
@Rule
public TestTracker tracker = new TestTracker();
private Server _server;
@After
public void dispose() throws Exception
{
if (_server != null)
_server.stop();
}
@Test
public void testDefaultServletStarvation() throws Exception
{
int maxThreads = 2;
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
// Prepare a big file to download.
File directory = MavenTestingUtils.getTargetTestingDir();
Files.createDirectories(directory.toPath());
String resourceName = "resource.bin";
Path resourcePath = Paths.get(directory.getPath(), resourceName);
try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
{
byte[] chunk = new byte[1024];
Arrays.fill(chunk,(byte)'X');
chunk[chunk.length-2]='\r';
chunk[chunk.length-1]='\n';
for (int i = 0; i < 256 * 1024; ++i)
output.write(chunk);
}
final CountDownLatch writePending = new CountDownLatch(1);
ServerConnector connector = new ServerConnector(_server, 0, 1)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@Override
protected void onIncompleteFlush()
{
super.onIncompleteFlush();
writePending.countDown();
}
};
}
};
_server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(_server, "/");
context.setResourceBase(directory.toURI().toString());
context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
_server.setHandler(context);
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request = "" +
"GET /" + resourceName + " HTTP/1.1\r\n" +
"Host: localhost\r\n" +
// "Connection: close\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(100);
}
// Wait for a the servlet to block.
Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS));
Thread.sleep(1000);
_server.dumpStdErr();
Thread.sleep(1000);
ScheduledFuture<?> dumper = Executors.newSingleThreadScheduledExecutor().schedule(new Runnable()
{
@Override
public void run()
{
_server.dumpStdErr();
}
}, 10, TimeUnit.SECONDS);
long expected = Files.size(resourcePath);
byte[] buffer = new byte[48 * 1024];
for (Socket socket : sockets)
{
String socketString = socket.toString();
long total = 0;
InputStream input = socket.getInputStream();
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state=0;
while (state<4 && header.length()<2048)
{
int ch=input.read();
if (ch<0)
break;
header.append((char)ch);
switch(state)
{
case 0:
if (ch=='\r')
state=1;
break;
case 1:
if (ch=='\n')
state=2;
else
state=0;
break;
case 2:
if (ch=='\r')
state=3;
else
state=0;
break;
case 3:
if (ch=='\n')
state=4;
else
state=0;
break;
}
}
while (total<expected)
{
int read=input.read(buffer);
if (read<0)
break;
total+=read;
}
Assert.assertEquals(expected,total);
}
dumper.cancel(false);
// We could read everything, good.
for (Socket socket : sockets)
socket.close();
}
}

View File

@ -0,0 +1,419 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlets;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Exchanger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StdErrLog;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class ThreadStarvationTest
{
@Rule
public TestTracker tracker = new TestTracker();
private Server _server;
@After
public void dispose() throws Exception
{
if (_server != null)
_server.stop();
}
@Test
@Slow
public void testDefaultServletSuccess() throws Exception
{
int maxThreads = 10;
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
// Prepare a big file to download.
File directory = MavenTestingUtils.getTargetTestingDir();
Files.createDirectories(directory.toPath());
String resourceName = "resource.bin";
Path resourcePath = Paths.get(directory.getPath(), resourceName);
try (OutputStream output = Files.newOutputStream(resourcePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE))
{
byte[] chunk = new byte[1024];
Arrays.fill(chunk,(byte)'X');
chunk[chunk.length-2]='\r';
chunk[chunk.length-1]='\n';
for (int i = 0; i < 256 * 1024; ++i)
output.write(chunk);
}
final CountDownLatch writePending = new CountDownLatch(1);
ServerConnector connector = new ServerConnector(_server, 0, 1)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@Override
protected void onIncompleteFlush()
{
super.onIncompleteFlush();
writePending.countDown();
}
};
}
};
connector.setIdleTimeout(Long.MAX_VALUE);
_server.addConnector(connector);
ServletContextHandler context = new ServletContextHandler(_server, "/");
context.setResourceBase(directory.toURI().toString());
context.addServlet(DefaultServlet.class, "/*").setAsyncSupported(false);
_server.setHandler(context);
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads*2; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request = "" +
"GET /" + resourceName + " HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(100);
}
// Wait for a the servlet to block.
Assert.assertTrue(writePending.await(5, TimeUnit.SECONDS));
long expected = Files.size(resourcePath);
byte[] buffer = new byte[48 * 1024];
List<Exchanger<Long>> totals = new ArrayList<>();
for (Socket socket : sockets)
{
final Exchanger<Long> x = new Exchanger<>();
totals.add(x);
final InputStream input = socket.getInputStream();
new Thread()
{
@Override
public void run()
{
long total=0;
try
{
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state=0;
while (state<4 && header.length()<2048)
{
int ch=input.read();
if (ch<0)
break;
header.append((char)ch);
switch(state)
{
case 0:
if (ch=='\r')
state=1;
break;
case 1:
if (ch=='\n')
state=2;
else
state=0;
break;
case 2:
if (ch=='\r')
state=3;
else
state=0;
break;
case 3:
if (ch=='\n')
state=4;
else
state=0;
break;
}
}
while (total<expected)
{
int read=input.read(buffer);
if (read<0)
break;
total+=read;
}
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
try
{
x.exchange(total);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}.start();
}
for (Exchanger<Long> x : totals)
{
Long total = x.exchange(-1L,10000,TimeUnit.SECONDS);
Assert.assertEquals(expected,total.longValue());
}
// We could read everything, good.
for (Socket socket : sockets)
socket.close();
}
@Test
public void testFailureStarvation() throws Exception
{
try
{
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(true);
int acceptors = 0;
int selectors = 1;
int maxThreads = 10;
final int barried=maxThreads-acceptors-selectors;
final CyclicBarrier barrier = new CyclicBarrier(barried);
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads, maxThreads);
threadPool.setDetailedDump(true);
_server = new Server(threadPool);
ServerConnector connector = new ServerConnector(_server, acceptors, selectors)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
super.flush(buffers[0]);
throw new IOException("TEST FAILURE");
}
};
}
};
connector.setIdleTimeout(Long.MAX_VALUE);
_server.addConnector(connector);
final AtomicInteger count = new AtomicInteger(0);
_server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
int c=count.getAndIncrement();
try
{
if (c<barried)
{
barrier.await(10,TimeUnit.SECONDS);
}
}
catch (InterruptedException | BrokenBarrierException | TimeoutException e)
{
throw new ServletException(e);
}
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentLength(13);
response.getWriter().print("Hello World!\n");
response.getWriter().flush();
}
});
_server.start();
List<Socket> sockets = new ArrayList<>();
for (int i = 0; i < maxThreads*2; ++i)
{
Socket socket = new Socket("localhost", connector.getLocalPort());
sockets.add(socket);
OutputStream output = socket.getOutputStream();
String request = "" +
"GET / HTTP/1.1\r\n" +
"Host: localhost\r\n" +
// "Connection: close\r\n" +
"\r\n";
output.write(request.getBytes(StandardCharsets.UTF_8));
output.flush();
}
byte[] buffer = new byte[48 * 1024];
List<Exchanger<Integer>> totals = new ArrayList<>();
for (Socket socket : sockets)
{
final Exchanger<Integer> x = new Exchanger<>();
totals.add(x);
final InputStream input = socket.getInputStream();
new Thread()
{
@Override
public void run()
{
int read=0;
try
{
// look for CRLFCRLF
StringBuilder header = new StringBuilder();
int state=0;
while (state<4 && header.length()<2048)
{
int ch=input.read();
if (ch<0)
break;
header.append((char)ch);
switch(state)
{
case 0:
if (ch=='\r')
state=1;
break;
case 1:
if (ch=='\n')
state=2;
else
state=0;
break;
case 2:
if (ch=='\r')
state=3;
else
state=0;
break;
case 3:
if (ch=='\n')
state=4;
else
state=0;
break;
}
}
read=input.read(buffer);
}
catch (IOException e)
{
// e.printStackTrace();
}
finally
{
try
{
x.exchange(read);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}.start();
}
for (Exchanger<Integer> x : totals)
{
Integer read = x.exchange(-1,10,TimeUnit.SECONDS);
Assert.assertEquals(-1,read.intValue());
}
// We could read everything, good.
for (Socket socket : sockets)
socket.close();
_server.stop();
}
finally
{
((StdErrLog)Log.getLogger(HttpChannel.class)).setHideStacks(false);
}
}
}

View File

@ -17,6 +17,36 @@
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>generate-resources</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.mortbay.jetty.alpn</groupId>
<artifactId>alpn-boot</artifactId>
<version>${alpn.version}</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/alpn</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>-Xbootclasspath/p:${project.build.directory}/alpn/alpn-boot-${alpn.version}.jar</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
@ -47,6 +77,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-server</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-http-client-transport</artifactId>

View File

@ -18,22 +18,28 @@
package org.eclipse.jetty.http.client;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.http2.HTTP2Cipher;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Rule;
@ -44,15 +50,16 @@ import org.junit.runners.Parameterized;
public abstract class AbstractTest
{
@Parameterized.Parameters(name = "transport: {0}")
public static List<Object[]> parameters() throws Exception
public static Object[] parameters() throws Exception
{
return Arrays.asList(new Object[]{Transport.HTTP}, new Object[]{Transport.HTTP2});
return new Object[]{Transport.HTTP, Transport.HTTPS, Transport.H2C, Transport.H2};
}
@Rule
public final TestTracker tracker = new TestTracker();
protected final Transport transport;
protected SslContextFactory sslContextFactory;
protected Server server;
protected ServerConnector connector;
protected HttpClient client;
@ -64,11 +71,18 @@ public abstract class AbstractTest
public void start(Handler handler) throws Exception
{
sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.jks");
sslContextFactory.setKeyStorePassword("storepwd");
sslContextFactory.setTrustStorePath("src/test/resources/truststore.jks");
sslContextFactory.setTrustStorePassword("storepwd");
sslContextFactory.setUseCipherSuitesOrder(true);
sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR);
startServer(handler);
startClient();
}
protected void startServer(Handler handler) throws Exception
private void startServer(Handler handler) throws Exception
{
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
@ -79,37 +93,71 @@ public abstract class AbstractTest
server.start();
}
protected void startClient() throws Exception
private void startClient() throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient(provideClientTransport(transport), null);
client = new HttpClient(provideClientTransport(transport), sslContextFactory);
client.setExecutor(clientThreads);
client.start();
}
private ConnectionFactory provideServerConnectionFactory(Transport transport)
private ConnectionFactory[] provideServerConnectionFactory(Transport transport)
{
List<ConnectionFactory> result = new ArrayList<>();
switch (transport)
{
case HTTP:
return new HttpConnectionFactory(new HttpConfiguration());
case HTTP2:
return new HTTP2ServerConnectionFactory(new HttpConfiguration());
{
result.add(new HttpConnectionFactory(new HttpConfiguration()));
break;
}
case HTTPS:
{
HttpConfiguration configuration = new HttpConfiguration();
configuration.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(configuration);
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, http.getProtocol());
result.add(ssl);
result.add(http);
break;
}
case H2C:
{
result.add(new HTTP2CServerConnectionFactory(new HttpConfiguration()));
break;
}
case H2:
{
HttpConfiguration configuration = new HttpConfiguration();
configuration.addCustomizer(new SecureRequestCustomizer());
HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(configuration);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory("h2");
SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol());
result.add(ssl);
result.add(alpn);
result.add(h2);
break;
}
default:
{
throw new IllegalArgumentException();
}
}
return result.toArray(new ConnectionFactory[result.size()]);
}
private HttpClientTransport provideClientTransport(Transport transport)
{
switch (transport)
{
case HTTP:
case HTTPS:
{
return new HttpClientTransportOverHTTP(1);
}
case HTTP2:
case H2C:
case H2:
{
HTTP2Client http2Client = new HTTP2Client();
http2Client.setSelectors(1);
@ -122,6 +170,21 @@ public abstract class AbstractTest
}
}
protected String newURI()
{
switch (transport)
{
case HTTP:
case H2C:
return "http://localhost:" + connector.getLocalPort();
case HTTPS:
case H2:
return "https://localhost:" + connector.getLocalPort();
default:
throw new IllegalArgumentException();
}
}
@After
public void stop() throws Exception
{
@ -133,6 +196,6 @@ public abstract class AbstractTest
protected enum Transport
{
HTTP, HTTP2
HTTP, HTTPS, H2C, H2
}
}

View File

@ -53,7 +53,7 @@ public class AsyncRequestContentTest extends AbstractTest
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
@ -73,7 +73,7 @@ public class AsyncRequestContentTest extends AbstractTest
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
@ -95,7 +95,7 @@ public class AsyncRequestContentTest extends AbstractTest
InputStreamContentProvider contentProvider =
new InputStreamContentProvider(new ByteArrayInputStream(new byte[0]));
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
@ -116,7 +116,7 @@ public class AsyncRequestContentTest extends AbstractTest
InputStreamContentProvider contentProvider =
new InputStreamContentProvider(new ByteArrayInputStream(new byte[1]));
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
@ -136,7 +136,7 @@ public class AsyncRequestContentTest extends AbstractTest
OutputStreamContentProvider contentProvider = new OutputStreamContentProvider();
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
client.POST(newURI())
.content(contentProvider)
.send(result ->
{
@ -156,7 +156,7 @@ public class AsyncRequestContentTest extends AbstractTest
OutputStreamContentProvider contentProvider = new OutputStreamContentProvider();
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
client.POST(newURI())
.content(contentProvider)
.send(result ->
{

View File

@ -27,8 +27,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
@ -61,14 +59,10 @@ public class HttpClientIdleTimeoutTest extends AbstractTest
client.start();
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort()).send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
client.newRequest(newURI()).send(result ->
{
if (result.isFailed())
latch.countDown();
}
});
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
@ -89,16 +83,12 @@ public class HttpClientIdleTimeoutTest extends AbstractTest
});
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
client.newRequest(newURI())
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
.send(result ->
{
if (result.isFailed())
latch.countDown();
}
});
Assert.assertTrue(latch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));

View File

@ -61,7 +61,7 @@ public class HttpClientTest extends AbstractTest
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
ContentResponse response = client.newRequest(newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
@ -95,7 +95,7 @@ public class HttpClientTest extends AbstractTest
}
});
org.eclipse.jetty.client.api.Request request = client.newRequest("localhost", connector.getLocalPort());
org.eclipse.jetty.client.api.Request request = client.newRequest(newURI());
FutureResponseListener listener = new FutureResponseListener(request, length);
request.timeout(10, TimeUnit.SECONDS).send(listener);
ContentResponse response = listener.get();
@ -139,7 +139,7 @@ public class HttpClientTest extends AbstractTest
}
});
org.eclipse.jetty.client.api.Request request = client.newRequest("localhost", connector.getLocalPort());
org.eclipse.jetty.client.api.Request request = client.newRequest(newURI());
FutureResponseListener listener = new FutureResponseListener(request, 2 * length);
request.timeout(10, TimeUnit.SECONDS).send(listener);
ContentResponse response = listener.get();
@ -183,7 +183,7 @@ public class HttpClientTest extends AbstractTest
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
ContentResponse response = client.newRequest(newURI())
.method(HttpMethod.POST)
.content(new BytesContentProvider(bytes))
.timeout(15, TimeUnit.SECONDS)
@ -228,7 +228,7 @@ public class HttpClientTest extends AbstractTest
int chunkSize = 16;
byte[][] bytes = IntStream.range(0, chunks).mapToObj(x -> new byte[chunkSize]).toArray(byte[][]::new);
BytesContentProvider contentProvider = new BytesContentProvider("application/octet-stream", bytes);
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
ContentResponse response = client.newRequest(newURI())
.method(HttpMethod.POST)
.content(contentProvider)
.timeout(15, TimeUnit.SECONDS)