Read the default pipeline for bulk upsert through an alias (#41963) (#42802)

This commit allows bulk upserts to correctly read the default pipeline
for the concrete index that belongs to an alias.

Bulk upserts are modeled differently from normal index requests such that
the index request is a request inside of the update request. The update
request (outer) contains the index or alias name is not part of the (inner)
index request. This commit adds a secondary check against the update request
(outer) if the index request (inner) does not find an alias.
This commit is contained in:
Jake Landis 2019-07-02 20:44:33 -05:00 committed by GitHub
parent a4e518b640
commit 2dc056b0a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 7 deletions

View File

@ -119,6 +119,12 @@ teardown:
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8","_index":"test"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
{"update":{"_id":"6_alias","_index":"test_alias"}}
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
{"update":{"_id":"7_alias","_index":"test_alias"}}
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
{"update":{"_id":"8_alias","_index":"test_alias"}}
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
- do:
mget:
@ -127,6 +133,9 @@ teardown:
- { _index: "test", _id: "6" }
- { _index: "test", _id: "7" }
- { _index: "test", _id: "8" }
- { _index: "test", _id: "6_alias" }
- { _index: "test", _id: "7_alias" }
- { _index: "test", _id: "8_alias" }
- match: { docs.0._index: "test" }
- match: { docs.0._id: "6" }
- match: { docs.0._source.bytes_source_field: "1kb" }
@ -141,6 +150,20 @@ teardown:
- match: { docs.2._source.bytes_source_field: "3kb" }
- match: { docs.2._source.bytes_target_field: 3072 }
- match: { docs.2._source.ran_script: true }
- match: { docs.3._index: "test" }
- match: { docs.3._id: "6_alias" }
- match: { docs.3._source.bytes_source_field: "1kb" }
- match: { docs.3._source.bytes_target_field: 1024 }
- is_false: docs.3._source.ran_script
- match: { docs.4._index: "test" }
- match: { docs.4._id: "7_alias" }
- match: { docs.4._source.bytes_source_field: "2kb" }
- match: { docs.4._source.bytes_target_field: 2048 }
- match: { docs.5._index: "test" }
- match: { docs.5._id: "8_alias" }
- match: { docs.5._source.bytes_source_field: "3kb" }
- match: { docs.5._source.bytes_target_field: 3072 }
- match: { docs.5._source.ran_script: true }
# explicit no default pipeline
- do:

View File

@ -165,14 +165,22 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (pipeline == null) {
// start to look for default pipeline via settings found in the index meta data
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
// if the write request if through an alias use the write index's meta data
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
// check the alias for the action request (this is how upserts are modeled)
if (indexMetaData == null && actionRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData != null) {
// Find the default pipeline if one is defined from and existing index.
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());

View File

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -412,16 +413,29 @@ public class TransportBulkActionIngestTests extends ESTestCase {
}
public void testUseDefaultPipelineWithBulkUpsert() throws Exception {
String indexRequestName = randomFrom(new String[]{null, WITH_DEFAULT_PIPELINE, WITH_DEFAULT_PIPELINE_ALIAS});
validatePipelineWithBulkUpsert(indexRequestName, WITH_DEFAULT_PIPELINE);
}
public void testUseDefaultPipelineWithBulkUpsertWithAlias() throws Exception {
String indexRequestName = randomFrom(new String[]{null, WITH_DEFAULT_PIPELINE, WITH_DEFAULT_PIPELINE_ALIAS});
validatePipelineWithBulkUpsert(indexRequestName, WITH_DEFAULT_PIPELINE_ALIAS);
}
private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexName, String updateRequestIndexName) throws Exception {
Exception exception = new Exception("fake exception");
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id1").source(Collections.emptyMap());
IndexRequest indexRequest2 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id2").source(Collections.emptyMap());
IndexRequest indexRequest3 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id3").source(Collections.emptyMap());
UpdateRequest upsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id1").upsert(indexRequest1).script(mockScript("1"));
UpdateRequest docAsUpsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").doc(indexRequest2).docAsUpsert(true);
IndexRequest indexRequest1 = new IndexRequest(indexRequestIndexName, "type", "id1").source(Collections.emptyMap());
IndexRequest indexRequest2 = new IndexRequest(indexRequestIndexName, "type", "id2").source(Collections.emptyMap());
IndexRequest indexRequest3 = new IndexRequest(indexRequestIndexName, "type", "id3").source(Collections.emptyMap());
UpdateRequest upsertRequest = new UpdateRequest(updateRequestIndexName, "type", "id1")
.upsert(indexRequest1).script(mockScript("1"));
UpdateRequest docAsUpsertRequest = new UpdateRequest(updateRequestIndexName, "type", "id2")
.doc(indexRequest2).docAsUpsert(true);
// this test only covers the mechanics that scripted bulk upserts will execute a default pipeline. However, in practice scripted
// bulk upserts with a default pipeline are a bit surprising since the script executes AFTER the pipeline.
UpdateRequest scriptedUpsert = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").upsert(indexRequest3).script(mockScript("1"))
UpdateRequest scriptedUpsert = new UpdateRequest(updateRequestIndexName, "type", "id2")
.upsert(indexRequest3).script(mockScript("1"))
.scriptedUpsert(true);
bulkRequest.add(upsertRequest).add(docAsUpsertRequest).add(scriptedUpsert);