Fix source filtering in reindex-from-remote (#22514)
Reindex-from-remote was accepting source filtering in the request but ignoring it and setting `_source=true` on the search URI. This fixes the filtering so it is piped through to the remote node and adds tests for that. Closes #22507
This commit is contained in:
parent
28273e0a52
commit
5ef78fd015
|
@ -71,6 +71,9 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
|
|||
if (getSearchRequest().indices() == null || getSearchRequest().indices().length == 0) {
|
||||
e = addValidationError("use _all if you really want to copy from all existing indexes", e);
|
||||
}
|
||||
if (getSearchRequest().source().fetchSource() != null && getSearchRequest().source().fetchSource().fetchSource() == false) {
|
||||
e = addValidationError("_source:false is not supported in this context", e);
|
||||
}
|
||||
/*
|
||||
* Note that we don't call index's validator - it won't work because
|
||||
* we'll be filling in portions of it as we receive the docs. But we can
|
||||
|
|
|
@ -102,27 +102,30 @@ final class RemoteRequestBuilders {
|
|||
String storedFieldsParamName = remoteVersion.before(Version.V_5_0_0_alpha4) ? "fields" : "stored_fields";
|
||||
params.put(storedFieldsParamName, fields.toString());
|
||||
}
|
||||
// We always want the _source document and this will force it to be returned.
|
||||
params.put("_source", "true");
|
||||
return params;
|
||||
}
|
||||
|
||||
static HttpEntity initialSearchEntity(BytesReference query) {
|
||||
static HttpEntity initialSearchEntity(SearchRequest searchRequest, BytesReference query) {
|
||||
// EMPTY is safe here because we're not calling namedObject
|
||||
try (XContentBuilder entity = JsonXContent.contentBuilder();
|
||||
XContentParser queryParser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, query)) {
|
||||
entity.startObject();
|
||||
entity.field("query");
|
||||
/*
|
||||
* We're intentionally a bit paranoid here - copying the query as xcontent rather than writing a raw field. We don't want poorly
|
||||
* written queries to escape. Ever.
|
||||
*/
|
||||
entity.copyCurrentStructure(queryParser);
|
||||
XContentParser.Token shouldBeEof = queryParser.nextToken();
|
||||
if (shouldBeEof != null) {
|
||||
throw new ElasticsearchException(
|
||||
"query was more than a single object. This first token after the object is [" + shouldBeEof + "]");
|
||||
|
||||
entity.field("query"); {
|
||||
/* We're intentionally a bit paranoid here - copying the query as xcontent rather than writing a raw field. We don't want
|
||||
* poorly written queries to escape. Ever. */
|
||||
entity.copyCurrentStructure(queryParser);
|
||||
XContentParser.Token shouldBeEof = queryParser.nextToken();
|
||||
if (shouldBeEof != null) {
|
||||
throw new ElasticsearchException(
|
||||
"query was more than a single object. This first token after the object is [" + shouldBeEof + "]");
|
||||
}
|
||||
}
|
||||
|
||||
if (searchRequest.source().fetchSource() != null) {
|
||||
entity.field("_source", searchRequest.source().fetchSource());
|
||||
}
|
||||
|
||||
entity.endObject();
|
||||
BytesRef bytes = entity.bytes().toBytesRef();
|
||||
return new ByteArrayEntity(bytes.bytes, bytes.offset, bytes.length, ContentType.APPLICATION_JSON);
|
||||
|
|
|
@ -101,7 +101,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
|
|||
lookupRemoteVersion(version -> {
|
||||
remoteVersion = version;
|
||||
execute("POST", initialSearchPath(searchRequest), initialSearchParams(searchRequest, version),
|
||||
initialSearchEntity(query), RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
|
||||
initialSearchEntity(searchRequest, query), RESPONSE_PARSER, r -> onStartResponse(onResponse, r));
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -152,20 +152,29 @@ public class RemoteRequestBuildersTests extends ESTestCase {
|
|||
assertThat(params, scroll == null ? not(hasKey("scroll")) : hasEntry("scroll", scroll.toString()));
|
||||
assertThat(params, hasEntry("size", Integer.toString(size)));
|
||||
assertThat(params, fetchVersion == null || fetchVersion == true ? hasEntry("version", null) : not(hasEntry("version", null)));
|
||||
assertThat(params, hasEntry("_source", "true"));
|
||||
}
|
||||
|
||||
public void testInitialSearchEntity() throws IOException {
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.source(new SearchSourceBuilder());
|
||||
String query = "{\"match_all\":{}}";
|
||||
HttpEntity entity = initialSearchEntity(new BytesArray(query));
|
||||
HttpEntity entity = initialSearchEntity(searchRequest, new BytesArray(query));
|
||||
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
|
||||
assertEquals("{\"query\":" + query + "}",
|
||||
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
|
||||
|
||||
// Source filtering is included if set up
|
||||
searchRequest.source().fetchSource(new String[] {"in1", "in2"}, new String[] {"out"});
|
||||
entity = initialSearchEntity(searchRequest, new BytesArray(query));
|
||||
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
|
||||
assertEquals("{\"query\":" + query + ",\"_source\":{\"includes\":[\"in1\",\"in2\"],\"excludes\":[\"out\"]}}",
|
||||
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
|
||||
|
||||
// Invalid XContent fails
|
||||
RuntimeException e = expectThrows(RuntimeException.class, () -> initialSearchEntity(new BytesArray("{}, \"trailing\": {}")));
|
||||
RuntimeException e = expectThrows(RuntimeException.class,
|
||||
() -> initialSearchEntity(searchRequest, new BytesArray("{}, \"trailing\": {}")));
|
||||
assertThat(e.getCause().getMessage(), containsString("Unexpected character (',' (code 44))"));
|
||||
e = expectThrows(RuntimeException.class, () -> initialSearchEntity(new BytesArray("{")));
|
||||
e = expectThrows(RuntimeException.class, () -> initialSearchEntity(searchRequest, new BytesArray("{")));
|
||||
assertThat(e.getCause().getMessage(), containsString("Unexpected end-of-input"));
|
||||
}
|
||||
|
||||
|
|
|
@ -416,3 +416,42 @@
|
|||
type: foo
|
||||
id: 1
|
||||
- match: { _source: {} }
|
||||
|
||||
---
|
||||
"Reindex with source filtering":
|
||||
- do:
|
||||
index:
|
||||
index: source
|
||||
type: foo
|
||||
id: 1
|
||||
body: { "text": "test", "filtered": "removed" }
|
||||
refresh: true
|
||||
|
||||
- do:
|
||||
reindex:
|
||||
refresh: true
|
||||
body:
|
||||
source:
|
||||
index: source
|
||||
_source:
|
||||
excludes:
|
||||
- filtered
|
||||
dest:
|
||||
index: dest
|
||||
- match: {created: 1}
|
||||
- match: {updated: 0}
|
||||
- match: {version_conflicts: 0}
|
||||
- match: {batches: 1}
|
||||
- match: {failures: []}
|
||||
- match: {throttled_millis: 0}
|
||||
- gte: { took: 0 }
|
||||
- is_false: task
|
||||
- is_false: deleted
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: dest
|
||||
type: foo
|
||||
id: 1
|
||||
- match: { _source.text: "test" }
|
||||
- is_false: _source.filtered
|
||||
|
|
|
@ -295,3 +295,21 @@
|
|||
index: test
|
||||
dest:
|
||||
index: dest
|
||||
|
||||
---
|
||||
"_source:false is rejected":
|
||||
- do:
|
||||
index:
|
||||
index: source
|
||||
type: foo
|
||||
id: 1
|
||||
body: { "text": "test" }
|
||||
- do:
|
||||
catch: /_source:false is not supported in this context/
|
||||
reindex:
|
||||
body:
|
||||
source:
|
||||
index: source
|
||||
_source: false
|
||||
dest:
|
||||
index: dest
|
||||
|
|
|
@ -311,3 +311,53 @@
|
|||
index: source
|
||||
dest:
|
||||
index: dest
|
||||
|
||||
---
|
||||
"Reindex from remote with source filtering":
|
||||
- do:
|
||||
index:
|
||||
index: source
|
||||
type: foo
|
||||
id: 1
|
||||
body: { "text": "test", "filtered": "removed" }
|
||||
refresh: true
|
||||
|
||||
# Fetch the http host. We use the host of the master because we know there will always be a master.
|
||||
- do:
|
||||
cluster.state: {}
|
||||
- set: { master_node: master }
|
||||
- do:
|
||||
nodes.info:
|
||||
metric: [ http ]
|
||||
- is_true: nodes.$master.http.publish_address
|
||||
- set: {nodes.$master.http.publish_address: host}
|
||||
- do:
|
||||
reindex:
|
||||
refresh: true
|
||||
body:
|
||||
source:
|
||||
remote:
|
||||
host: http://${host}
|
||||
index: source
|
||||
_source:
|
||||
excludes:
|
||||
- filtered
|
||||
dest:
|
||||
index: dest
|
||||
- match: {created: 1}
|
||||
- match: {updated: 0}
|
||||
- match: {version_conflicts: 0}
|
||||
- match: {batches: 1}
|
||||
- match: {failures: []}
|
||||
- match: {throttled_millis: 0}
|
||||
- gte: { took: 0 }
|
||||
- is_false: task
|
||||
- is_false: deleted
|
||||
|
||||
- do:
|
||||
get:
|
||||
index: dest
|
||||
type: foo
|
||||
id: 1
|
||||
- match: { _source.text: "test" }
|
||||
- is_false: _source.filtered
|
||||
|
|
Loading…
Reference in New Issue