Timeout improvements for rest client and reindex (#21741)

Changes the default socket and connection timeouts for the rest
client from 10 seconds to the more generous 30 seconds.

Defaults reindex-from-remote to those timeouts and make the
timeouts configurable like so:
```
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "socket_timeout": "1m",
      "connect_timeout": "10s"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}
```

Closes #21707
This commit is contained in:
Nik Everett 2016-12-05 10:54:51 -05:00 committed by GitHub
parent 03a0a0aebb
commit 2087234d74
19 changed files with 268 additions and 72 deletions

View File

@ -37,7 +37,7 @@ import java.util.Objects;
*/ */
public final class RestClientBuilder { public final class RestClientBuilder {
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 10000; public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS; public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS;
public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS = 500; public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10; public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10;
@ -185,7 +185,8 @@ public final class RestClientBuilder {
private CloseableHttpAsyncClient createHttpClient() { private CloseableHttpAsyncClient createHttpClient() {
//default timeouts are all infinite //default timeouts are all infinite
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom().setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS) RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS) .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS)
.setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS); .setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS);
if (requestConfigCallback != null) { if (requestConfigCallback != null) {

View File

@ -16,8 +16,8 @@ The interface has one method that receives an instance of
https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/client/config/RequestConfig.Builder.html[`org.apache.http.client.config.RequestConfig.Builder`] https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/client/config/RequestConfig.Builder.html[`org.apache.http.client.config.RequestConfig.Builder`]
as an argument and has the same return type. The request config builder can as an argument and has the same return type. The request config builder can
be modified and then returned. In the following example we increase the be modified and then returned. In the following example we increase the
connect timeout (defaults to 1 second) and the socket timeout (defaults to 10 connect timeout (defaults to 1 second) and the socket timeout (defaults to 30
seconds). Also we adjust the max retry timeout accordingly (defaults to 10 seconds). Also we adjust the max retry timeout accordingly (defaults to 30
seconds too). seconds too).
[source,java] [source,java]
@ -27,10 +27,10 @@ RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200))
@Override @Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) { public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setConnectTimeout(5000) return requestConfigBuilder.setConnectTimeout(5000)
.setSocketTimeout(30000); .setSocketTimeout(60000);
} }
}) })
.setMaxRetryTimeoutMillis(30000) .setMaxRetryTimeoutMillis(60000)
.build(); .build();
-------------------------------------------------- --------------------------------------------------
@ -110,4 +110,4 @@ RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200))
=== Others === Others
For any other required configuration needed, the Apache HttpAsyncClient docs For any other required configuration needed, the Apache HttpAsyncClient docs
should be consulted: https://hc.apache.org/httpcomponents-asyncclient-4.1.x/ . should be consulted: https://hc.apache.org/httpcomponents-asyncclient-4.1.x/ .

View File

@ -444,7 +444,7 @@ To enable queries sent to older versions of Elasticsearch the `query` parameter
is sent directly to the remote host without validation or modification. is sent directly to the remote host without validation or modification.
Reindexing from a remote server uses an on-heap buffer that defaults to a Reindexing from a remote server uses an on-heap buffer that defaults to a
maximum size of 200mb. If the remote index includes very large documents you'll maximum size of 100mb. If the remote index includes very large documents you'll
need to use a smaller batch size. The example below sets the batch size `10` need to use a smaller batch size. The example below sets the batch size `10`
which is very, very small. which is very, very small.
@ -454,9 +454,7 @@ POST _reindex
{ {
"source": { "source": {
"remote": { "remote": {
"host": "http://otherhost:9200", "host": "http://otherhost:9200"
"username": "user",
"password": "pass"
}, },
"index": "source", "index": "source",
"size": 10, "size": 10,
@ -474,10 +472,40 @@ POST _reindex
// CONSOLE // CONSOLE
// TEST[setup:host] // TEST[setup:host]
// TEST[s/^/PUT source\n/] // TEST[s/^/PUT source\n/]
// TEST[s/otherhost:9200",/\${host}"/] // TEST[s/otherhost:9200/\${host}/]
// TEST[s/"username": "user",//]
// TEST[s/"password": "pass"//]
It is also possible to set the socket read timeout on the remote connection
with the `socket_timeout` field and the connection timeout with the
`connect_timeout` field. Both default to thirty seconds. This example
sets the socket read timeout to one minute and the connection timeout to ten
seconds:
[source,js]
--------------------------------------------------
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200",
"socket_timeout": "1m",
"connect_timeout": "10s"
},
"index": "source",
"query": {
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest"
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:host]
// TEST[s/^/PUT source\n/]
// TEST[s/otherhost:9200/\${host}/]
[float] [float]
=== URL Parameters === URL Parameters

View File

@ -45,7 +45,6 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
* Task storing information about a currently running BulkByScroll request. * Task storing information about a currently running BulkByScroll request.
*/ */
public abstract class BulkByScrollTask extends CancellableTask { public abstract class BulkByScrollTask extends CancellableTask {
public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) { public BulkByScrollTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId); super(id, type, action, description, parentTaskId);
} }

View File

@ -145,11 +145,13 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
String host = hostMatcher.group("host"); String host = hostMatcher.group("host");
int port = Integer.parseInt(hostMatcher.group("port")); int port = Integer.parseInt(hostMatcher.group("port"));
Map<String, String> headers = extractStringStringMap(remote, "headers"); Map<String, String> headers = extractStringStringMap(remote, "headers");
TimeValue socketTimeout = extractTimeValue(remote, "socket_timeout", RemoteInfo.DEFAULT_SOCKET_TIMEOUT);
TimeValue connectTimeout = extractTimeValue(remote, "connect_timeout", RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
if (false == remote.isEmpty()) { if (false == remote.isEmpty()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]"); "Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
} }
return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers); return new RemoteInfo(scheme, host, port, queryForRemote(source), username, password, headers, socketTimeout, connectTimeout);
} }
/** /**
@ -202,6 +204,11 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
return safe; return safe;
} }
private static TimeValue extractTimeValue(Map<String, Object> source, String name, TimeValue defaultValue) {
String string = extractString(source, name);
return string == null ? defaultValue : parseTimeValue(string, name);
}
private static BytesReference queryForRemote(Map<String, Object> source) throws IOException { private static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
Object query = source.remove("query"); Object query = source.remove("query");

View File

@ -196,6 +196,11 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
} }
return RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme())) return RestClient.builder(new HttpHost(remoteInfo.getHost(), remoteInfo.getPort(), remoteInfo.getScheme()))
.setDefaultHeaders(clientHeaders) .setDefaultHeaders(clientHeaders)
.setRequestConfigCallback(c -> {
c.setConnectTimeout(Math.toIntExact(remoteInfo.getConnectTimeout().millis()));
c.setSocketTimeout(Math.toIntExact(remoteInfo.getSocketTimeout().millis()));
return c;
})
.setHttpClientConfigCallback(c -> { .setHttpClientConfigCallback(c -> {
// Enable basic auth if it is configured // Enable basic auth if it is configured
if (remoteInfo.getUsername() != null) { if (remoteInfo.getUsername() != null) {

View File

@ -19,11 +19,13 @@
package org.elasticsearch.index.reindex.remote; package org.elasticsearch.index.reindex.remote;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
@ -31,8 +33,18 @@ import java.util.Map;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
public class RemoteInfo implements Writeable { public class RemoteInfo implements Writeable {
/**
* Default {@link #socketTimeout} for requests that don't have one set.
*/
public static final TimeValue DEFAULT_SOCKET_TIMEOUT = timeValueSeconds(30);
/**
* Default {@link #connectTimeout} for requests that don't have one set.
*/
public static final TimeValue DEFAULT_CONNECT_TIMEOUT = timeValueSeconds(30);
private final String scheme; private final String scheme;
private final String host; private final String host;
private final int port; private final int port;
@ -40,9 +52,17 @@ public class RemoteInfo implements Writeable {
private final String username; private final String username;
private final String password; private final String password;
private final Map<String, String> headers; private final Map<String, String> headers;
/**
* Time to wait for a response from each request.
*/
private final TimeValue socketTimeout;
/**
* Time to wait for a connecting to the remote cluster.
*/
private final TimeValue connectTimeout;
public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password, public RemoteInfo(String scheme, String host, int port, BytesReference query, String username, String password,
Map<String, String> headers) { Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster"); this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster");
this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster"); this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster");
this.port = port; this.port = port;
@ -50,6 +70,8 @@ public class RemoteInfo implements Writeable {
this.username = username; this.username = username;
this.password = password; this.password = password;
this.headers = unmodifiableMap(requireNonNull(headers, "[headers] is required")); this.headers = unmodifiableMap(requireNonNull(headers, "[headers] is required"));
this.socketTimeout = requireNonNull(socketTimeout, "[socketTimeout] must be specified");
this.connectTimeout = requireNonNull(connectTimeout, "[connectTimeout] must be specified");
} }
/** /**
@ -68,6 +90,13 @@ public class RemoteInfo implements Writeable {
headers.put(in.readString(), in.readString()); headers.put(in.readString(), in.readString());
} }
this.headers = unmodifiableMap(headers); this.headers = unmodifiableMap(headers);
if (in.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
socketTimeout = new TimeValue(in);
connectTimeout = new TimeValue(in);
} else {
socketTimeout = DEFAULT_SOCKET_TIMEOUT;
connectTimeout = DEFAULT_CONNECT_TIMEOUT;
}
} }
@Override @Override
@ -83,6 +112,10 @@ public class RemoteInfo implements Writeable {
out.writeString(header.getKey()); out.writeString(header.getKey());
out.writeString(header.getValue()); out.writeString(header.getValue());
} }
if (out.getVersion().onOrAfter(Version.V_5_2_0_UNRELEASED)) {
socketTimeout.writeTo(out);
connectTimeout.writeTo(out);
}
} }
public String getScheme() { public String getScheme() {
@ -115,6 +148,20 @@ public class RemoteInfo implements Writeable {
return headers; return headers;
} }
/**
* Time to wait for a response from each request.
*/
public TimeValue getSocketTimeout() {
return socketTimeout;
}
/**
* Time to wait to connect to the external cluster.
*/
public TimeValue getConnectTimeout() {
return connectTimeout;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();

View File

@ -33,7 +33,8 @@ import static org.hamcrest.Matchers.hasSize;
public class ReindexFromRemoteBuildRestClientTests extends ESTestCase { public class ReindexFromRemoteBuildRestClientTests extends ESTestCase {
public void testBuildRestClient() throws Exception { public void testBuildRestClient() throws Exception {
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap()); RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, new BytesArray("ignored"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
long taskId = randomLong(); long taskId = randomLong();
List<Thread> threads = synchronizedList(new ArrayList<>()); List<Thread> threads = synchronizedList(new ArrayList<>());
RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads); RestClient client = TransportReindexAction.buildRestClient(remoteInfo, taskId, threads);

View File

@ -46,47 +46,49 @@ public class ReindexFromRemoteWhitelistTests extends ESTestCase {
checkRemoteWhitelist(buildRemoteWhitelist(randomWhitelist()), null); checkRemoteWhitelist(buildRemoteWhitelist(randomWhitelist()), null);
} }
/**
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
*/
private RemoteInfo newRemoteInfo(String host, int port) {
return new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}
public void testWhitelistedRemote() { public void testWhitelistedRemote() {
List<String> whitelist = randomWhitelist(); List<String> whitelist = randomWhitelist();
String[] inList = whitelist.iterator().next().split(":"); String[] inList = whitelist.iterator().next().split(":");
String host = inList[0]; String host = inList[0];
int port = Integer.valueOf(inList[1]); int port = Integer.valueOf(inList[1]);
checkRemoteWhitelist(buildRemoteWhitelist(whitelist), checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo(host, port));
new RemoteInfo(randomAsciiOfLength(5), host, port, new BytesArray("test"), null, null, emptyMap()));
} }
public void testWhitelistedByPrefix() { public void testWhitelistedByPrefix() {
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")), checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap())); new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")), checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
new RemoteInfo(randomAsciiOfLength(5), "6e134134a1.us-east-1.aws.example.com", 9200, newRemoteInfo("6e134134a1.us-east-1.aws.example.com", 9200));
new BytesArray("test"), null, null, emptyMap()));
} }
public void testWhitelistedBySuffix() { public void testWhitelistedBySuffix() {
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es.example.com:*")), checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es.example.com:*")), newRemoteInfo("es.example.com", 9200));
new RemoteInfo(randomAsciiOfLength(5), "es.example.com", 9200, new BytesArray("test"), null, null, emptyMap()));
} }
public void testWhitelistedByInfix() { public void testWhitelistedByInfix() {
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es*.example.com:9200")), checkRemoteWhitelist(buildRemoteWhitelist(singletonList("es*.example.com:9200")), newRemoteInfo("es1.example.com", 9200));
new RemoteInfo(randomAsciiOfLength(5), "es1.example.com", 9200, new BytesArray("test"), null, null, emptyMap()));
} }
public void testLoopbackInWhitelistRemote() throws UnknownHostException { public void testLoopbackInWhitelistRemote() throws UnknownHostException {
List<String> whitelist = randomWhitelist(); List<String> whitelist = randomWhitelist();
whitelist.add("127.0.0.1:*"); whitelist.add("127.0.0.1:*");
checkRemoteWhitelist(buildRemoteWhitelist(whitelist), checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo("127.0.0.1", 9200));
new RemoteInfo(randomAsciiOfLength(5), "127.0.0.1", 9200, new BytesArray("test"), null, null, emptyMap()));
} }
public void testUnwhitelistedRemote() { public void testUnwhitelistedRemote() {
int port = between(1, Integer.MAX_VALUE); int port = between(1, Integer.MAX_VALUE);
RemoteInfo remoteInfo = new RemoteInfo(randomAsciiOfLength(5), "not in list", port, new BytesArray("test"), null, null, emptyMap());
List<String> whitelist = randomBoolean() ? randomWhitelist() : emptyList(); List<String> whitelist = randomBoolean() ? randomWhitelist() : emptyList();
Exception e = expectThrows(IllegalArgumentException.class, Exception e = expectThrows(IllegalArgumentException.class,
() -> checkRemoteWhitelist(buildRemoteWhitelist(whitelist), remoteInfo)); () -> checkRemoteWhitelist(buildRemoteWhitelist(whitelist), newRemoteInfo("not in list", port)));
assertEquals("[not in list:" + port + "] not whitelisted in reindex.remote.whitelist", e.getMessage()); assertEquals("[not in list:" + port + "] not whitelisted in reindex.remote.whitelist", e.getMessage());
} }

View File

@ -19,8 +19,8 @@
package org.elasticsearch.index.reindex; package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
@ -48,6 +48,7 @@ import org.junit.Before;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
@ -88,29 +89,31 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
address = nodeInfo.getHttp().getAddress().publishAddress(); address = nodeInfo.getHttp().getAddress().publishAddress();
} }
/**
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
*/
private RemoteInfo newRemoteInfo(String username, String password, Map<String, String> headers) {
return new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), username, password,
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}
public void testReindexFromRemoteWithAuthentication() throws Exception { public void testReindexFromRemoteWithAuthentication() throws Exception {
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "Aladdin",
"open sesame", emptyMap());
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote); .setRemoteInfo(newRemoteInfo("Aladdin", "open sesame", emptyMap()));
assertThat(request.get(), matcher().created(1)); assertThat(request.get(), matcher().created(1));
} }
public void testReindexSendsHeaders() throws Exception { public void testReindexSendsHeaders() throws Exception {
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
null, singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter"));
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote); .setRemoteInfo(newRemoteInfo(null, null, singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter")));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
assertEquals(RestStatus.BAD_REQUEST, e.status()); assertEquals(RestStatus.BAD_REQUEST, e.status());
assertThat(e.getMessage(), containsString("Hurray! Sent the header!")); assertThat(e.getMessage(), containsString("Hurray! Sent the header!"));
} }
public void testReindexWithoutAuthenticationWhenRequired() throws Exception { public void testReindexWithoutAuthenticationWhenRequired() throws Exception {
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
null, emptyMap());
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote); .setRemoteInfo(newRemoteInfo(null, null, emptyMap()));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
assertEquals(RestStatus.UNAUTHORIZED, e.status()); assertEquals(RestStatus.UNAUTHORIZED, e.status());
assertThat(e.getMessage(), containsString("\"reason\":\"Authentication required\"")); assertThat(e.getMessage(), containsString("\"reason\":\"Authentication required\""));
@ -118,10 +121,8 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
} }
public void testReindexWithBadAuthentication() throws Exception { public void testReindexWithBadAuthentication() throws Exception {
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), "junk",
"auth", emptyMap());
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote); .setRemoteInfo(newRemoteInfo("junk", "auth", emptyMap()));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
assertThat(e.getMessage(), containsString("\"reason\":\"Bad Authorization\"")); assertThat(e.getMessage(), containsString("\"reason\":\"Bad Authorization\""));
} }

View File

@ -28,6 +28,7 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.slice.SliceBuilder;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
/** /**
@ -37,8 +38,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
public void testReindexFromRemoteDoesNotSupportSearchQuery() { public void testReindexFromRemoteDoesNotSupportSearchQuery() {
ReindexRequest reindex = newRequest(); ReindexRequest reindex = newRequest();
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE), reindex.setRemoteInfo(
new BytesArray("real_query"), null, null, emptyMap())); new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE), new BytesArray("real_query"),
null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
reindex.getSearchRequest().source().query(matchAllQuery()); // Unsupported place to put query reindex.getSearchRequest().source().query(matchAllQuery()); // Unsupported place to put query
ActionRequestValidationException e = reindex.validate(); ActionRequestValidationException e = reindex.validate();
assertEquals("Validation Failed: 1: reindex from remote sources should use RemoteInfo's query instead of source's query;", assertEquals("Validation Failed: 1: reindex from remote sources should use RemoteInfo's query instead of source's query;",
@ -47,8 +49,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
public void testReindexFromRemoteDoesNotSupportWorkers() { public void testReindexFromRemoteDoesNotSupportWorkers() {
ReindexRequest reindex = newRequest(); ReindexRequest reindex = newRequest();
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE), reindex.setRemoteInfo(
new BytesArray("real_query"), null, null, emptyMap())); new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, Integer.MAX_VALUE), new BytesArray("real_query"),
null, null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
reindex.setSlices(between(2, Integer.MAX_VALUE)); reindex.setSlices(between(2, Integer.MAX_VALUE));
ActionRequestValidationException e = reindex.validate(); ActionRequestValidationException e = reindex.validate();
assertEquals( assertEquals(
@ -71,7 +74,9 @@ public class ReindexRequestTests extends AbstractBulkByScrollRequestTestCase<Rei
} }
if (randomBoolean()) { if (randomBoolean()) {
original.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, 10000), original.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), between(1, 10000),
new BytesArray(randomAsciiOfLength(5)), null, null, emptyMap())); new BytesArray(randomAsciiOfLength(5)), null, null, emptyMap(),
parseTimeValue(randomPositiveTimeValue(), "socket_timeout"),
parseTimeValue(randomPositiveTimeValue(), "connect_timeout")));
} }
} }

View File

@ -36,8 +36,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.remote.RemoteInfo; import org.elasticsearch.index.reindex.remote.RemoteInfo;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
@ -91,10 +89,11 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
public void testRemoteInfoSkipsValidation() { public void testRemoteInfoSkipsValidation() {
// The index doesn't have to exist // The index doesn't have to exist
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap()), "does_not_exist", succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap(),
"target"); RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "does_not_exist", "target");
// And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote. // And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote.
succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap()), "target", "target"); succeeds(new RemoteInfo(randomAsciiOfLength(5), "test", 9200, new BytesArray("test"), null, null, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "target", "target");
} }
private void fails(String target, String... sources) { private void fails(String target, String... sources) {

View File

@ -36,6 +36,8 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
public class RestReindexActionTests extends ESTestCase { public class RestReindexActionTests extends ESTestCase {
public void testBuildRemoteInfoNoRemote() throws IOException { public void testBuildRemoteInfoNoRemote() throws IOException {
assertNull(RestReindexAction.buildRemoteInfo(new HashMap<>())); assertNull(RestReindexAction.buildRemoteInfo(new HashMap<>()));
@ -52,6 +54,8 @@ public class RestReindexActionTests extends ESTestCase {
remote.put("username", "testuser"); remote.put("username", "testuser");
remote.put("password", "testpass"); remote.put("password", "testpass");
remote.put("headers", headers); remote.put("headers", headers);
remote.put("socket_timeout", "90s");
remote.put("connect_timeout", "10s");
Map<String, Object> query = new HashMap<>(); Map<String, Object> query = new HashMap<>();
query.put("a", "b"); query.put("a", "b");
@ -68,6 +72,8 @@ public class RestReindexActionTests extends ESTestCase {
assertEquals("testuser", remoteInfo.getUsername()); assertEquals("testuser", remoteInfo.getUsername());
assertEquals("testpass", remoteInfo.getPassword()); assertEquals("testpass", remoteInfo.getPassword());
assertEquals(headers, remoteInfo.getHeaders()); assertEquals(headers, remoteInfo.getHeaders());
assertEquals(timeValueSeconds(90), remoteInfo.getSocketTimeout());
assertEquals(timeValueSeconds(10), remoteInfo.getConnectTimeout());
} }
public void testBuildRemoteInfoWithoutAllParts() throws IOException { public void testBuildRemoteInfoWithoutAllParts() throws IOException {
@ -76,16 +82,20 @@ public class RestReindexActionTests extends ESTestCase {
expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com")); expectThrows(IllegalArgumentException.class, () -> buildRemoteInfoHostTestCase("http://example.com"));
} }
public void testBuildRemoteInfoWithAllParts() throws IOException { public void testBuildRemoteInfoWithAllHostParts() throws IOException {
RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200"); RemoteInfo info = buildRemoteInfoHostTestCase("http://example.com:9200");
assertEquals("http", info.getScheme()); assertEquals("http", info.getScheme());
assertEquals("example.com", info.getHost()); assertEquals("example.com", info.getHost());
assertEquals(9200, info.getPort()); assertEquals(9200, info.getPort());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout()); // Didn't set the timeout so we should get the default
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout()); // Didn't set the timeout so we should get the default
info = buildRemoteInfoHostTestCase("https://other.example.com:9201"); info = buildRemoteInfoHostTestCase("https://other.example.com:9201");
assertEquals("https", info.getScheme()); assertEquals("https", info.getScheme());
assertEquals("other.example.com", info.getHost()); assertEquals("other.example.com", info.getHost());
assertEquals(9201, info.getPort()); assertEquals(9201, info.getPort());
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, info.getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, info.getConnectTimeout());
} }
public void testReindexFromRemoteRequestParsing() throws IOException { public void testReindexFromRemoteRequestParsing() throws IOException {

View File

@ -118,7 +118,7 @@ public class RetryTests extends ESSingleNodeTestCase {
NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0); NodeInfo nodeInfo = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0);
TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress(); TransportAddress address = nodeInfo.getHttp().getAddress().publishAddress();
RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null, RemoteInfo remote = new RemoteInfo("http", address.getAddress(), address.getPort(), new BytesArray("{\"match_all\":{}}"), null,
null, emptyMap()); null, emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest") ReindexRequestBuilder request = ReindexAction.INSTANCE.newRequestBuilder(client()).source("source").destination("dest")
.setRemoteInfo(remote); .setRemoteInfo(remote);
testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT)); testCase(ReindexAction.NAME, request, matcher().created(DOC_COUNT));

View File

@ -73,7 +73,10 @@ public class RoundTripTests extends ESTestCase {
while (headers.size() < headersCount) { while (headers.size() < headersCount) {
headers.put(randomAsciiOfLength(5), randomAsciiOfLength(5)); headers.put(randomAsciiOfLength(5), randomAsciiOfLength(5));
} }
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password, headers)); TimeValue socketTimeout = parseTimeValue(randomPositiveTimeValue(), "socketTimeout");
TimeValue connectTimeout = parseTimeValue(randomPositiveTimeValue(), "connectTimeout");
reindex.setRemoteInfo(new RemoteInfo(randomAsciiOfLength(5), randomAsciiOfLength(5), port, query, username, password, headers,
socketTimeout, connectTimeout));
} }
ReindexRequest tripped = new ReindexRequest(); ReindexRequest tripped = new ReindexRequest();
roundTrip(reindex, tripped); roundTrip(reindex, tripped);
@ -89,7 +92,7 @@ public class RoundTripTests extends ESTestCase {
tripped = new ReindexRequest(); tripped = new ReindexRequest();
reindex.setSlices(1); reindex.setSlices(1);
roundTrip(Version.V_5_0_0_rc1, reindex, tripped); roundTrip(Version.V_5_0_0_rc1, reindex, tripped);
assertRequestEquals(reindex, tripped); assertRequestEquals(Version.V_5_0_0_rc1, reindex, tripped);
} }
public void testUpdateByQueryRequest() throws IOException { public void testUpdateByQueryRequest() throws IOException {
@ -154,7 +157,7 @@ public class RoundTripTests extends ESTestCase {
request.setScript(random().nextBoolean() ? null : randomScript()); request.setScript(random().nextBoolean() ? null : randomScript());
} }
private void assertRequestEquals(ReindexRequest request, ReindexRequest tripped) { private void assertRequestEquals(Version version, ReindexRequest request, ReindexRequest tripped) {
assertRequestEquals((AbstractBulkIndexByScrollRequest<?>) request, (AbstractBulkIndexByScrollRequest<?>) tripped); assertRequestEquals((AbstractBulkIndexByScrollRequest<?>) request, (AbstractBulkIndexByScrollRequest<?>) tripped);
assertEquals(request.getDestination().version(), tripped.getDestination().version()); assertEquals(request.getDestination().version(), tripped.getDestination().version());
assertEquals(request.getDestination().index(), tripped.getDestination().index()); assertEquals(request.getDestination().index(), tripped.getDestination().index());
@ -168,6 +171,13 @@ public class RoundTripTests extends ESTestCase {
assertEquals(request.getRemoteInfo().getUsername(), tripped.getRemoteInfo().getUsername()); assertEquals(request.getRemoteInfo().getUsername(), tripped.getRemoteInfo().getUsername());
assertEquals(request.getRemoteInfo().getPassword(), tripped.getRemoteInfo().getPassword()); assertEquals(request.getRemoteInfo().getPassword(), tripped.getRemoteInfo().getPassword());
assertEquals(request.getRemoteInfo().getHeaders(), tripped.getRemoteInfo().getHeaders()); assertEquals(request.getRemoteInfo().getHeaders(), tripped.getRemoteInfo().getHeaders());
if (version.onOrAfter(Version.V_5_2_0_UNRELEASED)) {
assertEquals(request.getRemoteInfo().getSocketTimeout(), tripped.getRemoteInfo().getSocketTimeout());
assertEquals(request.getRemoteInfo().getConnectTimeout(), tripped.getRemoteInfo().getConnectTimeout());
} else {
assertEquals(RemoteInfo.DEFAULT_SOCKET_TIMEOUT, tripped.getRemoteInfo().getSocketTimeout());
assertEquals(RemoteInfo.DEFAULT_CONNECT_TIMEOUT, tripped.getRemoteInfo().getConnectTimeout());
}
} }
} }

View File

@ -25,14 +25,17 @@ import org.elasticsearch.test.ESTestCase;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
public class RemoteInfoTests extends ESTestCase { public class RemoteInfoTests extends ESTestCase {
private RemoteInfo newRemoteInfo(String scheme, String username, String password) {
return new RemoteInfo(scheme, "testhost", 12344, new BytesArray("testquery"), username, password, emptyMap(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
}
public void testToString() { public void testToString() {
RemoteInfo info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), null, null, emptyMap()); assertEquals("host=testhost port=12344 query=testquery", newRemoteInfo("http", null, null).toString());
assertEquals("host=testhost port=12344 query=testquery", info.toString()); assertEquals("host=testhost port=12344 query=testquery username=testuser", newRemoteInfo("http", "testuser", null).toString());
info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", null, emptyMap()); assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>",
assertEquals("host=testhost port=12344 query=testquery username=testuser", info.toString()); newRemoteInfo("http", "testuser", "testpass").toString());
info = new RemoteInfo("http", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap()); assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>",
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString()); newRemoteInfo("https", "testuser", "testpass").toString());
info = new RemoteInfo("https", "testhost", 12344, new BytesArray("testquery"), "testuser", "testpass", emptyMap());
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>", info.toString());
} }
} }

View File

@ -244,6 +244,34 @@
dest: dest:
index: dest index: dest
---
"broken socket timeout in remote fails":
- do:
catch: /number_format_exception/
reindex:
body:
source:
remote:
host: http://okremote:9200
socket_timeout: borked
index: test
dest:
index: dest
---
"broken connect timeout in remote fails":
- do:
catch: /number_format_exception/
reindex:
body:
source:
remote:
host: http://okremote:9200
connect_timeout: borked
index: test
dest:
index: dest
--- ---
"junk in slices fails": "junk in slices fails":
- do: - do:

View File

@ -205,3 +205,55 @@
match: match:
text: test text: test
- match: {hits.total: 1} - match: {hits.total: 1}
---
"Reindex from remote with timeouts":
# Validates that you can configure the socket_timeout and connect_timeout,
# not that they do anything.
- do:
index:
index: source
type: foo
id: 1
body: { "text": "test" }
refresh: true
# Fetch the http host. We use the host of the master because we know there will always be a master.
- do:
cluster.state: {}
- set: { master_node: master }
- do:
nodes.info:
metric: [ http ]
- is_true: nodes.$master.http.publish_address
- set: {nodes.$master.http.publish_address: host}
- do:
reindex:
refresh: true
body:
source:
remote:
host: http://${host}
socket_timeout: 1m
connect_timeout: 1m
index: source
dest:
index: dest
- match: {created: 1}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted
- do:
search:
index: dest
body:
query:
match:
text: test
- match: {hits.total: 1}

View File

@ -272,10 +272,8 @@ public abstract class ESRestTestCase extends ESTestCase {
return "http"; return "http";
} }
private static RestClient buildClient(Settings settings) throws IOException { private RestClient buildClient(Settings settings) throws IOException {
RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()])) RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()]));
.setMaxRetryTimeoutMillis(30000)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(30000));
String keystorePath = settings.get(TRUSTSTORE_PATH); String keystorePath = settings.get(TRUSTSTORE_PATH);
if (keystorePath != null) { if (keystorePath != null) {
final String keystorePass = settings.get(TRUSTSTORE_PASSWORD); final String keystorePass = settings.get(TRUSTSTORE_PASSWORD);