diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml index c20d7698131..4695991f3c3 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml @@ -31,6 +31,8 @@ teardown: settings: index: default_pipeline: "my_pipeline" + aliases: + test_alias: {} - do: index: @@ -49,9 +51,24 @@ teardown: - do: index: + index: test_alias + type: test + id: 2 + body: {bytes_source_field: "1kb"} + + - do: + get: index: test type: test id: 2 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + + - do: + index: + index: test + type: test + id: 3 pipeline: "_none" body: {bytes_source_field: "1kb"} @@ -59,15 +76,15 @@ teardown: get: index: test type: test - id: 2 + id: 3 - match: { _source.bytes_source_field: "1kb" } - is_false: _source.bytes_target_field - + - do: catch: bad_request index: index: test type: test - id: 3 + id: 4 pipeline: "" body: {bytes_source_field: "1kb"} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 66697cb907d..fa294a1bb2b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -44,6 +44,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -203,13 +204,22 @@ public class TransportBulkAction extends HandledTransportAction listener, final AtomicArray responses, Map indicesThatCannotBeCreated) { boolean hasIndexRequestsWithPipelines = false; - ImmutableOpenMap indicesMetaData = clusterService.state().getMetaData().indices(); + final MetaData metaData = clusterService.state().getMetaData(); + ImmutableOpenMap indicesMetaData = metaData.indices(); for (DocWriteRequest actionRequest : bulkRequest.requests) { if (actionRequest instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) actionRequest; String pipeline = indexRequest.getPipeline(); if (pipeline == null) { IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index()); + if (indexMetaData == null) { + //check the alias + AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); + if (indexOrAlias != null && indexOrAlias.isAlias()) { + AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; + indexMetaData = alias.getWriteIndex(); + } + } if (indexMetaData == null) { indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); } else { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index c93b0345ece..f25f8844153 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; @@ -80,6 +81,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { * Index for which mock settings contain a default pipeline. */ private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline"; + private static final String WITH_DEFAULT_PIPELINE_ALIAS = "alias_for_index_with_default_pipeline"; private static final Settings SETTINGS = Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build(); @@ -190,7 +192,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings( settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") .build() - ).numberOfShards(1).numberOfReplicas(1).build())) + ).putAlias(AliasMetaData.builder(WITH_DEFAULT_PIPELINE_ALIAS).build()).numberOfShards(1).numberOfReplicas(1).build())) .build()).build(); when(state.getMetaData()).thenReturn(metaData); when(state.metaData()).thenReturn(metaData); @@ -399,34 +401,11 @@ public class TransportBulkActionIngestTests extends ESTestCase { } public void testUseDefaultPipeline() throws Exception { - Exception exception = new Exception("fake exception"); - IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id"); - indexRequest.source(Collections.emptyMap()); - AtomicBoolean responseCalled = new AtomicBoolean(false); - AtomicBoolean failureCalled = new AtomicBoolean(false); - singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( - response -> { - responseCalled.set(true); - }, - e -> { - assertThat(e, sameInstance(exception)); - failureCalled.set(true); - })); + validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id")); + } - // check failure works, and passes through to the listener - assertFalse(action.isExecuted); // haven't executed yet - assertFalse(responseCalled.get()); - assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); - completionHandler.getValue().accept(exception); - assertTrue(failureCalled.get()); - - // now check success - indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing - completionHandler.getValue().accept(null); - assertTrue(action.isExecuted); - assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one - verifyZeroInteractions(transportService); + public void testUseDefaultPipelineWithAlias() throws Exception { + validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id")); } public void testCreateIndexBeforeRunPipeline() throws Exception { @@ -461,4 +440,33 @@ public class TransportBulkActionIngestTests extends ESTestCase { verifyZeroInteractions(transportService); } + private void validateDefaultPipeline(IndexRequest indexRequest) { + Exception exception = new Exception("fake exception"); + indexRequest.source(Collections.emptyMap()); + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( + response -> { + responseCalled.set(true); + }, + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + + // check failure works, and passes through to the listener + assertFalse(action.isExecuted); // haven't executed yet + assertFalse(responseCalled.get()); + assertFalse(failureCalled.get()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + completionHandler.getValue().accept(exception); + assertTrue(failureCalled.get()); + + // now check success + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing + completionHandler.getValue().accept(null); + assertTrue(action.isExecuted); + assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one + verifyZeroInteractions(transportService); + } }