Watcher: Increase HttpClient parallel sent requests (#30130)
The HTTPClient used in watcher is based on the apache http client. The current client is using a lot of defaults - which are not always optimal. Two of those defaults are the maximum number of total connections and the maximum number of connections to a single route. If one of those limits is reached, the HTTPClient waits for a connection to be finished thus acting in a blocking fashion. In order to prevent this when many requests are being executed, we increase the limit of total connections as well as the connections per route (a route is basically an endpoint, which also contains proxy information, not containing an URL, just hosts). On top of that an additional option has been set to evict long running connections, which can potentially be reused after some time. As this requires an additional background thread, this required some changes to ensure that the httpclient is closed properly. Also the timeout for this can be configured.
This commit is contained in:
parent
4b36ea7433
commit
f00890ee38
|
@ -163,6 +163,9 @@ analysis module. ({pull}30397[#30397])
|
||||||
Added new "Request" object flavored request methods in the RestClient. Prefer
|
Added new "Request" object flavored request methods in the RestClient. Prefer
|
||||||
these instead of the multi-argument versions. ({pull}29623[#29623])
|
these instead of the multi-argument versions. ({pull}29623[#29623])
|
||||||
|
|
||||||
|
Watcher HTTP client used in watches now allows more parallel connections to the
|
||||||
|
same endpoint and evicts long running connections. ({pull}30130[#30130])
|
||||||
|
|
||||||
The cluster state listener to decide if watcher should be
|
The cluster state listener to decide if watcher should be
|
||||||
stopped/started/paused now runs far less code in an executor but is more
|
stopped/started/paused now runs far less code in an executor but is more
|
||||||
synchronous and predictable. Also the trigger engine thread is only started on
|
synchronous and predictable. Also the trigger engine thread is only started on
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.http.HttpServerTransport;
|
import org.elasticsearch.http.HttpServerTransport;
|
||||||
|
@ -38,6 +39,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||||
import org.elasticsearch.ingest.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
import org.elasticsearch.license.LicenseService;
|
import org.elasticsearch.license.LicenseService;
|
||||||
import org.elasticsearch.license.XPackLicenseState;
|
import org.elasticsearch.license.XPackLicenseState;
|
||||||
|
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
||||||
import org.elasticsearch.plugins.ActionPlugin;
|
import org.elasticsearch.plugins.ActionPlugin;
|
||||||
import org.elasticsearch.plugins.AnalysisPlugin;
|
import org.elasticsearch.plugins.AnalysisPlugin;
|
||||||
import org.elasticsearch.plugins.ClusterPlugin;
|
import org.elasticsearch.plugins.ClusterPlugin;
|
||||||
|
@ -57,9 +59,9 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.Transport;
|
import org.elasticsearch.transport.Transport;
|
||||||
import org.elasticsearch.transport.TransportInterceptor;
|
import org.elasticsearch.transport.TransportInterceptor;
|
||||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||||
import org.elasticsearch.persistent.PersistentTasksExecutor;
|
|
||||||
import org.elasticsearch.xpack.core.ssl.SSLService;
|
import org.elasticsearch.xpack.core.ssl.SSLService;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -391,6 +393,11 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
|
||||||
.collect(toList());
|
.collect(toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
IOUtils.close(plugins);
|
||||||
|
}
|
||||||
|
|
||||||
private <T> List<T> filterPlugins(Class<T> type) {
|
private <T> List<T> filterPlugins(Class<T> type) {
|
||||||
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p))
|
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.SettingsFilter;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.index.IndexModule;
|
import org.elasticsearch.index.IndexModule;
|
||||||
|
@ -216,6 +217,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
||||||
|
|
||||||
private static final Logger logger = Loggers.getLogger(Watcher.class);
|
private static final Logger logger = Loggers.getLogger(Watcher.class);
|
||||||
private WatcherIndexingListener listener;
|
private WatcherIndexingListener listener;
|
||||||
|
private HttpClient httpClient;
|
||||||
|
|
||||||
protected final Settings settings;
|
protected final Settings settings;
|
||||||
protected final boolean transportClient;
|
protected final boolean transportClient;
|
||||||
|
@ -266,7 +268,7 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
||||||
// TODO: add more auth types, or remove this indirection
|
// TODO: add more auth types, or remove this indirection
|
||||||
HttpAuthRegistry httpAuthRegistry = new HttpAuthRegistry(httpAuthFactories);
|
HttpAuthRegistry httpAuthRegistry = new HttpAuthRegistry(httpAuthFactories);
|
||||||
HttpRequestTemplate.Parser httpTemplateParser = new HttpRequestTemplate.Parser(httpAuthRegistry);
|
HttpRequestTemplate.Parser httpTemplateParser = new HttpRequestTemplate.Parser(httpAuthRegistry);
|
||||||
final HttpClient httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());
|
httpClient = new HttpClient(settings, httpAuthRegistry, getSslService());
|
||||||
|
|
||||||
// notification
|
// notification
|
||||||
EmailService emailService = new EmailService(settings, cryptoService, clusterService.getClusterSettings());
|
EmailService emailService = new EmailService(settings, cryptoService, clusterService.getClusterSettings());
|
||||||
|
@ -608,4 +610,9 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin {
|
||||||
public List<ScriptContext> getContexts() {
|
public List<ScriptContext> getContexts() {
|
||||||
return Arrays.asList(Watcher.SCRIPT_SEARCH_CONTEXT, Watcher.SCRIPT_EXECUTABLE_CONTEXT, Watcher.SCRIPT_TEMPLATE_CONTEXT);
|
return Arrays.asList(Watcher.SCRIPT_SEARCH_CONTEXT, Watcher.SCRIPT_EXECUTABLE_CONTEXT, Watcher.SCRIPT_TEMPLATE_CONTEXT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
IOUtils.closeWhileHandlingException(httpClient);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.elasticsearch.xpack.watcher.common.http.auth.HttpAuthRegistry;
|
||||||
|
|
||||||
import javax.net.ssl.HostnameVerifier;
|
import javax.net.ssl.HostnameVerifier;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -56,9 +57,13 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class HttpClient extends AbstractComponent {
|
public class HttpClient extends AbstractComponent implements Closeable {
|
||||||
|
|
||||||
private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl.";
|
private static final String SETTINGS_SSL_PREFIX = "xpack.http.ssl.";
|
||||||
|
// picking a reasonable high value here to allow for setups with lots of watch executions or many http inputs/actions
|
||||||
|
// this is also used as the value per route, if you are connecting to the same endpoint a lot, which is likely, when
|
||||||
|
// you are querying a remote Elasticsearch cluster
|
||||||
|
private static final int MAX_CONNECTIONS = 500;
|
||||||
|
|
||||||
private final HttpAuthRegistry httpAuthRegistry;
|
private final HttpAuthRegistry httpAuthRegistry;
|
||||||
private final CloseableHttpClient client;
|
private final CloseableHttpClient client;
|
||||||
|
@ -84,6 +89,10 @@ public class HttpClient extends AbstractComponent {
|
||||||
SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier);
|
SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslService.sslSocketFactory(sslSettings), verifier);
|
||||||
clientBuilder.setSSLSocketFactory(factory);
|
clientBuilder.setSSLSocketFactory(factory);
|
||||||
|
|
||||||
|
clientBuilder.evictExpiredConnections();
|
||||||
|
clientBuilder.setMaxConnPerRoute(MAX_CONNECTIONS);
|
||||||
|
clientBuilder.setMaxConnTotal(MAX_CONNECTIONS);
|
||||||
|
|
||||||
client = clientBuilder.build();
|
client = clientBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,6 +260,11 @@ public class HttpClient extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class to have all HTTP methods except HEAD allow for an body, including GET
|
* Helper class to have all HTTP methods except HEAD allow for an body, including GET
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -44,7 +44,6 @@ import org.joda.time.DateTime;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import javax.mail.internet.AddressException;
|
import javax.mail.internet.AddressException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -219,10 +218,9 @@ public class WebhookActionTests extends ESTestCase {
|
||||||
|
|
||||||
public void testThatSelectingProxyWorks() throws Exception {
|
public void testThatSelectingProxyWorks() throws Exception {
|
||||||
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
||||||
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
|
|
||||||
new SSLService(environment.settings(), environment));
|
|
||||||
|
|
||||||
try (MockWebServer proxyServer = new MockWebServer()) {
|
try (HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry,
|
||||||
|
new SSLService(environment.settings(), environment)); MockWebServer proxyServer = new MockWebServer()) {
|
||||||
proxyServer.start();
|
proxyServer.start();
|
||||||
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));
|
proxyServer.enqueue(new MockResponse().setResponseCode(200).setBody("fullProxiedContent"));
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,7 @@ public class HttpClientTests extends ESTestCase {
|
||||||
@After
|
@After
|
||||||
public void shutdown() throws Exception {
|
public void shutdown() throws Exception {
|
||||||
webServer.close();
|
webServer.close();
|
||||||
|
httpClient.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBasics() throws Exception {
|
public void testBasics() throws Exception {
|
||||||
|
@ -184,7 +185,7 @@ public class HttpClientTests extends ESTestCase {
|
||||||
.setSecureSettings(secureSettings)
|
.setSecureSettings(secureSettings)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
|
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
|
||||||
secureSettings = new MockSecureSettings();
|
secureSettings = new MockSecureSettings();
|
||||||
// We can't use the client created above for the server since it is only a truststore
|
// We can't use the client created above for the server since it is only a truststore
|
||||||
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode");
|
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode");
|
||||||
|
@ -194,7 +195,8 @@ public class HttpClientTests extends ESTestCase {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
TestsSSLService sslService = new TestsSSLService(settings2, environment);
|
TestsSSLService sslService = new TestsSSLService(settings2, environment);
|
||||||
testSslMockWebserver(sslService.sslContext(), false);
|
testSslMockWebserver(client, sslService.sslContext(), false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testHttpsDisableHostnameVerification() throws Exception {
|
public void testHttpsDisableHostnameVerification() throws Exception {
|
||||||
|
@ -217,7 +219,7 @@ public class HttpClientTests extends ESTestCase {
|
||||||
.setSecureSettings(secureSettings)
|
.setSecureSettings(secureSettings)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
|
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
|
||||||
MockSecureSettings secureSettings = new MockSecureSettings();
|
MockSecureSettings secureSettings = new MockSecureSettings();
|
||||||
// We can't use the client created above for the server since it only defines a truststore
|
// We can't use the client created above for the server since it only defines a truststore
|
||||||
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname");
|
secureSettings.setString("xpack.ssl.keystore.secure_password", "testnode-no-subjaltname");
|
||||||
|
@ -228,7 +230,8 @@ public class HttpClientTests extends ESTestCase {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
TestsSSLService sslService = new TestsSSLService(settings2, environment);
|
TestsSSLService sslService = new TestsSSLService(settings2, environment);
|
||||||
testSslMockWebserver(sslService.sslContext(), false);
|
testSslMockWebserver(client, sslService.sslContext(), false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testHttpsClientAuth() throws Exception {
|
public void testHttpsClientAuth() throws Exception {
|
||||||
|
@ -241,11 +244,12 @@ public class HttpClientTests extends ESTestCase {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
TestsSSLService sslService = new TestsSSLService(settings, environment);
|
TestsSSLService sslService = new TestsSSLService(settings, environment);
|
||||||
httpClient = new HttpClient(settings, authRegistry, sslService);
|
try (HttpClient client = new HttpClient(settings, authRegistry, sslService)) {
|
||||||
testSslMockWebserver(sslService.sslContext(), true);
|
testSslMockWebserver(client, sslService.sslContext(), true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testSslMockWebserver(SSLContext sslContext, boolean needClientAuth) throws IOException {
|
private void testSslMockWebserver(HttpClient client, SSLContext sslContext, boolean needClientAuth) throws IOException {
|
||||||
try (MockWebServer mockWebServer = new MockWebServer(sslContext, needClientAuth)) {
|
try (MockWebServer mockWebServer = new MockWebServer(sslContext, needClientAuth)) {
|
||||||
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
|
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
|
||||||
mockWebServer.start();
|
mockWebServer.start();
|
||||||
|
@ -253,7 +257,7 @@ public class HttpClientTests extends ESTestCase {
|
||||||
HttpRequest.Builder request = HttpRequest.builder("localhost", mockWebServer.getPort())
|
HttpRequest.Builder request = HttpRequest.builder("localhost", mockWebServer.getPort())
|
||||||
.scheme(Scheme.HTTPS)
|
.scheme(Scheme.HTTPS)
|
||||||
.path("/test");
|
.path("/test");
|
||||||
HttpResponse response = httpClient.execute(request.build());
|
HttpResponse response = client.execute(request.build());
|
||||||
assertThat(response.status(), equalTo(200));
|
assertThat(response.status(), equalTo(200));
|
||||||
assertThat(response.body().utf8ToString(), equalTo("body"));
|
assertThat(response.body().utf8ToString(), equalTo("body"));
|
||||||
|
|
||||||
|
@ -288,15 +292,15 @@ public class HttpClientTests extends ESTestCase {
|
||||||
|
|
||||||
@Network
|
@Network
|
||||||
public void testHttpsWithoutTruststore() throws Exception {
|
public void testHttpsWithoutTruststore() throws Exception {
|
||||||
HttpClient httpClient = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment));
|
try (HttpClient client = new HttpClient(Settings.EMPTY, authRegistry, new SSLService(Settings.EMPTY, environment))) {
|
||||||
|
|
||||||
// Known server with a valid cert from a commercial CA
|
// Known server with a valid cert from a commercial CA
|
||||||
HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS);
|
HttpRequest.Builder request = HttpRequest.builder("www.elastic.co", 443).scheme(Scheme.HTTPS);
|
||||||
HttpResponse response = httpClient.execute(request.build());
|
HttpResponse response = client.execute(request.build());
|
||||||
assertThat(response.status(), equalTo(200));
|
assertThat(response.status(), equalTo(200));
|
||||||
assertThat(response.hasContent(), is(true));
|
assertThat(response.hasContent(), is(true));
|
||||||
assertThat(response.body(), notNullValue());
|
assertThat(response.body(), notNullValue());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testThatProxyCanBeConfigured() throws Exception {
|
public void testThatProxyCanBeConfigured() throws Exception {
|
||||||
// this test fakes a proxy server that sends a response instead of forwarding it to the mock web server
|
// this test fakes a proxy server that sends a response instead of forwarding it to the mock web server
|
||||||
|
@ -307,15 +311,16 @@ public class HttpClientTests extends ESTestCase {
|
||||||
.put(HttpSettings.PROXY_HOST.getKey(), "localhost")
|
.put(HttpSettings.PROXY_HOST.getKey(), "localhost")
|
||||||
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort())
|
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort())
|
||||||
.build();
|
.build();
|
||||||
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
|
|
||||||
|
|
||||||
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
|
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
|
||||||
.method(HttpMethod.GET)
|
.method(HttpMethod.GET)
|
||||||
.path("/");
|
.path("/");
|
||||||
|
|
||||||
HttpResponse response = httpClient.execute(requestBuilder.build());
|
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
|
||||||
|
HttpResponse response = client.execute(requestBuilder.build());
|
||||||
assertThat(response.status(), equalTo(200));
|
assertThat(response.status(), equalTo(200));
|
||||||
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
|
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
|
||||||
|
}
|
||||||
|
|
||||||
// ensure we hit the proxyServer and not the webserver
|
// ensure we hit the proxyServer and not the webserver
|
||||||
assertThat(webServer.requests(), hasSize(0));
|
assertThat(webServer.requests(), hasSize(0));
|
||||||
|
@ -386,16 +391,16 @@ public class HttpClientTests extends ESTestCase {
|
||||||
.setSecureSettings(secureSettings)
|
.setSecureSettings(secureSettings)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
|
|
||||||
|
|
||||||
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
|
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
|
||||||
.method(HttpMethod.GET)
|
.method(HttpMethod.GET)
|
||||||
.scheme(Scheme.HTTP)
|
.scheme(Scheme.HTTP)
|
||||||
.path("/");
|
.path("/");
|
||||||
|
|
||||||
HttpResponse response = httpClient.execute(requestBuilder.build());
|
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
|
||||||
|
HttpResponse response = client.execute(requestBuilder.build());
|
||||||
assertThat(response.status(), equalTo(200));
|
assertThat(response.status(), equalTo(200));
|
||||||
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
|
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
|
||||||
|
}
|
||||||
|
|
||||||
// ensure we hit the proxyServer and not the webserver
|
// ensure we hit the proxyServer and not the webserver
|
||||||
assertThat(webServer.requests(), hasSize(0));
|
assertThat(webServer.requests(), hasSize(0));
|
||||||
|
@ -413,16 +418,17 @@ public class HttpClientTests extends ESTestCase {
|
||||||
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort() + 1)
|
.put(HttpSettings.PROXY_PORT.getKey(), proxyServer.getPort() + 1)
|
||||||
.put(HttpSettings.PROXY_HOST.getKey(), "https")
|
.put(HttpSettings.PROXY_HOST.getKey(), "https")
|
||||||
.build();
|
.build();
|
||||||
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(settings, environment));
|
|
||||||
|
|
||||||
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
|
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort())
|
||||||
.method(HttpMethod.GET)
|
.method(HttpMethod.GET)
|
||||||
.proxy(new HttpProxy("localhost", proxyServer.getPort(), Scheme.HTTP))
|
.proxy(new HttpProxy("localhost", proxyServer.getPort(), Scheme.HTTP))
|
||||||
.path("/");
|
.path("/");
|
||||||
|
|
||||||
HttpResponse response = httpClient.execute(requestBuilder.build());
|
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(settings, environment))) {
|
||||||
|
HttpResponse response = client.execute(requestBuilder.build());
|
||||||
assertThat(response.status(), equalTo(200));
|
assertThat(response.status(), equalTo(200));
|
||||||
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
|
assertThat(response.body().utf8ToString(), equalTo("fullProxiedContent"));
|
||||||
|
}
|
||||||
|
|
||||||
// ensure we hit the proxyServer and not the webserver
|
// ensure we hit the proxyServer and not the webserver
|
||||||
assertThat(webServer.requests(), hasSize(0));
|
assertThat(webServer.requests(), hasSize(0));
|
||||||
|
@ -535,13 +541,14 @@ public class HttpClientTests extends ESTestCase {
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
.put(HttpSettings.MAX_HTTP_RESPONSE_SIZE.getKey(), new ByteSizeValue(randomBytesLength - 1, ByteSizeUnit.BYTES))
|
.put(HttpSettings.MAX_HTTP_RESPONSE_SIZE.getKey(), new ByteSizeValue(randomBytesLength - 1, ByteSizeUnit.BYTES))
|
||||||
.build();
|
.build();
|
||||||
HttpClient httpClient = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment));
|
|
||||||
|
|
||||||
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()).method(HttpMethod.GET).path("/");
|
HttpRequest.Builder requestBuilder = HttpRequest.builder("localhost", webServer.getPort()).method(HttpMethod.GET).path("/");
|
||||||
|
|
||||||
IOException e = expectThrows(IOException.class, () -> httpClient.execute(requestBuilder.build()));
|
try (HttpClient client = new HttpClient(settings, authRegistry, new SSLService(environment.settings(), environment))) {
|
||||||
|
IOException e = expectThrows(IOException.class, () -> client.execute(requestBuilder.build()));
|
||||||
assertThat(e.getMessage(), startsWith("Maximum limit of"));
|
assertThat(e.getMessage(), startsWith("Maximum limit of"));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testThatGetRedirectIsFollowed() throws Exception {
|
public void testThatGetRedirectIsFollowed() throws Exception {
|
||||||
String redirectUrl = "http://" + webServer.getHostName() + ":" + webServer.getPort() + "/foo";
|
String redirectUrl = "http://" + webServer.getHostName() + ":" + webServer.getPort() + "/foo";
|
||||||
|
|
|
@ -40,15 +40,15 @@ public class HttpReadTimeoutTests extends ESTestCase {
|
||||||
|
|
||||||
public void testDefaultTimeout() throws Exception {
|
public void testDefaultTimeout() throws Exception {
|
||||||
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
||||||
HttpClient httpClient = new HttpClient(Settings.EMPTY, mock(HttpAuthRegistry.class),
|
|
||||||
new SSLService(environment.settings(), environment));
|
|
||||||
|
|
||||||
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
|
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
|
||||||
.method(HttpMethod.POST)
|
.method(HttpMethod.POST)
|
||||||
.path("/")
|
.path("/")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
try (HttpClient httpClient = new HttpClient(Settings.EMPTY, mock(HttpAuthRegistry.class),
|
||||||
|
new SSLService(environment.settings(), environment))) {
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
|
|
||||||
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
|
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
|
||||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||||
logger.info("http connection timed out after {}", timeout);
|
logger.info("http connection timed out after {}", timeout);
|
||||||
|
@ -57,19 +57,20 @@ public class HttpReadTimeoutTests extends ESTestCase {
|
||||||
assertThat(timeout.seconds(), greaterThan(8L));
|
assertThat(timeout.seconds(), greaterThan(8L));
|
||||||
assertThat(timeout.seconds(), lessThan(12L));
|
assertThat(timeout.seconds(), lessThan(12L));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testDefaultTimeoutCustom() throws Exception {
|
public void testDefaultTimeoutCustom() throws Exception {
|
||||||
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
||||||
|
|
||||||
HttpClient httpClient = new HttpClient(Settings.builder()
|
|
||||||
.put("xpack.http.default_read_timeout", "3s").build()
|
|
||||||
, mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment));
|
|
||||||
|
|
||||||
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
|
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
|
||||||
.method(HttpMethod.POST)
|
.method(HttpMethod.POST)
|
||||||
.path("/")
|
.path("/")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
try (HttpClient httpClient = new HttpClient(Settings.builder()
|
||||||
|
.put("xpack.http.default_read_timeout", "3s").build()
|
||||||
|
, mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment))) {
|
||||||
|
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
|
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
|
||||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||||
|
@ -79,20 +80,21 @@ public class HttpReadTimeoutTests extends ESTestCase {
|
||||||
assertThat(timeout.seconds(), greaterThan(1L));
|
assertThat(timeout.seconds(), greaterThan(1L));
|
||||||
assertThat(timeout.seconds(), lessThan(5L));
|
assertThat(timeout.seconds(), lessThan(5L));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testTimeoutCustomPerRequest() throws Exception {
|
public void testTimeoutCustomPerRequest() throws Exception {
|
||||||
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
|
||||||
|
|
||||||
HttpClient httpClient = new HttpClient(Settings.builder()
|
|
||||||
.put("xpack.http.default_read_timeout", "10s").build()
|
|
||||||
, mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment));
|
|
||||||
|
|
||||||
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
|
HttpRequest request = HttpRequest.builder("localhost", webServer.getPort())
|
||||||
.readTimeout(TimeValue.timeValueSeconds(3))
|
.readTimeout(TimeValue.timeValueSeconds(3))
|
||||||
.method(HttpMethod.POST)
|
.method(HttpMethod.POST)
|
||||||
.path("/")
|
.path("/")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
try (HttpClient httpClient = new HttpClient(Settings.builder()
|
||||||
|
.put("xpack.http.default_read_timeout", "10s").build()
|
||||||
|
, mock(HttpAuthRegistry.class), new SSLService(environment.settings(), environment))) {
|
||||||
|
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
|
expectThrows(SocketTimeoutException.class, () -> httpClient.execute(request));
|
||||||
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
TimeValue timeout = TimeValue.timeValueNanos(System.nanoTime() - start);
|
||||||
|
@ -103,3 +105,4 @@ public class HttpReadTimeoutTests extends ESTestCase {
|
||||||
assertThat(timeout.seconds(), lessThan(5L));
|
assertThat(timeout.seconds(), lessThan(5L));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue