HttpClient optimizations.
HttpConnectionPool now puts "hot" idle connection in the front of a deque, rather than at the end of a queue, so that the hotter they are the more they are used. HttpRequest is now initialized with better defaults so that HttpConnection.normalizeRequest() has less work to do. This reduced the number of branches to be executed for each request. HttpDestination now produces less garbage by using Jetty's BlockingArrayQueue instead of Java's LinkedBlockingQueue. HttpClient has now a bigger response buffer, 16 KiB, by default.
This commit is contained in:
parent
8636c7da45
commit
d86ceaf790
|
@ -122,7 +122,7 @@ public class HttpClient extends ContainerLifeCycle
|
|||
private volatile int maxConnectionsPerDestination = 64;
|
||||
private volatile int maxRequestsQueuedPerDestination = 1024;
|
||||
private volatile int requestBufferSize = 4096;
|
||||
private volatile int responseBufferSize = 4096;
|
||||
private volatile int responseBufferSize = 16384;
|
||||
private volatile int maxRedirects = 8;
|
||||
private volatile SocketAddress bindAddress;
|
||||
private volatile long connectTimeout = 15000;
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.net.HttpCookie;
|
|||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.client.api.Authentication;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
|
@ -82,23 +81,11 @@ public abstract class HttpConnection implements Connection
|
|||
|
||||
protected void normalizeRequest(Request request)
|
||||
{
|
||||
if (request.getMethod() == null)
|
||||
request.method(HttpMethod.GET);
|
||||
|
||||
if (request.getVersion() == null)
|
||||
request.version(HttpVersion.HTTP_1_1);
|
||||
|
||||
if (request.getIdleTimeout() <= 0)
|
||||
request.idleTimeout(client.getIdleTimeout(), TimeUnit.MILLISECONDS);
|
||||
|
||||
String method = request.getMethod();
|
||||
HttpVersion version = request.getVersion();
|
||||
HttpFields headers = request.getHeaders();
|
||||
ContentProvider content = request.getContent();
|
||||
|
||||
if (request.getAgent() == null)
|
||||
headers.put(client.getUserAgentField());
|
||||
|
||||
// Make sure the path is there
|
||||
String path = request.getPath();
|
||||
if (path.trim().length() == 0)
|
||||
|
@ -155,12 +142,5 @@ public abstract class HttpConnection implements Connection
|
|||
Authentication.Result authnResult = client.getAuthenticationStore().findAuthenticationResult(authenticationURI);
|
||||
if (authnResult != null)
|
||||
authnResult.apply(request);
|
||||
|
||||
if (!headers.containsKey(HttpHeader.ACCEPT_ENCODING.asString()))
|
||||
{
|
||||
HttpField acceptEncodingField = client.getAcceptEncodingField();
|
||||
if (acceptEncodingField != null)
|
||||
headers.put(acceptEncodingField);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.net.URI;
|
|||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -38,6 +37,7 @@ import org.eclipse.jetty.http.HttpField;
|
|||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.http.HttpScheme;
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
|
@ -66,7 +66,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
|
|||
this.host = host;
|
||||
this.address = new Address(host, port);
|
||||
|
||||
this.exchanges = new LinkedBlockingQueue<>(client.getMaxRequestsQueuedPerDestination());
|
||||
this.exchanges = new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
|
||||
|
||||
this.requestNotifier = new RequestNotifier(client);
|
||||
this.responseNotifier = new ResponseNotifier(client);
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.eclipse.jetty.client.api.Request;
|
|||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.util.FutureResponseListener;
|
||||
import org.eclipse.jetty.client.util.PathContentProvider;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
|
@ -66,8 +67,8 @@ public class HttpRequest implements Request
|
|||
private String scheme;
|
||||
private String path;
|
||||
private String query;
|
||||
private String method;
|
||||
private HttpVersion version;
|
||||
private String method = HttpMethod.GET.asString();
|
||||
private HttpVersion version = HttpVersion.HTTP_1_1;
|
||||
private long idleTimeout;
|
||||
private long timeout;
|
||||
private ContentProvider content;
|
||||
|
@ -91,6 +92,11 @@ public class HttpRequest implements Request
|
|||
extractParams(query);
|
||||
this.uri = buildURI(true);
|
||||
followRedirects(client.isFollowRedirects());
|
||||
idleTimeout = client.getIdleTimeout();
|
||||
HttpField acceptEncodingField = client.getAcceptEncodingField();
|
||||
if (acceptEncodingField != null)
|
||||
headers.put(acceptEncodingField);
|
||||
headers.put(client.getUserAgentField());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,8 +140,7 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public Request method(HttpMethod method)
|
||||
{
|
||||
this.method = method.asString();
|
||||
return this;
|
||||
return method(method.asString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -195,7 +200,7 @@ public class HttpRequest implements Request
|
|||
@Override
|
||||
public Request version(HttpVersion version)
|
||||
{
|
||||
this.version = version;
|
||||
this.version = Objects.requireNonNull(version);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
package org.eclipse.jetty.client.http;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingDeque;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
|
@ -39,7 +41,7 @@ public class HttpConnectionPool implements Dumpable
|
|||
private final Destination destination;
|
||||
private final int maxConnections;
|
||||
private final Promise<Connection> connectionPromise;
|
||||
private final BlockingQueue<Connection> idleConnections;
|
||||
private final BlockingDeque<Connection> idleConnections;
|
||||
private final BlockingQueue<Connection> activeConnections;
|
||||
|
||||
public HttpConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
|
||||
|
@ -47,7 +49,7 @@ public class HttpConnectionPool implements Dumpable
|
|||
this.destination = destination;
|
||||
this.maxConnections = maxConnections;
|
||||
this.connectionPromise = connectionPromise;
|
||||
this.idleConnections = new BlockingArrayQueue<>(maxConnections);
|
||||
this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
|
||||
this.activeConnections = new BlockingArrayQueue<>(maxConnections);
|
||||
}
|
||||
|
||||
|
@ -110,7 +112,7 @@ public class HttpConnectionPool implements Dumpable
|
|||
|
||||
private Connection acquireIdleConnection()
|
||||
{
|
||||
Connection connection = idleConnections.poll();
|
||||
Connection connection = idleConnections.pollFirst();
|
||||
if (connection != null)
|
||||
activate(connection);
|
||||
return connection;
|
||||
|
@ -134,7 +136,8 @@ public class HttpConnectionPool implements Dumpable
|
|||
{
|
||||
if (activeConnections.remove(connection))
|
||||
{
|
||||
if (idleConnections.offer(connection))
|
||||
// Make sure we use "hot" connections first
|
||||
if (idleConnections.offerFirst(connection))
|
||||
{
|
||||
LOG.debug("Connection idle {}", connection);
|
||||
return true;
|
||||
|
@ -147,7 +150,7 @@ public class HttpConnectionPool implements Dumpable
|
|||
return false;
|
||||
}
|
||||
|
||||
public void remove(Connection connection)
|
||||
public boolean remove(Connection connection)
|
||||
{
|
||||
boolean removed = activeConnections.remove(connection);
|
||||
removed |= idleConnections.remove(connection);
|
||||
|
@ -156,6 +159,7 @@ public class HttpConnectionPool implements Dumpable
|
|||
int pooled = connectionCount.decrementAndGet();
|
||||
LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
public boolean isActive(Connection connection)
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -51,6 +50,7 @@ import org.eclipse.jetty.util.IO;
|
|||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -73,9 +73,27 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
client.setMaxConnectionsPerDestination(32768);
|
||||
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
|
||||
client.setDispatchIO(false);
|
||||
client.setStrictEventOrdering(false);
|
||||
|
||||
Random random = new Random();
|
||||
// At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity)
|
||||
int runs = 1;
|
||||
int iterations = 500;
|
||||
for (int i = 0; i < runs; ++i)
|
||||
{
|
||||
run(random, iterations);
|
||||
}
|
||||
|
||||
// Re-run after warmup
|
||||
iterations = 50_000;
|
||||
for (int i = 0; i < runs; ++i)
|
||||
{
|
||||
run(random, iterations);
|
||||
}
|
||||
}
|
||||
|
||||
private void run(Random random, int iterations) throws InterruptedException
|
||||
{
|
||||
CountDownLatch latch = new CountDownLatch(iterations);
|
||||
List<String> failures = new ArrayList<>();
|
||||
|
||||
|
@ -84,7 +102,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
|
||||
// Dumps the state of the client if the test takes too long
|
||||
final Thread testThread = Thread.currentThread();
|
||||
client.getScheduler().schedule(new Runnable()
|
||||
Scheduler.Task task = client.getScheduler().schedule(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
|
@ -108,9 +126,11 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
for (int i = 0; i < iterations; ++i)
|
||||
{
|
||||
test(random, latch, failures);
|
||||
// test("http", "localhost", "GET", false, false, 64 * 1024, false, latch, failures);
|
||||
}
|
||||
Assert.assertTrue(latch.await(iterations, TimeUnit.SECONDS));
|
||||
long end = System.nanoTime();
|
||||
task.cancel();
|
||||
long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin);
|
||||
logger.info("{} requests in {} ms, {} req/s", iterations, elapsed, elapsed > 0 ? iterations * 1000 / elapsed : -1);
|
||||
|
||||
|
@ -122,34 +142,44 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
|
||||
private void test(Random random, final CountDownLatch latch, final List<String> failures) throws InterruptedException
|
||||
{
|
||||
int maxContentLength = 64 * 1024;
|
||||
|
||||
// Choose a random destination
|
||||
String host = random.nextBoolean() ? "localhost" : "127.0.0.1";
|
||||
URI uri = URI.create(scheme + "://" + host + ":" + connector.getLocalPort());
|
||||
Request request = client.newRequest(uri);
|
||||
|
||||
// Choose a random method
|
||||
HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST;
|
||||
request.method(method);
|
||||
|
||||
boolean ssl = HttpScheme.HTTPS.is(scheme);
|
||||
|
||||
// Choose randomly whether to close the connection on the client or on the server
|
||||
boolean clientClose = false;
|
||||
if (!ssl && random.nextBoolean())
|
||||
clientClose = true;
|
||||
boolean serverClose = false;
|
||||
if (!ssl && random.nextBoolean())
|
||||
serverClose = true;
|
||||
|
||||
int maxContentLength = 64 * 1024;
|
||||
int contentLength = random.nextInt(maxContentLength) + 1;
|
||||
|
||||
test(scheme, host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures);
|
||||
}
|
||||
|
||||
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures) throws InterruptedException
|
||||
{
|
||||
Request request = client.newRequest(host, connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.method(method);
|
||||
|
||||
if (clientClose)
|
||||
request.header(HttpHeader.CONNECTION, "close");
|
||||
else if (!ssl && random.nextBoolean())
|
||||
else if (serverClose)
|
||||
request.header("X-Close", "true");
|
||||
|
||||
int contentLength = random.nextInt(maxContentLength) + 1;
|
||||
switch (method)
|
||||
{
|
||||
case GET:
|
||||
// Randomly ask the server to download data upon this GET request
|
||||
if (random.nextBoolean())
|
||||
request.header("X-Download", String.valueOf(contentLength));
|
||||
case "GET":
|
||||
request.header("X-Download", String.valueOf(contentLength));
|
||||
break;
|
||||
case POST:
|
||||
case "POST":
|
||||
request.header("X-Upload", String.valueOf(contentLength));
|
||||
request.content(new BytesContentProvider(new byte[contentLength]));
|
||||
break;
|
||||
|
@ -163,15 +193,19 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
@Override
|
||||
public void onHeaders(Response response)
|
||||
{
|
||||
String content = response.getHeaders().get("X-Content");
|
||||
if (content != null)
|
||||
contentLength.set(Integer.parseInt(content));
|
||||
if (checkContentLength)
|
||||
{
|
||||
String content = response.getHeaders().get("X-Content");
|
||||
if (content != null)
|
||||
contentLength.set(Integer.parseInt(content));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onContent(Response response, ByteBuffer content)
|
||||
{
|
||||
contentLength.addAndGet(-content.remaining());
|
||||
if (checkContentLength)
|
||||
contentLength.addAndGet(-content.remaining());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -182,8 +216,10 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
result.getFailure().printStackTrace();
|
||||
failures.add("Result failed " + result);
|
||||
}
|
||||
if (contentLength.get() != 0)
|
||||
|
||||
if (checkContentLength && contentLength.get() != 0)
|
||||
failures.add("Content length mismatch " + contentLength);
|
||||
|
||||
requestLatch.countDown();
|
||||
latch.countDown();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue