Merge pull request #3099 from eclipse/jetty-9.4.x-HttpClient-dump-improvements

Fixes Issue #3103 - HttpClient buffer leak found by dump improvements

The leak problem was an additional, unnecessary, call retain() in ResponseNotifier.notifyContent() that was leaking `ByteBuffer`s.
This commit is contained in:
Simone Bordet 2018-11-13 18:04:17 +01:00 committed by GitHub
commit f2ed692a1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 135 additions and 85 deletions

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.CookieManager;
import java.net.CookiePolicy;
import java.net.CookieStore;
@ -70,6 +71,7 @@ import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -179,12 +181,23 @@ public class HttpClient extends ContainerLifeCycle
public HttpClient(HttpClientTransport transport, SslContextFactory sslContextFactory)
{
this.transport = transport;
addBean(transport);
if (sslContextFactory == null)
{
sslContextFactory = new SslContextFactory(false);
sslContextFactory.setEndpointIdentificationAlgorithm("HTTPS");
}
this.sslContextFactory = sslContextFactory;
addBean(sslContextFactory);
addBean(handlers);
addBean(decoderFactories);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
dumpObjects(out, indent, new DumpableCollection("requestListeners", requestListeners));
}
public HttpClientTransport getTransport()
@ -204,38 +217,24 @@ public class HttpClient extends ContainerLifeCycle
@Override
protected void doStart() throws Exception
{
// TODO use #addBean in constructor?
if (sslContextFactory != null)
addBean(sslContextFactory);
if (executor == null)
{
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(name);
executor = threadPool;
setExecutor(threadPool);
}
// TODO use #updateBean in #setExecutor
addBean(executor);
if (byteBufferPool == null)
byteBufferPool = new MappedByteBufferPool(2048,
executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads()/2
: ProcessorUtils.availableProcessors()*2);
// TODO use #updateBean in #setByteBufferPool?
addBean(byteBufferPool);
setByteBufferPool(new MappedByteBufferPool(2048,
executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2
: ProcessorUtils.availableProcessors() * 2));
if (scheduler == null)
scheduler = new ScheduledExecutorScheduler(name + "-scheduler", false);
// TODO use #updateBean in #setScheduler?
addBean(scheduler);
transport.setHttpClient(this);
addBean(transport);
setScheduler(new ScheduledExecutorScheduler(name + "-scheduler", false));
if (resolver == null)
resolver = new SocketAddressResolver.Async(executor, scheduler, getAddressResolutionTimeout());
addBean(resolver);
setSocketAddressResolver(new SocketAddressResolver.Async(executor, scheduler, getAddressResolutionTimeout()));
handlers.put(new ContinueProtocolHandler());
handlers.put(new RedirectProtocolHandler(this));
@ -247,6 +246,7 @@ public class HttpClient extends ContainerLifeCycle
cookieManager = newCookieManager();
cookieStore = cookieManager.getCookieStore();
transport.setHttpClient(this);
super.doStart();
}
@ -649,6 +649,9 @@ public class HttpClient extends ContainerLifeCycle
*/
public void setByteBufferPool(ByteBufferPool byteBufferPool)
{
if (isStarted())
LOG.warn("Calling setByteBufferPool() while started is deprecated");
updateBean(this.byteBufferPool, byteBufferPool);
this.byteBufferPool = byteBufferPool;
}
@ -800,8 +803,9 @@ public class HttpClient extends ContainerLifeCycle
*/
public void setExecutor(Executor executor)
{
if (isRunning())
LOG.warn("setExecutor called when in {} state",getState());
if (isStarted())
LOG.warn("Calling setExecutor() while started is deprecated");
updateBean(this.executor, executor);
this.executor = executor;
}
@ -818,6 +822,9 @@ public class HttpClient extends ContainerLifeCycle
*/
public void setScheduler(Scheduler scheduler)
{
if (isStarted())
LOG.warn("Calling setScheduler() while started is deprecated");
updateBean(this.scheduler, scheduler);
this.scheduler = scheduler;
}
@ -834,6 +841,9 @@ public class HttpClient extends ContainerLifeCycle
*/
public void setSocketAddressResolver(SocketAddressResolver resolver)
{
if (isStarted())
LOG.warn("Calling setSocketAddressResolver() while started is deprecated");
updateBean(this.resolver, resolver);
this.resolver = resolver;
}

View File

@ -18,16 +18,18 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.component.Dumpable;
/**
* <p>A container for {@link ProtocolHandler}s accessible from {@link HttpClient#getProtocolHandlers()}.</p>
*/
public class ProtocolHandlers
public class ProtocolHandlers implements Dumpable
{
private final Map<String, ProtocolHandler> handlers = new LinkedHashMap<>();
@ -91,4 +93,16 @@ public class ProtocolHandlers
}
return null;
}
@Override
public String dump()
{
return Dumpable.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this, handlers);
}
}

View File

@ -30,7 +30,6 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.Retainable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -122,13 +121,8 @@ public class ResponseNotifier
else
{
CountingCallback counter = new CountingCallback(callback, contentListeners.size());
Retainable retainable = callback instanceof Retainable ? (Retainable)callback : null;
for (Response.AsyncContentListener listener : contentListeners)
{
if (retainable != null)
retainable.retain();
notifyContent(listener, response, buffer.slice(), counter);
}
}
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.client;
import java.nio.file.Path;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
@ -30,6 +31,8 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.provider.Arguments;
@ -64,19 +67,32 @@ public abstract class AbstractHttpClientServerTest
protected void startClient(final Scenario scenario) throws Exception
{
startClient(scenario, new HttpClientTransportOverHTTP(1));
startClient(scenario, null,null);
}
protected void startClient(final Scenario scenario, HttpClientTransport transport) throws Exception
protected void startClient(final Scenario scenario, HttpClientTransport transport, Consumer<HttpClient> config) throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient(transport, scenario.newSslContextFactory());
client.setExecutor(clientThreads);
if (transport==null)
transport = new HttpClientTransportOverHTTP(1);
QueuedThreadPool executor = new QueuedThreadPool();
executor.setName("client");
Scheduler scheduler = new ScheduledExecutorScheduler("client-scheduler", false);
client = newHttpClient(scenario, transport);
client.setExecutor(executor);
client.setScheduler(scheduler);
client.setSocketAddressResolver(new SocketAddressResolver.Sync());
if (config!=null)
config.accept(client);
client.start();
}
public HttpClient newHttpClient(Scenario scenario, HttpClientTransport transport)
{
return new HttpClient(transport, scenario.newSslContextFactory());
}
@AfterEach
public void disposeClient() throws Exception
{

View File

@ -111,6 +111,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
{
public WorkDir testdir;
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testStoppingClosesConnections(Scenario scenario) throws Exception
@ -880,31 +881,33 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@ArgumentsSource(ScenarioProvider.class)
public void testConnectHostWithMultipleAddresses(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
client.setSocketAddressResolver(new SocketAddressResolver.Async(client.getExecutor(), client.getScheduler(), client.getConnectTimeout())
startServer(scenario, new EmptyServerHandler());
startClient(scenario, null, client ->
{
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
client.setSocketAddressResolver(new SocketAddressResolver.Async(client.getExecutor(), client.getScheduler(), 5000)
{
super.resolve(host, port, new Promise<List<InetSocketAddress>>()
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{
@Override
public void succeeded(List<InetSocketAddress> result)
super.resolve(host, port, new Promise<List<InetSocketAddress>>()
{
// Add as first address an invalid address so that we test
// that the connect operation iterates over the addresses.
result.add(0, new InetSocketAddress("idontexist", port));
promise.succeeded(result);
}
@Override
public void succeeded(List<InetSocketAddress> result)
{
// Add as first address an invalid address so that we test
// that the connect operation iterates over the addresses.
result.add(0, new InetSocketAddress("idontexist", port));
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
});
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
});
}
});
});
// If no exceptions the test passes.

View File

@ -43,7 +43,6 @@ import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StacklessLogging;
@ -55,10 +54,11 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
{
@Override
public void start(Scenario scenario, Handler handler) throws Exception
public HttpClient newHttpClient(Scenario scenario, HttpClientTransport transport)
{
super.start(scenario, handler);
HttpClient client = super.newHttpClient(scenario, transport);
client.setStrictEventOrdering(false);
return client;
}
@ParameterizedTest

View File

@ -30,7 +30,6 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
@ -43,13 +42,13 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
public class ValidatingConnectionPoolTest extends AbstractHttpClientServerTest
{
@Override
protected void startClient(final Scenario scenario) throws Exception
public HttpClient newHttpClient(Scenario scenario, HttpClientTransport transport)
{
long timeout = 1000;
HttpClientTransportOverHTTP transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(destination ->
new ValidatingConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, destination.getHttpClient().getScheduler(), timeout));
startClient(scenario, transport);
return super.newHttpClient(scenario, transport);
}
@ParameterizedTest

View File

@ -27,10 +27,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
class SslSelectionDump extends ContainerLifeCycle implements Dumpable
class SslSelectionDump implements Dumpable
{
static class CaptionedList extends ArrayList<String> implements Dumpable
{
@ -66,9 +65,7 @@ class SslSelectionDump extends ContainerLifeCycle implements Dumpable
String[] includedByConfig)
{
this.type = type;
addBean(enabled);
addBean(disabled);
List<String> jvmEnabled = Arrays.asList(enabledByJVM);
List<Pattern> excludedPatterns = Arrays.stream(excludedByConfig)
.map((entry) -> Pattern.compile(entry))
@ -165,7 +162,7 @@ class SslSelectionDump extends ContainerLifeCycle implements Dumpable
@Override
public void dump(Appendable out, String indent) throws IOException
{
dumpObjects(out, indent);
Dumpable.dumpObjects(out, indent, this, enabled, disabled);
}
@Override

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.http.client;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
@ -65,9 +68,6 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTransportScenario>
{
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
@ -85,11 +85,12 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
public void testIterative(Transport transport) throws Exception
{
init(transport);
scenario.start(new LoadHandler());
scenario.client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
scenario.client.setMaxConnectionsPerDestination(32768);
scenario.client.setMaxRequestsQueuedPerDestination(1024 * 1024);
scenario.start(new LoadHandler(), client ->
{
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
});
// At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity)
int runs = 1;
@ -130,11 +131,13 @@ public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTran
public void testConcurrent(Transport transport) throws Exception
{
init(transport);
scenario.start(new LoadHandler());
scenario.start(new LoadHandler(), client ->
{
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
});
scenario.client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
scenario.client.setMaxConnectionsPerDestination(32768);
scenario.client.setMaxRequestsQueuedPerDestination(1024 * 1024);
int runs = 1;
int iterations = 256;

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import javax.servlet.http.HttpServlet;
@ -270,20 +271,29 @@ public class TransportScenario
else
setConnectionIdleTimeout(idleTimeout);
}
public void start(Handler handler) throws Exception
{
start(handler,null);
}
public void start(Handler handler, Consumer<HttpClient> config) throws Exception
{
startServer(handler);
startClient();
startClient(config);
}
public void start(HttpServlet servlet) throws Exception
{
startServer(servlet);
startClient();
startClient(null);
}
public void startClient() throws Exception
{
startClient(null);
}
public void startClient(Consumer<HttpClient> config) throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
@ -291,6 +301,10 @@ public class TransportScenario
client = newHttpClient(provideClientTransport(transport), sslContextFactory);
client.setExecutor(clientThreads);
client.setSocketAddressResolver(new SocketAddressResolver.Sync());
if (config!=null)
config.accept(client);
client.start();
if (server != null)
server.addBean(client);