From 2087234d7433ca752f2b0cc6e104f0936490a0fd Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 5 Dec 2016 10:54:51 -0500 Subject: [PATCH] Timeout improvements for rest client and reindex (#21741) Changes the default socket and connection timeouts for the rest client from 10 seconds to the more generous 30 seconds. Defaults reindex-from-remote to those timeouts and make the timeouts configurable like so: ``` POST _reindex { "source": { "remote": { "host": "http://otherhost:9200", "socket_timeout": "1m", "connect_timeout": "10s" }, "index": "source", "query": { "match": { "test": "data" } } }, "dest": { "index": "dest" } } ``` Closes #21707 --- .../client/RestClientBuilder.java | 5 +- docs/java-rest/configuration.asciidoc | 10 ++-- docs/reference/docs/reindex.asciidoc | 42 ++++++++++++--- .../index/reindex/BulkByScrollTask.java | 1 - .../index/reindex/RestReindexAction.java | 9 +++- .../index/reindex/TransportReindexAction.java | 5 ++ .../index/reindex/remote/RemoteInfo.java | 49 ++++++++++++++++- ...ReindexFromRemoteBuildRestClientTests.java | 3 +- .../ReindexFromRemoteWhitelistTests.java | 30 ++++++----- .../ReindexFromRemoteWithAuthTests.java | 27 +++++----- .../index/reindex/ReindexRequestTests.java | 15 ++++-- .../ReindexSourceTargetValidationTests.java | 9 ++-- .../index/reindex/RestReindexActionTests.java | 12 ++++- .../index/reindex/RetryTests.java | 2 +- .../index/reindex/RoundTripTests.java | 16 ++++-- .../index/reindex/remote/RemoteInfoTests.java | 19 ++++--- .../test/reindex/20_validation.yaml | 28 ++++++++++ .../rest-api-spec/test/reindex/90_remote.yaml | 52 +++++++++++++++++++ .../test/rest/ESRestTestCase.java | 6 +-- 19 files changed, 268 insertions(+), 72 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java index d342d59ade4..d881bd70a44 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java @@ -37,7 +37,7 @@ import java.util.Objects; */ public final class RestClientBuilder { public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; - public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 10000; + public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000; public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS; public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS = 500; public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10; @@ -185,7 +185,8 @@ public final class RestClientBuilder { private CloseableHttpAsyncClient createHttpClient() { //default timeouts are all infinite - RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS) + RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() + .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS) .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS) .setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS); if (requestConfigCallback != null) { diff --git a/docs/java-rest/configuration.asciidoc b/docs/java-rest/configuration.asciidoc index b3546c0f75b..ad5a57dd94e 100644 --- a/docs/java-rest/configuration.asciidoc +++ b/docs/java-rest/configuration.asciidoc @@ -16,8 +16,8 @@ The interface has one method that receives an instance of https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/client/config/RequestConfig.Builder.html[`org.apache.http.client.config.RequestConfig.Builder`] as an argument and has the same return type. The request config builder can be modified and then returned. In the following example we increase the -connect timeout (defaults to 1 second) and the socket timeout (defaults to 10 -seconds). Also we adjust the max retry timeout accordingly (defaults to 10 +connect timeout (defaults to 1 second) and the socket timeout (defaults to 30 +seconds). Also we adjust the max retry timeout accordingly (defaults to 30 seconds too). [source,java] @@ -27,10 +27,10 @@ RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)) @Override public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { return requestConfigBuilder.setConnectTimeout(5000) - .setSocketTimeout(30000); + .setSocketTimeout(60000); } }) - .setMaxRetryTimeoutMillis(30000) + .setMaxRetryTimeoutMillis(60000) .build(); -------------------------------------------------- @@ -110,4 +110,4 @@ RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)) === Others For any other required configuration needed, the Apache HttpAsyncClient docs -should be consulted: https://hc.apache.org/httpcomponents-asyncclient-4.1.x/ . \ No newline at end of file +should be consulted: https://hc.apache.org/httpcomponents-asyncclient-4.1.x/ . diff --git a/docs/reference/docs/reindex.asciidoc b/docs/reference/docs/reindex.asciidoc index dff3fa066da..a1f868358eb 100644 --- a/docs/reference/docs/reindex.asciidoc +++ b/docs/reference/docs/reindex.asciidoc @@ -444,7 +444,7 @@ To enable queries sent to older versions of Elasticsearch the `query` parameter is sent directly to the remote host without validation or modification. Reindexing from a remote server uses an on-heap buffer that defaults to a -maximum size of 200mb. If the remote index includes very large documents you'll +maximum size of 100mb. If the remote index includes very large documents you'll need to use a smaller batch size. The example below sets the batch size `10` which is very, very small. @@ -454,9 +454,7 @@ POST _reindex { "source": { "remote": { - "host": "http://otherhost:9200", - "username": "user", - "password": "pass" + "host": "http://otherhost:9200" }, "index": "source", "size": 10, @@ -474,10 +472,40 @@ POST _reindex // CONSOLE // TEST[setup:host] // TEST[s/^/PUT source\n/] -// TEST[s/otherhost:9200",/\${host}"/] -// TEST[s/"username": "user",//] -// TEST[s/"password": "pass"//] +// TEST[s/otherhost:9200/\${host}/] +It is also possible to set the socket read timeout on the remote connection +with the `socket_timeout` field and the connection timeout with the +`connect_timeout` field. Both default to thirty seconds. This example +sets the socket read timeout to one minute and the connection timeout to ten +seconds: + +[source,js] +-------------------------------------------------- +POST _reindex +{ + "source": { + "remote": { + "host": "http://otherhost:9200", + "socket_timeout": "1m", + "connect_timeout": "10s" + }, + "index": "source", + "query": { + "match": { + "test": "data" + } + } + }, + "dest": { + "index": "dest" + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:host] +// TEST[s/^/PUT source\n/] +// TEST[s/otherhost:9200/\${host}/] [float] === URL Parameters diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java index 2662f474bfd..7de4f19339f 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java @@ -45,7 +45,6 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; * Task storing information about a currently running BulkByScroll request. */ public abstract class BulkByScrollTask extends CancellableTask { - public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) { super(id, type, action, description, parentTaskId); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index 54746bc74b8..d5f0fe77910 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -145,11 +145,13 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler headers = extractStringStringMap(remote, "headers"); + TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT); + TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT); if (false == remote.isEmpty()) { throw new IllegalArgumentException( "Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]"); } - return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers); + return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers, socketTimeout, connectTimeout); } /** @@ -202,6 +204,11 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler source, String name, TimeValue defaultValue) { + String string = extractString(source, name); + return string == null ? defaultValue : parseTimeValue(string, name); + } + private static BytesReference queryForRemote(Map source) throws IOException { XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); Object query = source.remove("query"); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java index 7ae0d715ed0..183b396b6de 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java @@ -196,6 +196,11 @@ public class TransportReindexAction extends HandledTransportAction { + c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis())); + c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis())); + return c; + }) .setHttpClientConfigCallback(c -> { // Enable basic auth if it is configured if (remoteInfo.getUsername() != null) { diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteInfo.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteInfo.java index 1405d656d99..5fad275cde4 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteInfo.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteInfo.java @@ -19,11 +19,13 @@ package org.elasticsearch.index.reindex.remote; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import java.util.HashMap; @@ -31,8 +33,18 @@ import java.util.Map; import static java.util.Collections.unmodifiableMap; import static java.util.Objects.requireNonNull; +import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; public class RemoteInfo implements Writeable { + /** + * Default {@link #socketTimeout} for requests that don't have one set. + */ + public static final TimeValue DEFAULT_SOCKET_TIMEOUT = timeValueSeconds(30); + /** + * Default {@link #connectTimeout} for requests that don't have one set. + */ + public static final TimeValue DEFAULT_CONNECT_TIMEOUT = timeValueSeconds(30); + private final String scheme; private final String host; private final int port; @@ -40,9 +52,17 @@ public class RemoteInfo implements Writeable { private final String username; private final String password; private final Map headers; + /** + * Time to wait for a response from each request. + */ + private final TimeValue socketTimeout; + /** + * Time to wait for a connecting to the remote cluster. + */ + private final TimeValue connectTimeout; public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password, - Map headers) { + Map headers, TimeValue socketTimeout, TimeValue connectTimeout) { this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster"); this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster"); this.port = port; @@ -50,6 +70,8 @@ public class RemoteInfo implements Writeable { this.username = username; this.password = password; this.headers = unmodifiableMap(requireNonNull(headers, "[headers] is required")); + this.socketTimeout = requireNonNull(socketTimeout, "[socketTimeout] must be specified"); + this.connectTimeout = requireNonNull(connectTimeout, "[connectTimeout] must be specified"); } /** @@ -68,6 +90,13 @@ public class RemoteInfo implements Writeable { headers.put(in.readString(), in.readString()); } this.headers = unmodifiableMap(headers); + if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) { + socketTimeout = new TimeValue(in); + connectTimeout = new TimeValue(in); + } else { + socketTimeout = DEFAULT_SOCKET_TIMEOUT; + connectTimeout = DEFAULT_CONNECT_TIMEOUT; + } } @Override @@ -83,6 +112,10 @@ public class RemoteInfo implements Writeable { out.writeString(header.getKey()); out.writeString(header.getValue()); } + if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) { + socketTimeout.writeTo(out); + connectTimeout.writeTo(out); + } } public String getScheme() { @@ -115,6 +148,20 @@ public class RemoteInfo implements Writeable { return headers; } + /** + * Time to wait for a response from each request. + */ + public TimeValue getSocketTimeout() { + return socketTimeout; + } + + /** + * Time to wait to connect to the external cluster. + */ + public TimeValue getConnectTimeout() { + return connectTimeout; + } + @Override public String toString() { StringBuilder b = new StringBuilder(); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteBuildRestClientTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteBuildRestClientTests.java index 53c24851d5e..7f8a87c2dad 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteBuildRestClientTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteBuildRestClientTests.java @@ -33,7 +33,8 @@ import static org.hamcrest.Matchers.hasSize; public class ReindexFromRemoteBuildRestClientTests extends ESTestCase { public void testBuildRestClient() throws Exception { - RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap()); + RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap(), + RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); long taskId = randomLong(); List threads = synchronizedList(new ArrayList<>()); RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java index 239ce43fdc5..1a22fe53ef4 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java @@ -46,47 +46,49 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase { checkRemoteWhitelist(buildRemoteWhitelist(randomWhitelist()), null); } + /** + * Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything. + */ + private RemoteInfo newRemoteInfo(String host, int port) { + return new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap(), + RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); + } + public void testWhitelistedRemote() { List whitelist = randomWhitelist(); String[] inList = whitelist.iterator().next().split(":"); String host = inList[0]; int port = Integer.valueOf(inList[1]); - checkRemoteWhitelist(buildRemoteWhitelist(whitelist), - new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap())); + checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo(host, port)); } public void testWhitelistedByPrefix() { checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")), - new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap())); + new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap(), + RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT)); checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")), - new RemoteInfo(randomAsciiOfLength(5), "6e134134a1.us-east-1.aws.example.com", 9200, - new BytesArray("test"), null, null, emptyMap())); + newRemoteInfo("6e134134a1.us-east-1.aws.example.com", 9200)); } public void testWhitelistedBySuffix() { - checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es.example.com:*")), - new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap())); + checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es.example.com:*")), newRemoteInfo("es.example.com", 9200)); } public void testWhitelistedByInfix() { - checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es*.example.com:9200")), - new RemoteInfo(randomAsciiOfLength(5), "es1.example.com", 9200, new BytesArray("test"), null, null, emptyMap())); + checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es*.example.com:9200")), newRemoteInfo("es1.example.com", 9200)); } - public void testLoopbackInWhitelistRemote() throws UnknownHostException { List whitelist = randomWhitelist(); whitelist.add("127.0.0.1:*"); - checkRemoteWhitelist(buildRemoteWhitelist(whitelist), - new RemoteInfo(randomAsciiOfLength(5), "127.0.0.1", 9200, new BytesArray("test"), null, null, emptyMap())); + checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo("127.0.0.1", 9200)); } public void testUnwhitelistedRemote() { int port = between(1, Integer.MAX_VALUE); - RemoteInfo remoteInfo = new RemoteInfo(randomAsciiOfLength(5), "not in list", port, new BytesArray("test"), null, null, emptyMap()); List whitelist = randomBoolean() ? randomWhitelist() : emptyList(); Exception e = expectThrows(IllegalArgumentException.class, - () -> checkRemoteWhitelist(buildRemoteWhitelist(whitelist), remoteInfo)); + () -> checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo("not in list", port))); assertEquals("[not in list:" + port + "] not whitelisted in reindex.remote.whitelist", e.getMessage()); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java index 27955f71f92..47084d1d661 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java @@ -19,8 +19,8 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -48,6 +48,7 @@ import org.junit.Before; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; @@ -88,29 +89,31 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase { address = nodeInfo.getHttp().getAddress().publishAddress(); } + /** + * Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything. + */ + private RemoteInfo newRemoteInfo(String username, String password, Map headers) { + return new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), username, password, + headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); + } + public void testReindexFromRemoteWithAuthentication() throws Exception { - RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "Aladdin", - "open sesame", emptyMap()); ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") - .setRemoteInfo(remote); + .setRemoteInfo(newRemoteInfo("Aladdin", "open sesame", emptyMap())); assertThat(request.get(), matcher().created(1)); } public void testReindexSendsHeaders() throws Exception { - RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, - null, singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter")); ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") - .setRemoteInfo(remote); + .setRemoteInfo(newRemoteInfo(null, null, singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter"))); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); assertEquals(RestStatus.BAD_REQUEST, e.status()); assertThat(e.getMessage(), containsString("Hurray! Sent the header!")); } public void testReindexWithoutAuthenticationWhenRequired() throws Exception { - RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, - null, emptyMap()); ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") - .setRemoteInfo(remote); + .setRemoteInfo(newRemoteInfo(null, null, emptyMap())); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); assertEquals(RestStatus.UNAUTHORIZED, e.status()); assertThat(e.getMessage(), containsString("\"reason\":\"Authentication required\"")); @@ -118,10 +121,8 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase { } public void testReindexWithBadAuthentication() throws Exception { - RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "junk", - "auth", emptyMap()); ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") - .setRemoteInfo(remote); + .setRemoteInfo(newRemoteInfo("junk", "auth", emptyMap())); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); assertThat(e.getMessage(), containsString("\"reason\":\"Bad Authorization\"")); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java index 30ba03aca76..e6572548609 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRequestTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.script.Script; import org.elasticsearch.search.slice.SliceBuilder; import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; /** @@ -37,8 +38,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase())); @@ -52,6 +54,8 @@ public class RestReindexActionTests extends ESTestCase { remote.put("username", "testuser"); remote.put("password", "testpass"); remote.put("headers", headers); + remote.put("socket_timeout", "90s"); + remote.put("connect_timeout", "10s"); Map query = new HashMap<>(); query.put("a", "b"); @@ -68,6 +72,8 @@ public class RestReindexActionTests extends ESTestCase { assertEquals("testuser", remoteInfo.getUsername()); assertEquals("testpass", remoteInfo.getPassword()); assertEquals(headers, remoteInfo.getHeaders()); + assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout()); + assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout()); } public void testBuildRemoteInfoWithoutAllParts() throws IOException { @@ -76,16 +82,20 @@ public class RestReindexActionTests extends ESTestCase { expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com")); } - public void testBuildRemoteInfoWithAllParts() throws IOException { + public void testBuildRemoteInfoWithAllHostParts() throws IOException { RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200"); assertEquals("http", info.getScheme()); assertEquals("example.com", info.getHost()); assertEquals(9200, info.getPort()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default info = buildRemoteInfoHostTestCase("https://other.example.com:9201"); assertEquals("https", info.getScheme()); assertEquals("other.example.com", info.getHost()); assertEquals(9201, info.getPort()); + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); } public void testReindexFromRemoteRequestParsing() throws IOException { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index fd93710d4fa..1dcf8811d23 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -118,7 +118,7 @@ public class RetryTests extends ESSingleNodeTestCase { NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0); TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress(); RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, - null, emptyMap()); + null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") .setRemoteInfo(remote); testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 8e34df3ec36..7c4b4e3274f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -73,7 +73,10 @@ public class RoundTripTests extends ESTestCase { while (headers.size() < headersCount) { headers.put(randomAsciiOfLength(5), randomAsciiOfLength(5)); } - reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password, headers)); + TimeValue socketTimeout = parseTimeValue(randomPositiveTimeValue(), "socketTimeout"); + TimeValue connectTimeout = parseTimeValue(randomPositiveTimeValue(), "connectTimeout"); + reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password, headers, + socketTimeout, connectTimeout)); } ReindexRequest tripped = new ReindexRequest(); roundTrip(reindex, tripped); @@ -89,7 +92,7 @@ public class RoundTripTests extends ESTestCase { tripped = new ReindexRequest(); reindex.setSlices(1); roundTrip(Version.V_5_0_0_rc1, reindex, tripped); - assertRequestEquals(reindex, tripped); + assertRequestEquals(Version.V_5_0_0_rc1, reindex, tripped); } public void testUpdateByQueryRequest() throws IOException { @@ -154,7 +157,7 @@ public class RoundTripTests extends ESTestCase { request.setScript(random().nextBoolean() ? null : randomScript()); } - private void assertRequestEquals(ReindexRequest request, ReindexRequest tripped) { + private void assertRequestEquals(Version version, ReindexRequest request, ReindexRequest tripped) { assertRequestEquals((AbstractBulkIndexByScrollRequest) request, (AbstractBulkIndexByScrollRequest) tripped); assertEquals(request.getDestination().version(), tripped.getDestination().version()); assertEquals(request.getDestination().index(), tripped.getDestination().index()); @@ -168,6 +171,13 @@ public class RoundTripTests extends ESTestCase { assertEquals(request.getRemoteInfo().getUsername(), tripped.getRemoteInfo().getUsername()); assertEquals(request.getRemoteInfo().getPassword(), tripped.getRemoteInfo().getPassword()); assertEquals(request.getRemoteInfo().getHeaders(), tripped.getRemoteInfo().getHeaders()); + if (version.onOrAfter(Version.V_5_2_0_UNRELEASED)) { + assertEquals(request.getRemoteInfo().getSocketTimeout(), tripped.getRemoteInfo().getSocketTimeout()); + assertEquals(request.getRemoteInfo().getConnectTimeout(), tripped.getRemoteInfo().getConnectTimeout()); + } else { + assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, tripped.getRemoteInfo().getSocketTimeout()); + assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, tripped.getRemoteInfo().getConnectTimeout()); + } } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java index 3ee647aa55b..fa1f46aa383 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java @@ -25,14 +25,17 @@ import org.elasticsearch.test.ESTestCase; import static java.util.Collections.emptyMap; public class RemoteInfoTests extends ESTestCase { + private RemoteInfo newRemoteInfo(String scheme, String username, String password) { + return new RemoteInfo(scheme, "testhost", 12344, new BytesArray("testquery"), username, password, emptyMap(), + RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT); + } + public void testToString() { - RemoteInfo info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), null, null, emptyMap()); - assertEquals("host=testhost port=12344 query=testquery", info.toString()); - info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", null, emptyMap()); - assertEquals("host=testhost port=12344 query=testquery username=testuser", info.toString()); - info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap()); - assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString()); - info = new RemoteInfo("https", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap()); - assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString()); + assertEquals("host=testhost port=12344 query=testquery", newRemoteInfo("http", null, null).toString()); + assertEquals("host=testhost port=12344 query=testquery username=testuser", newRemoteInfo("http", "testuser", null).toString()); + assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>", + newRemoteInfo("http", "testuser", "testpass").toString()); + assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>", + newRemoteInfo("https", "testuser", "testpass").toString()); } } diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yaml index 68ae83eabd2..50f12192960 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yaml @@ -244,6 +244,34 @@ dest: index: dest +--- +"broken socket timeout in remote fails": + - do: + catch: /number_format_exception/ + reindex: + body: + source: + remote: + host: http://okremote:9200 + socket_timeout: borked + index: test + dest: + index: dest + +--- +"broken connect timeout in remote fails": + - do: + catch: /number_format_exception/ + reindex: + body: + source: + remote: + host: http://okremote:9200 + connect_timeout: borked + index: test + dest: + index: dest + --- "junk in slices fails": - do: diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml index 6adac98ad77..88894308891 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml @@ -205,3 +205,55 @@ match: text: test - match: {hits.total: 1} + +--- +"Reindex from remote with timeouts": + # Validates that you can configure the socket_timeout and connect_timeout, + # not that they do anything. + - do: + index: + index: source + type: foo + id: 1 + body: { "text": "test" } + refresh: true + + # Fetch the http host. We use the host of the master because we know there will always be a master. + - do: + cluster.state: {} + - set: { master_node: master } + - do: + nodes.info: + metric: [ http ] + - is_true: nodes.$master.http.publish_address + - set: {nodes.$master.http.publish_address: host} + - do: + reindex: + refresh: true + body: + source: + remote: + host: http://${host} + socket_timeout: 1m + connect_timeout: 1m + index: source + dest: + index: dest + - match: {created: 1} + - match: {updated: 0} + - match: {version_conflicts: 0} + - match: {batches: 1} + - match: {failures: []} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - is_false: deleted + + - do: + search: + index: dest + body: + query: + match: + text: test + - match: {hits.total: 1} diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index b32675572e7..26952fdbe36 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -272,10 +272,8 @@ public abstract class ESRestTestCase extends ESTestCase { return "http"; } - private static RestClient buildClient(Settings settings) throws IOException { - RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()])) - .setMaxRetryTimeoutMillis(30000) - .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(30000)); + private RestClient buildClient(Settings settings) throws IOException { + RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()])); String keystorePath = settings.get(TRUSTSTORE_PATH); if (keystorePath != null) { final String keystorePass = settings.get(TRUSTSTORE_PASSWORD);