Add data stream support to the reindex api. (#57970)

Backport of #57870 to 7.x branch.

This change now also copies the op_type from the reindex request's destination index request to the actual index request being used in the bulk request.

For ensuring no document exists, the op_type create doesn't need to be copied, since Versions.MATCH_DELETED will copied from the 'mainRequest.getDestination().version()'.
The `version()` method on IndexRequest only returns Versions.MATCH_DELETED if op_type=create and no specific version has been specified.

However in order to be able to index into a data stream, the op_type must be create. So in order to support that the op_type must be copied from the reindex request's destination index request to the actual index request being used in the bulk request.

Relates to #53100 and #57788
This commit is contained in:
Martijn van Groningen 2020-06-12 09:54:37 +02:00 committed by GitHub
parent 5226fef321
commit c8031c6f99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 123 additions and 4 deletions

View File

@ -256,7 +256,7 @@ POST /my-data-stream/_rollover <2>
-------------------------------------------------- --------------------------------------------------
// TEST[continued] // TEST[continued]
// TEST[setup:huge_twitter] // TEST[setup:huge_twitter]
// TEST[s/# Add > 1000 documents to my-data-stream/POST _reindex?refresh\n{"source":{"index":"twitter"},"dest":{"index":".ds-my-data-stream-000001"}}/] // TEST[s/# Add > 1000 documents to my-data-stream/POST _reindex?refresh\n{"source":{"index":"twitter"},"dest":{"index":"my-data-stream","op_type":"create"}}/]
<1> Creates a data stream called `my-data-stream` with one initial backing index <1> Creates a data stream called `my-data-stream` with one initial backing index
named `my-data-stream-000001`. named `my-data-stream-000001`.
<2> This request creates a new backing index, `my-data-stream-000002`, and adds <2> This request creates a new backing index, `my-data-stream-000002`, and adds

View File

@ -120,9 +120,9 @@ class ReindexValidator {
* it. This is the same sort of dance that TransportIndexRequest * it. This is the same sort of dance that TransportIndexRequest
* uses to decide to autocreate the index. * uses to decide to autocreate the index.
*/ */
target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination, false).getName(); target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination, true).getName();
} }
for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source)) { for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source, true)) {
if (sourceIndex.equals(target)) { if (sourceIndex.equals(target)) {
ActionRequestValidationException e = new ActionRequestValidationException(); ActionRequestValidationException e = new ActionRequestValidationException();
e.addValidationError("reindex cannot write into an index its reading from [" + target + ']'); e.addValidationError("reindex cannot write into an index its reading from [" + target + ']');

View File

@ -30,6 +30,7 @@ import org.apache.http.message.BasicHeader;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -272,7 +273,9 @@ public class Reindexer {
*/ */
index.routing(mainRequest.getDestination().routing()); index.routing(mainRequest.getDestination().routing());
index.setPipeline(mainRequest.getDestination().getPipeline()); index.setPipeline(mainRequest.getDestination().getPipeline());
// OpType is synthesized from version so it is handled when we copy version above. if (mainRequest.getDestination().opType() == DocWriteRequest.OpType.CREATE) {
index.opType(mainRequest.getDestination().opType());
}
return wrap(index); return wrap(index);
} }

View File

@ -0,0 +1,116 @@
---
setup:
- skip:
features: allowed_warnings
- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
indices.put_index_template:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
timestamp_field: timestamp
template:
mappings:
properties:
timestamp:
type: date
---
teardown:
- do:
indices.delete_data_stream:
name: '*'
---
"Reindex from data stream into another data stream":
- skip:
version: " - 7.99.99"
reason: "change to 7.8.99 after backport"
features: allowed_warnings
- do:
index:
index: logs-foobar
refresh: true
body: { foo: bar }
- do:
reindex:
refresh: true
body:
source:
index: logs-foobar
dest:
index: logs-barbaz
op_type: create
- do:
search:
index: logs-barbaz
body: { query: { match_all: {} } }
- length: { hits.hits: 1 }
- match: { hits.hits.0._index: .ds-logs-barbaz-000001 }
- match: { hits.hits.0._source.foo: 'bar' }
---
"Reindex from index into data stream":
- skip:
version: " - 7.99.99"
reason: "change to 7.8.99 after backport"
features: allowed_warnings
- do:
index:
index: old-logs-index
refresh: true
body: { foo: bar }
- do:
reindex:
refresh: true
body:
source:
index: old-logs-index
dest:
index: logs-foobar
op_type: create
- do:
search:
index: logs-foobar
body: { query: { match_all: {} } }
- length: { hits.hits: 1 }
- match: { hits.hits.0._index: .ds-logs-foobar-000001 }
- match: { hits.hits.0._source.foo: 'bar' }
---
"Reindex from data source into an index":
- skip:
version: " - 7.99.99"
reason: "change to 7.8.99 after backport"
features: allowed_warnings
- do:
index:
index: logs-foobar
refresh: true
body: { foo: bar }
- do:
reindex:
refresh: true
body:
source:
index: logs-foobar
dest:
index: my-index
- do:
search:
index: my-index
body: { query: { match_all: {} } }
- length: { hits.hits: 1 }
- match: { hits.hits.0._index: my-index }
- match: { hits.hits.0._source.foo: 'bar' }