Fix throttled reindex_from_remote (#23953)

reindex_from_remote was using `TimeValue#toString` to generate the
scroll timeout which is bad because that generates fractional
time values that are useful for people but bad for Elasticsearch
which doesn't like to parse them. This switches it to using
`TimeValue#getStringRep` which spits out whole time values.

Closes to #23945

Makes #23828 even more desirable
This commit is contained in:
Nik Everett 2017-04-07 15:56:52 -04:00 committed by GitHub
parent 0c465b1931
commit de6837b7ac
3 changed files with 93 additions and 5 deletions

View File

@ -59,7 +59,7 @@ final class RemoteRequestBuilders {
static Map<String, String> initialSearchParams(SearchRequest searchRequest, Version remoteVersion) { static Map<String, String> initialSearchParams(SearchRequest searchRequest, Version remoteVersion) {
Map<String, String> params = new HashMap<>(); Map<String, String> params = new HashMap<>();
if (searchRequest.scroll() != null) { if (searchRequest.scroll() != null) {
params.put("scroll", searchRequest.scroll().keepAlive().toString()); params.put("scroll", searchRequest.scroll().keepAlive().getStringRep());
} }
params.put("size", Integer.toString(searchRequest.source().size())); params.put("size", Integer.toString(searchRequest.source().size()));
if (searchRequest.source().version() == null || searchRequest.source().version() == true) { if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
@ -168,7 +168,7 @@ final class RemoteRequestBuilders {
} }
static Map<String, String> scrollParams(TimeValue keepAlive) { static Map<String, String> scrollParams(TimeValue keepAlive) {
return singletonMap("scroll", keepAlive.toString()); return singletonMap("scroll", keepAlive.getStringRep());
} }
static HttpEntity scrollEntity(String scroll, Version remoteVersion) { static HttpEntity scrollEntity(String scroll, Version remoteVersion) {

View File

@ -35,11 +35,11 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity; import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams; import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchParams;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath; import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.initialSearchPath;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity; import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.clearScrollEntity;
import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams; import static org.elasticsearch.index.reindex.remote.RemoteRequestBuilders.scrollParams;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.either;
@ -150,7 +150,11 @@ public class RemoteRequestBuildersTests extends ESTestCase {
Map<String, String> params = initialSearchParams(searchRequest, remoteVersion); Map<String, String> params = initialSearchParams(searchRequest, remoteVersion);
assertThat(params, scroll == null ? not(hasKey("scroll")) : hasEntry("scroll", scroll.toString())); if (scroll == null) {
assertThat(params, not(hasKey("scroll")));
} else {
assertEquals(scroll, TimeValue.parseTimeValue(params.get("scroll"), "scroll"));
}
assertThat(params, hasEntry("size", Integer.toString(size))); assertThat(params, hasEntry("size", Integer.toString(size)));
assertThat(params, fetchVersion == null || fetchVersion == true ? hasEntry("version", null) : not(hasEntry("version", null))); assertThat(params, fetchVersion == null || fetchVersion == true ? hasEntry("version", null) : not(hasEntry("version", null)));
} }
@ -181,7 +185,7 @@ public class RemoteRequestBuildersTests extends ESTestCase {
public void testScrollParams() { public void testScrollParams() {
TimeValue scroll = TimeValue.parseTimeValue(randomPositiveTimeValue(), "test"); TimeValue scroll = TimeValue.parseTimeValue(randomPositiveTimeValue(), "test");
assertThat(scrollParams(scroll), hasEntry("scroll", scroll.toString())); assertEquals(scroll, TimeValue.parseTimeValue(scrollParams(scroll).get("scroll"), "scroll"));
} }
public void testScrollEntity() throws IOException { public void testScrollEntity() throws IOException {

View File

@ -459,3 +459,87 @@
id: 1 id: 1
- match: { _source.text: "test" } - match: { _source.text: "test" }
- is_false: _source.filtered - is_false: _source.filtered
---
"Reindex from remote with rethrottle":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
# and a small batch size on the request
- do:
indices.create:
index: source
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
index:
index: source
type: foo
id: 1
body: { "text": "test" }
- do:
index:
index: source
type: foo
id: 2
body: { "text": "test" }
- do:
index:
index: source
type: foo
id: 3
body: { "text": "test" }
- do:
indices.refresh: {}
# Fetch the http host. We use the host of the master because we know there will always be a master.
- do:
cluster.state: {}
- set: { master_node: master }
- do:
nodes.info:
metric: [ http ]
- is_true: nodes.$master.http.publish_address
- set: {nodes.$master.http.publish_address: host}
- do:
reindex:
requests_per_second: .00000001 # About 9.5 years to complete the request
wait_for_completion: false
refresh: true
body:
source:
remote:
host: http://${host}
index: source
size: 1
dest:
index: dest
- match: {task: '/.+:\d+/'}
- set: {task: task}
- do:
reindex_rethrottle:
requests_per_second: -1
task_id: $task
- do:
tasks.get:
wait_for_completion: true
task_id: $task
- do:
search:
index: dest
body:
query:
match:
text: test
- match: {hits.total: 3}
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}