Moved HttpClientLoadTest to "tests" module to test all transports.
This commit is contained in:
parent
caa45283c7
commit
1056536155
|
@ -41,6 +41,7 @@ import org.eclipse.jetty.server.Server;
|
|||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.SslConnectionFactory;
|
||||
import org.eclipse.jetty.toolchain.test.TestTracker;
|
||||
import org.eclipse.jetty.util.SocketAddressResolver;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.junit.After;
|
||||
|
@ -89,22 +90,28 @@ public abstract class AbstractTest
|
|||
QueuedThreadPool serverThreads = new QueuedThreadPool();
|
||||
serverThreads.setName("server");
|
||||
server = new Server(serverThreads);
|
||||
connector = new ServerConnector(server, provideServerConnectionFactory(transport));
|
||||
connector = newServerConnector(server);
|
||||
server.addConnector(connector);
|
||||
server.setHandler(handler);
|
||||
server.start();
|
||||
}
|
||||
|
||||
protected ServerConnector newServerConnector(Server server)
|
||||
{
|
||||
return new ServerConnector(server, provideServerConnectionFactory(transport));
|
||||
}
|
||||
|
||||
private void startClient() throws Exception
|
||||
{
|
||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||
clientThreads.setName("client");
|
||||
client = newHttpClient(provideClientTransport(transport), sslContextFactory);
|
||||
client.setExecutor(clientThreads);
|
||||
client.setSocketAddressResolver(new SocketAddressResolver.Sync());
|
||||
client.start();
|
||||
}
|
||||
|
||||
private ConnectionFactory[] provideServerConnectionFactory(Transport transport)
|
||||
protected ConnectionFactory[] provideServerConnectionFactory(Transport transport)
|
||||
{
|
||||
List<ConnectionFactory> result = new ArrayList<>();
|
||||
switch (transport)
|
||||
|
@ -194,18 +201,7 @@ public abstract class AbstractTest
|
|||
|
||||
protected String getScheme()
|
||||
{
|
||||
switch (transport)
|
||||
{
|
||||
case HTTP:
|
||||
case H2C:
|
||||
case FCGI:
|
||||
return "http";
|
||||
case HTTPS:
|
||||
case H2:
|
||||
return "https";
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
return isTransportSecure() ? "https" : "http";
|
||||
}
|
||||
|
||||
protected String newURI()
|
||||
|
@ -213,6 +209,22 @@ public abstract class AbstractTest
|
|||
return getScheme() + "://localhost:" + connector.getLocalPort();
|
||||
}
|
||||
|
||||
protected boolean isTransportSecure()
|
||||
{
|
||||
switch (transport)
|
||||
{
|
||||
case HTTP:
|
||||
case H2C:
|
||||
case FCGI:
|
||||
return false;
|
||||
case HTTPS:
|
||||
case H2:
|
||||
return true;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void stop() throws Exception
|
||||
{
|
||||
|
|
|
@ -16,12 +16,11 @@
|
|||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -35,29 +34,33 @@ import javax.servlet.ServletException;
|
|||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.ConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpClientTransport;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
|
||||
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
|
||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
|
||||
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpScheme;
|
||||
import org.eclipse.jetty.io.ArrayByteBufferPool;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
|
||||
import org.eclipse.jetty.io.MappedByteBufferPool;
|
||||
import org.eclipse.jetty.server.AbstractConnectionFactory;
|
||||
import org.eclipse.jetty.server.HttpConnectionFactory;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.LeakDetector;
|
||||
import org.eclipse.jetty.util.SocketAddressResolver;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
|
@ -65,65 +68,100 @@ import org.junit.Test;
|
|||
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
||||
public class HttpClientLoadTest extends AbstractTest
|
||||
{
|
||||
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
|
||||
private final AtomicLong requestCount = new AtomicLong();
|
||||
private final AtomicLong connectionLeaks = new AtomicLong();
|
||||
|
||||
public HttpClientLoadTest(SslContextFactory sslContextFactory)
|
||||
public HttpClientLoadTest(Transport transport)
|
||||
{
|
||||
super(sslContextFactory);
|
||||
super(transport);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIterative() throws Exception
|
||||
@Override
|
||||
protected ServerConnector newServerConnector(Server server)
|
||||
{
|
||||
int cores = Runtime.getRuntime().availableProcessors();
|
||||
ByteBufferPool byteBufferPool = new ArrayByteBufferPool();
|
||||
byteBufferPool = new LeakTrackingByteBufferPool(byteBufferPool);
|
||||
return new ServerConnector(server, null, null, byteBufferPool,
|
||||
1, Math.min(1, cores / 2), provideServerConnectionFactory(transport));
|
||||
}
|
||||
|
||||
final AtomicLong connectionLeaks = new AtomicLong();
|
||||
|
||||
start(new LoadHandler());
|
||||
server.stop();
|
||||
server.removeConnector(connector);
|
||||
LeakTrackingByteBufferPool serverBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
|
||||
connector = new ServerConnector(server, connector.getExecutor(), connector.getScheduler(),
|
||||
serverBufferPool , 1, Math.min(1, cores / 2),
|
||||
AbstractConnectionFactory.getFactories(sslContextFactory, new HttpConnectionFactory()));
|
||||
server.addConnector(connector);
|
||||
server.start();
|
||||
|
||||
client.stop();
|
||||
|
||||
HttpClient newClient = new HttpClient(new HttpClientTransportOverHTTP()
|
||||
@Override
|
||||
protected HttpClientTransport provideClientTransport(Transport transport)
|
||||
{
|
||||
switch (transport)
|
||||
{
|
||||
@Override
|
||||
public HttpDestination newHttpDestination(Origin origin)
|
||||
case HTTP:
|
||||
case HTTPS:
|
||||
{
|
||||
return new HttpDestinationOverHTTP(getHttpClient(), origin)
|
||||
return new HttpClientTransportOverHTTP(1)
|
||||
{
|
||||
@Override
|
||||
protected DuplexConnectionPool newConnectionPool(HttpClient client)
|
||||
public HttpDestination newHttpDestination(Origin origin)
|
||||
{
|
||||
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
|
||||
return new HttpDestinationOverHTTP(getHttpClient(), origin)
|
||||
{
|
||||
@Override
|
||||
protected void leaked(LeakDetector.LeakInfo resource)
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
connectionLeaks.incrementAndGet();
|
||||
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
|
||||
{
|
||||
@Override
|
||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
||||
{
|
||||
super.leaked(leakInfo);
|
||||
connectionLeaks.incrementAndGet();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
}, sslContextFactory);
|
||||
newClient.setExecutor(client.getExecutor());
|
||||
newClient.setSocketAddressResolver(new SocketAddressResolver.Sync());
|
||||
client = newClient;
|
||||
LeakTrackingByteBufferPool clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
|
||||
client.setByteBufferPool(clientBufferPool);
|
||||
case FCGI:
|
||||
{
|
||||
return new HttpClientTransportOverFCGI(1, false, "")
|
||||
{
|
||||
@Override
|
||||
public HttpDestination newHttpDestination(Origin origin)
|
||||
{
|
||||
return new HttpDestinationOverFCGI(getHttpClient(), origin)
|
||||
{
|
||||
@Override
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
|
||||
{
|
||||
@Override
|
||||
protected void leaked(LeakDetector.LeakInfo leakInfo)
|
||||
{
|
||||
super.leaked(leakInfo);
|
||||
connectionLeaks.incrementAndGet();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
default:
|
||||
{
|
||||
return super.provideClientTransport(transport);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIterative() throws Exception
|
||||
{
|
||||
start(new LoadHandler());
|
||||
|
||||
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
|
||||
client.setMaxConnectionsPerDestination(32768);
|
||||
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
|
||||
client.setStrictEventOrdering(false);
|
||||
client.start();
|
||||
|
||||
// At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity)
|
||||
int runs = 1;
|
||||
|
@ -138,13 +176,23 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
|
||||
System.gc();
|
||||
|
||||
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
|
||||
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
|
||||
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
|
||||
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
|
||||
if (byteBufferPool instanceof LeakTrackingByteBufferPool)
|
||||
{
|
||||
LeakTrackingByteBufferPool serverBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
|
||||
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
|
||||
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
|
||||
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
|
||||
}
|
||||
|
||||
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L));
|
||||
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L));
|
||||
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L));
|
||||
byteBufferPool = client.getByteBufferPool();
|
||||
if (byteBufferPool instanceof LeakTrackingByteBufferPool)
|
||||
{
|
||||
LeakTrackingByteBufferPool clientBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
|
||||
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L));
|
||||
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L));
|
||||
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L));
|
||||
}
|
||||
|
||||
assertThat("Connection Leaks", connectionLeaks.get(), Matchers.is(0L));
|
||||
}
|
||||
|
@ -154,6 +202,10 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
{
|
||||
start(new LoadHandler());
|
||||
|
||||
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
|
||||
client.setMaxConnectionsPerDestination(32768);
|
||||
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
|
||||
|
||||
int runs = 1;
|
||||
int iterations = 256;
|
||||
IntStream.range(0, 16).parallel().forEach(i ->
|
||||
|
@ -166,24 +218,15 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
CountDownLatch latch = new CountDownLatch(iterations);
|
||||
List<String> failures = new ArrayList<>();
|
||||
|
||||
int factor = logger.isDebugEnabled() ? 25 : 1;
|
||||
factor *= "http".equalsIgnoreCase(scheme) ? 10 : 1000;
|
||||
int factor = (logger.isDebugEnabled() ? 25 : 1) * 100;
|
||||
|
||||
// Dumps the state of the client if the test takes too long
|
||||
final Thread testThread = Thread.currentThread();
|
||||
Scheduler.Task task = client.getScheduler().schedule(() ->
|
||||
{
|
||||
logger.warn("Interrupting test, it is taking too long");
|
||||
for (String host : Arrays.asList("localhost", "127.0.0.1"))
|
||||
{
|
||||
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, connector.getLocalPort());
|
||||
DuplexConnectionPool connectionPool = destination.getConnectionPool();
|
||||
for (Connection connection : new ArrayList<>(connectionPool.getActiveConnections()))
|
||||
{
|
||||
HttpConnectionOverHTTP active = (HttpConnectionOverHTTP)connection;
|
||||
logger.warn(active.getEndPoint() + " exchange " + active.getHttpChannel().getHttpExchange());
|
||||
}
|
||||
}
|
||||
logger.warn("Interrupting test, it is taking too long{}{}{}{}",
|
||||
System.lineSeparator(), server.dump(),
|
||||
System.lineSeparator(), client.dump());
|
||||
testThread.interrupt();
|
||||
}, iterations * factor, TimeUnit.MILLISECONDS);
|
||||
|
||||
|
@ -200,7 +243,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
logger.info("{} requests in {} ms, {} req/s", iterations, elapsed, elapsed > 0 ? iterations * 1000 / elapsed : -1);
|
||||
|
||||
for (String failure : failures)
|
||||
System.err.println("FAILED: "+failure);
|
||||
logger.info("FAILED: {}", failure);
|
||||
|
||||
Assert.assertTrue(failures.toString(), failures.isEmpty());
|
||||
}
|
||||
|
@ -213,26 +256,28 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
// Choose a random method
|
||||
HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST;
|
||||
|
||||
boolean ssl = HttpScheme.HTTPS.is(scheme);
|
||||
boolean ssl = isTransportSecure();
|
||||
|
||||
// Choose randomly whether to close the connection on the client or on the server
|
||||
boolean clientClose = false;
|
||||
if (!ssl && random.nextBoolean())
|
||||
if (!ssl && random.nextInt(100) < 5)
|
||||
clientClose = true;
|
||||
boolean serverClose = false;
|
||||
if (!ssl && random.nextBoolean())
|
||||
if (!ssl && random.nextInt(100) < 5)
|
||||
serverClose = true;
|
||||
|
||||
int maxContentLength = 64 * 1024;
|
||||
int contentLength = random.nextInt(maxContentLength) + 1;
|
||||
|
||||
test(scheme, host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures);
|
||||
test(getScheme(), host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures);
|
||||
}
|
||||
|
||||
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
|
||||
{
|
||||
long requestId = requestCount.incrementAndGet();
|
||||
Request request = client.newRequest(host, connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.path("/" + requestId)
|
||||
.method(method);
|
||||
|
||||
if (clientClose)
|
||||
|
@ -290,7 +335,12 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
latch.countDown();
|
||||
}
|
||||
});
|
||||
await(requestLatch, 5, TimeUnit.SECONDS);
|
||||
if (!await(requestLatch, 5, TimeUnit.SECONDS))
|
||||
{
|
||||
logger.warn("Request {} took too long{}{}{}{}", requestId,
|
||||
System.lineSeparator(), server.dump(),
|
||||
System.lineSeparator(), client.dump());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean await(CountDownLatch latch, long time, TimeUnit unit)
|
||||
|
@ -314,6 +364,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
switch (method)
|
||||
{
|
||||
case "GET":
|
||||
{
|
||||
int contentLength = request.getIntHeader("X-Download");
|
||||
if (contentLength > 0)
|
||||
{
|
||||
|
@ -321,10 +372,13 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
response.getOutputStream().write(new byte[contentLength]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "POST":
|
||||
{
|
||||
response.setHeader("X-Content", request.getHeader("X-Upload"));
|
||||
IO.copy(request.getInputStream(), response.getOutputStream());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (Boolean.parseBoolean(request.getHeader("X-Close")))
|
Loading…
Reference in New Issue