Propogate version in reindex from remote search (#42958)
This is related to #31908. In order to use the external version in a reindex from remote request, the search request must be configured to request the version (as it is not returned by default). This commit modifies the search request to request the version. Additionally, it modifies our current reindex from remote tests to randomly use the external version_type.
This commit is contained in:
parent
ca5dbf93a5
commit
d18f511327
|
@ -77,7 +77,7 @@ forbiddenPatterns {
|
|||
exclude '**/*.p12'
|
||||
}
|
||||
|
||||
// Support for testing reindex-from-remote against old Elaticsearch versions
|
||||
// Support for testing reindex-from-remote against old Elasticsearch versions
|
||||
configurations {
|
||||
oldesFixture
|
||||
es2
|
||||
|
|
|
@ -84,14 +84,13 @@ final class RemoteRequestBuilders {
|
|||
request.addParameter("scroll", keepAlive.getStringRep());
|
||||
}
|
||||
request.addParameter("size", Integer.toString(searchRequest.source().size()));
|
||||
if (searchRequest.source().version() == null || searchRequest.source().version() == true) {
|
||||
/*
|
||||
* Passing `null` here just add the `version` request parameter
|
||||
* without any value. This way of requesting the version works
|
||||
* for all supported versions of Elasticsearch.
|
||||
*/
|
||||
request.addParameter("version", null);
|
||||
|
||||
if (searchRequest.source().version() == null || searchRequest.source().version() == false) {
|
||||
request.addParameter("version", Boolean.FALSE.toString());
|
||||
} else {
|
||||
request.addParameter("version", Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
if (searchRequest.source().sorts() != null) {
|
||||
boolean useScan = false;
|
||||
// Detect if we should use search_type=scan rather than a sort
|
||||
|
|
|
@ -73,18 +73,35 @@ public class ManyDocumentsIT extends ESRestTestCase {
|
|||
Map<?, ?> http = (Map<?, ?>) nodeInfo.get("http");
|
||||
String remote = "http://"+ http.get("publish_address");
|
||||
Request request = new Request("POST", "/_reindex");
|
||||
request.setJsonEntity(
|
||||
if (randomBoolean()) {
|
||||
request.setJsonEntity(
|
||||
"{\n" +
|
||||
" \"source\":{\n" +
|
||||
" \"index\":\"test\",\n" +
|
||||
" \"remote\":{\n" +
|
||||
" \"host\":\"" + remote + "\"\n" +
|
||||
" }\n" +
|
||||
" }\n," +
|
||||
" \"dest\":{\n" +
|
||||
" \"index\":\"des\"\n" +
|
||||
" }\n" +
|
||||
"}");
|
||||
" \"source\":{\n" +
|
||||
" \"index\":\"test\",\n" +
|
||||
" \"remote\":{\n" +
|
||||
" \"host\":\"" + remote + "\"\n" +
|
||||
" }\n" +
|
||||
" }\n," +
|
||||
" \"dest\":{\n" +
|
||||
" \"index\":\"des\"\n" +
|
||||
" }\n" +
|
||||
"}");
|
||||
} else {
|
||||
// Test with external version_type
|
||||
request.setJsonEntity(
|
||||
"{\n" +
|
||||
" \"source\":{\n" +
|
||||
" \"index\":\"test\",\n" +
|
||||
" \"remote\":{\n" +
|
||||
" \"host\":\"" + remote + "\"\n" +
|
||||
" }\n" +
|
||||
" }\n," +
|
||||
" \"dest\":{\n" +
|
||||
" \"index\":\"des\",\n" +
|
||||
" \"version_type\": \"external\"\n" +
|
||||
" }\n" +
|
||||
"}");
|
||||
}
|
||||
Map<String, Object> response = entityAsMap(client().performRequest(request));
|
||||
assertThat(response, hasEntry("total", count));
|
||||
assertThat(response, hasEntry("created", count));
|
||||
|
|
|
@ -56,19 +56,38 @@ public class ReindexFromOldRemoteIT extends ESRestTestCase {
|
|||
}
|
||||
|
||||
Request reindex = new Request("POST", "/_reindex");
|
||||
reindex.setJsonEntity(
|
||||
if (randomBoolean()) {
|
||||
// Reindex using the external version_type
|
||||
reindex.setJsonEntity(
|
||||
"{\n"
|
||||
+ " \"source\":{\n"
|
||||
+ " \"index\": \"test\",\n"
|
||||
+ " \"size\": 1,\n"
|
||||
+ " \"remote\": {\n"
|
||||
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
|
||||
+ " }\n"
|
||||
+ " },\n"
|
||||
+ " \"dest\": {\n"
|
||||
+ " \"index\": \"test\"\n"
|
||||
+ " }\n"
|
||||
+ "}");
|
||||
+ " \"source\":{\n"
|
||||
+ " \"index\": \"test\",\n"
|
||||
+ " \"size\": 1,\n"
|
||||
+ " \"remote\": {\n"
|
||||
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
|
||||
+ " }\n"
|
||||
+ " },\n"
|
||||
+ " \"dest\": {\n"
|
||||
+ " \"index\": \"test\",\n"
|
||||
+ " \"version_type\": \"external\"\n"
|
||||
+ " }\n"
|
||||
+ "}");
|
||||
} else {
|
||||
// Reindex using the default internal version_type
|
||||
reindex.setJsonEntity(
|
||||
"{\n"
|
||||
+ " \"source\":{\n"
|
||||
+ " \"index\": \"test\",\n"
|
||||
+ " \"size\": 1,\n"
|
||||
+ " \"remote\": {\n"
|
||||
+ " \"host\": \"http://127.0.0.1:" + oldEsPort + "\"\n"
|
||||
+ " }\n"
|
||||
+ " },\n"
|
||||
+ " \"dest\": {\n"
|
||||
+ " \"index\": \"test\"\n"
|
||||
+ " }\n"
|
||||
+ "}");
|
||||
}
|
||||
reindex.addParameter("refresh", "true");
|
||||
reindex.addParameter("pretty", "true");
|
||||
if (requestsPerSecond != null) {
|
||||
|
|
|
@ -141,7 +141,7 @@ public class RemoteRequestBuildersTests extends ESTestCase {
|
|||
// Test request without any fields
|
||||
Version remoteVersion = Version.fromId(between(2000099, Version.CURRENT.id));
|
||||
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
|
||||
not(either(hasKey("stored_fields")).or(hasKey("fields"))));
|
||||
not(either(hasKey("stored_fields")).or(hasKey("fields"))));
|
||||
|
||||
// Test stored_fields for versions that support it
|
||||
searchRequest = new SearchRequest().source(new SearchSourceBuilder());
|
||||
|
@ -162,14 +162,14 @@ public class RemoteRequestBuildersTests extends ESTestCase {
|
|||
searchRequest.source().storedField("_source").storedField("_id");
|
||||
remoteVersion = Version.fromId(between(0, 2000099 - 1));
|
||||
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
|
||||
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));
|
||||
hasEntry("fields", "_source,_id,_parent,_routing,_ttl"));
|
||||
|
||||
// But only versions before 1.0 force _source to be in the list
|
||||
searchRequest = new SearchRequest().source(new SearchSourceBuilder());
|
||||
searchRequest.source().storedField("_id");
|
||||
remoteVersion = Version.fromId(between(1000099, 2000099 - 1));
|
||||
assertThat(initialSearch(searchRequest, query, remoteVersion).getParameters(),
|
||||
hasEntry("fields", "_id,_parent,_routing,_ttl"));
|
||||
hasEntry("fields", "_id,_parent,_routing,_ttl"));
|
||||
}
|
||||
|
||||
public void testInitialSearchParamsMisc() {
|
||||
|
@ -189,7 +189,7 @@ public class RemoteRequestBuildersTests extends ESTestCase {
|
|||
fetchVersion = randomBoolean();
|
||||
searchRequest.source().version(fetchVersion);
|
||||
}
|
||||
|
||||
|
||||
Map<String, String> params = initialSearch(searchRequest, query, remoteVersion).getParameters();
|
||||
|
||||
if (scroll == null) {
|
||||
|
@ -198,7 +198,12 @@ public class RemoteRequestBuildersTests extends ESTestCase {
|
|||
assertScroll(remoteVersion, params, scroll);
|
||||
}
|
||||
assertThat(params, hasEntry("size", Integer.toString(size)));
|
||||
assertThat(params, fetchVersion == null || fetchVersion == true ? hasEntry("version", null) : not(hasEntry("version", null)));
|
||||
if (fetchVersion != null) {
|
||||
assertThat(params, fetchVersion ? hasEntry("version", Boolean.TRUE.toString()) :
|
||||
hasEntry("version", Boolean.FALSE.toString()));
|
||||
} else {
|
||||
assertThat(params, hasEntry("version", Boolean.FALSE.toString()));
|
||||
}
|
||||
}
|
||||
|
||||
private void assertScroll(Version remoteVersion, Map<String, String> params, TimeValue requested) {
|
||||
|
@ -225,22 +230,22 @@ public class RemoteRequestBuildersTests extends ESTestCase {
|
|||
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
|
||||
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
|
||||
assertEquals("{\"query\":" + query + ",\"_source\":true}",
|
||||
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
|
||||
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
|
||||
} else {
|
||||
assertEquals("{\"query\":" + query + "}",
|
||||
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
|
||||
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
|
||||
}
|
||||
|
||||
// Source filtering is included if set up
|
||||
searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"});
|
||||
searchRequest.source().fetchSource(new String[]{"in1", "in2"}, new String[]{"out"});
|
||||
entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity();
|
||||
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
|
||||
assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}",
|
||||
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
|
||||
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
|
||||
|
||||
// Invalid XContent fails
|
||||
RuntimeException e = expectThrows(RuntimeException.class,
|
||||
() -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
|
||||
() -> initialSearch(searchRequest, new BytesArray("{}, \"trailing\": {}"), remoteVersion));
|
||||
assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))"));
|
||||
e = expectThrows(RuntimeException.class, () -> initialSearch(searchRequest, new BytesArray("{"), remoteVersion));
|
||||
assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input"));
|
||||
|
|
Loading…
Reference in New Issue