From 124c1f13582d0e8be6a25ddc23f7367836bce296 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 14 Aug 2018 11:27:08 +0200 Subject: [PATCH] INGEST: Create Index Before Pipeline Execute (#32786) * INGEST: Create Index Before Pipeline Execute * Ensures that indices are created before the default pipeline setting is read to correcly handle the case of an index template containing a default pipeline (without the fix the first document does not get the pipeline applied as explained in #32758) * closes #32758 --- .../action/bulk/TransportBulkAction.java | 79 +++++++++++-------- .../bulk/TransportBulkActionIngestTests.java | 71 +++++++++++++++-- 2 files changed, 109 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 6af9735f7e9..e3e94e82339 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -127,37 +127,6 @@ public class TransportBulkAction extends HandledTransportAction listener) { - boolean hasIndexRequestsWithPipelines = false; - ImmutableOpenMap indicesMetaData = clusterService.state().getMetaData().indices(); - for (DocWriteRequest actionRequest : bulkRequest.requests) { - if (actionRequest instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) actionRequest; - String pipeline = indexRequest.getPipeline(); - if (pipeline == null) { - IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index()); - if (indexMetaData == null) { - indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); - } else { - String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); - indexRequest.setPipeline(defaultPipeline); - if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { - hasIndexRequestsWithPipelines = true; - } - } - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { - hasIndexRequestsWithPipelines = true; - } - } - } - if (hasIndexRequestsWithPipelines) { - if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, listener); - } else { - ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); - } - return; - } - final long startTime = relativeTime(); final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); @@ -191,7 +160,7 @@ public class TransportBulkAction extends HandledTransportAction { + executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> { inner.addSuppressed(e); listener.onFailure(inner); }), responses, indicesThatCannotBeCreated); @@ -225,7 +194,47 @@ public class TransportBulkAction extends HandledTransportAction listener, final AtomicArray responses, + Map indicesThatCannotBeCreated) { + boolean hasIndexRequestsWithPipelines = false; + ImmutableOpenMap indicesMetaData = clusterService.state().getMetaData().indices(); + for (DocWriteRequest actionRequest : bulkRequest.requests) { + if (actionRequest instanceof IndexRequest) { + IndexRequest indexRequest = (IndexRequest) actionRequest; + String pipeline = indexRequest.getPipeline(); + if (pipeline == null) { + IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index()); + if (indexMetaData == null) { + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); + } else { + String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings()); + indexRequest.setPipeline(defaultPipeline); + if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } + } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { + hasIndexRequestsWithPipelines = true; + } + } + } + if (hasIndexRequestsWithPipelines) { + try { + if (clusterService.localNode().isIngestNode()) { + processBulkIndexIngestRequest(task, bulkRequest, listener); + } else { + ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener); + } + } catch (Exception e) { + listener.onFailure(e); + } + } else { + executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated); } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 0bc813249f5..4c0dacc8a6e 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -22,20 +22,25 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; @@ -77,6 +82,9 @@ public class TransportBulkActionIngestTests extends ESTestCase { */ private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline"; + private static final Settings SETTINGS = + Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build(); + /** Services needed by bulk action */ TransportService transportService; ClusterService clusterService; @@ -112,25 +120,42 @@ public class TransportBulkActionIngestTests extends ESTestCase { /** A subclass of the real bulk action to allow skipping real bulk indexing, and marking when it would have happened. */ class TestTransportBulkAction extends TransportBulkAction { boolean isExecuted = false; // set when the "real" bulk execution happens + + boolean needToCheck; // pluggable return value for `needToCheck` + + boolean indexCreated = true; // If set to false, will be set to true by call to createIndex + TestTransportBulkAction() { - super(Settings.EMPTY, null, transportService, clusterService, ingestService, - null, null, new ActionFilters(Collections.emptySet()), null, null); + super(SETTINGS, null, transportService, clusterService, ingestService, + null, null, new ActionFilters(Collections.emptySet()), null, + new AutoCreateIndex( + SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new IndexNameExpressionResolver(SETTINGS) + ) + ); } @Override protected boolean needToCheck() { - return false; + return needToCheck; } @Override void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, final AtomicArray responses, Map indicesThatCannotBeCreated) { + assertTrue(indexCreated); isExecuted = true; } + + @Override + void createIndex(String index, TimeValue timeout, ActionListener listener) { + indexCreated = true; + listener.onResponse(null); + } } class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction { TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) { - super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService, + super(SETTINGS, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService, TransportBulkActionIngestTests.this.clusterService, null, null, null, new ActionFilters(Collections.emptySet()), null, IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, bulkAction, null); @@ -162,7 +187,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { when(nodes.getIngestNodes()).thenReturn(ingestNodes); ClusterState state = mock(ClusterState.class); when(state.getNodes()).thenReturn(nodes); - when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.builder() + MetaData metaData = MetaData.builder().indices(ImmutableOpenMap.builder() .putAll( Collections.singletonMap( WITH_DEFAULT_PIPELINE, @@ -170,7 +195,9 @@ public class TransportBulkActionIngestTests extends ESTestCase { settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") .build() ).numberOfShards(1).numberOfReplicas(1).build())) - .build()).build()); + .build()).build(); + when(state.getMetaData()).thenReturn(metaData); + when(state.metaData()).thenReturn(metaData); when(clusterService.state()).thenReturn(state); doAnswer(invocation -> { ClusterChangedEvent event = mock(ClusterChangedEvent.class); @@ -408,4 +435,36 @@ public class TransportBulkActionIngestTests extends ESTestCase { verifyZeroInteractions(transportService); } + public void testCreateIndexBeforeRunPipeline() throws Exception { + Exception exception = new Exception("fake exception"); + IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id"); + indexRequest.setPipeline("testpipeline"); + indexRequest.source(Collections.emptyMap()); + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + action.needToCheck = true; + action.indexCreated = false; + singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( + response -> responseCalled.set(true), + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + + // check failure works, and passes through to the listener + assertFalse(action.isExecuted); // haven't executed yet + assertFalse(responseCalled.get()); + assertFalse(failureCalled.get()); + verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + completionHandler.getValue().accept(exception); + assertTrue(failureCalled.get()); + + // now check success + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing + completionHandler.getValue().accept(null); + assertTrue(action.isExecuted); + assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one + verifyZeroInteractions(transportService); + } + }