From 447f307ebb41cf8acdb387005c56f9ce540e120b Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 21 Apr 2017 18:56:04 -0400 Subject: [PATCH] Fix _bulk response when it can't create an index (#24048) Before #22488 when an index couldn't be created during a `_bulk` operation we'd do all the *other* actions and return the index creation error on each failing action. In #22488 we accidentally changed it so that we now reject the entire bulk request if a single action cannot create an index that it must create to run. This gets reverts to the old behavior while still keeping the nicer error messages. Instead of failing the entire request we now only fail the portions of the request that can't work because the index doesn't exist. Closes #24028 --- .../action/bulk/TransportBulkAction.java | 118 ++++++++++------- ...ActionIndicesThatCannotBeCreatedTests.java | 125 ++++++++++++++++++ .../bulk/TransportBulkActionIngestTests.java | 10 +- .../bulk/TransportBulkActionTookTests.java | 12 +- .../mapper/DynamicMappingDisabledTests.java | 15 ++- settings.gradle | 3 +- 6 files changed, 220 insertions(+), 63 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a9b82c514a6..16c22524e2d 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -52,19 +52,19 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -75,6 +75,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; + /** * Groups bulk request items by shard, optionally creating non-existent indices and * delegates to {@link TransportShardBulkAction} for shard-level bulk execution @@ -139,30 +141,46 @@ public class TransportBulkAction extends HandledTransportAction responses = new AtomicArray<>(bulkRequest.requests.size()); if (needToCheck()) { - // Keep track of all unique indices and all unique types per index for the create index requests: - final Set autoCreateIndices = bulkRequest.requests.stream() + // Attempt to create all the indices that we're going to need during the bulk before we start. + // Step 1: collect all the indices in the request + final Set indices = bulkRequest.requests.stream() .map(DocWriteRequest::index) .collect(Collectors.toSet()); - final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); + /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create + * that we'll use when we try to run the requests. */ + final Map indicesThatCannotBeCreated = new HashMap<>(); + Set autoCreateIndices = new HashSet<>(); ClusterState state = clusterService.state(); - for (String index : autoCreateIndices) { - if (shouldAutoCreate(index, state)) { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(); - createIndexRequest.index(index); - createIndexRequest.cause("auto(bulk api)"); - createIndexRequest.masterNodeTimeout(bulkRequest.timeout()); - createIndexAction.execute(createIndexRequest, new ActionListener() { + for (String index : indices) { + boolean shouldAutoCreate; + try { + shouldAutoCreate = shouldAutoCreate(index, state); + } catch (IndexNotFoundException e) { + shouldAutoCreate = false; + indicesThatCannotBeCreated.put(index, e); + } + if (shouldAutoCreate) { + autoCreateIndices.add(index); + } + } + // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back. + if (autoCreateIndices.isEmpty()) { + executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); + } else { + final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); + for (String index : autoCreateIndices) { + createIndex(index, bulkRequest.timeout(), new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { if (counter.decrementAndGet() == 0) { - executeBulk(task, bulkRequest, startTime, listener, responses); + executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); } } @Override public void onFailure(Exception e) { if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) { - // fail all requests involving this index, if create didnt work + // fail all requests involving this index, if create didn't work for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest request = bulkRequest.requests.get(i); if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { @@ -174,18 +192,14 @@ public class TransportBulkAction extends HandledTransportAction { inner.addSuppressed(e); listener.onFailure(inner); - }), responses); + }), responses, indicesThatCannotBeCreated); } } }); - } else { - if (counter.decrementAndGet() == 0) { - executeBulk(task, bulkRequest, startTime, listener, responses); - } } } } else { - executeBulk(task, bulkRequest, startTime, listener, responses); + executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); } } @@ -197,6 +211,14 @@ public class TransportBulkAction extends HandledTransportAction listener) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(index); + createIndexRequest.cause("auto(bulk api)"); + createIndexRequest.masterNodeTimeout(timeout); + createIndexAction.execute(createIndexRequest, listener); + } + private boolean setResponseFailureIfIndexMatches(AtomicArray responses, int idx, DocWriteRequest request, String index, Exception e) { if (index.equals(request.index())) { responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e))); @@ -220,14 +242,16 @@ public class TransportBulkAction extends HandledTransportAction responses; private final long startTimeNanos; private final ClusterStateObserver observer; + private final Map indicesThatCannotBeCreated; - BulkOperation(Task task, BulkRequest bulkRequest, ActionListener listener, - AtomicArray responses, long startTimeNanos) { + BulkOperation(Task task, BulkRequest bulkRequest, ActionListener listener, AtomicArray responses, + long startTimeNanos, Map indicesThatCannotBeCreated) { this.task = task; this.bulkRequest = bulkRequest; this.listener = listener; this.responses = responses; this.startTimeNanos = startTimeNanos; + this.indicesThatCannotBeCreated = indicesThatCannotBeCreated; this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()); } @@ -250,7 +274,7 @@ public class TransportBulkAction extends HandledTransportAction listener, final AtomicArray responses ) { - new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos).run(); - } - - private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, BulkRequest bulkRequest, AtomicArray responses, int idx, - final ConcreteIndices concreteIndices, - final MetaData metaData) { - Index concreteIndex = concreteIndices.getConcreteIndex(request.index()); - Exception unavailableException = null; - if (concreteIndex == null) { - try { - concreteIndex = concreteIndices.resolveIfAbsent(request); - } catch (IndexClosedException | IndexNotFoundException ex) { - // Fix for issue where bulk request references an index that - // cannot be auto-created see issue #8125 - unavailableException = ex; + private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, int idx, final ConcreteIndices concreteIndices, + final MetaData metaData) { + IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index()); + if (cannotCreate != null) { + addFailure(request, idx, cannotCreate); + return true; + } + Index concreteIndex = concreteIndices.getConcreteIndex(request.index()); + if (concreteIndex == null) { + try { + concreteIndex = concreteIndices.resolveIfAbsent(request); + } catch (IndexClosedException | IndexNotFoundException ex) { + addFailure(request, idx, ex); + return true; + } } - } - if (unavailableException == null) { IndexMetaData indexMetaData = metaData.getIndexSafe(concreteIndex); if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - unavailableException = new IndexClosedException(concreteIndex); + addFailure(request, idx, new IndexClosedException(concreteIndex)); + return true; } + return false; } - if (unavailableException != null) { + + private void addFailure(DocWriteRequest request, int idx, Exception unavailableException) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), unavailableException); BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure); responses.set(idx, bulkItemResponse); // make sure the request gets never processed again bulkRequest.requests.set(idx, null); - return true; } - return false; + } + + void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, + final AtomicArray responses, Map indicesThatCannotBeCreated) { + new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run(); } private static class ConcreteIndices { diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java new file mode 100644 index 00000000000..a02939ad206 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportService; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static org.mockito.Mockito.mock; + +public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase { + public void testNonExceptional() { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(randomAlphaOfLength(5))); + bulkRequest.add(new IndexRequest(randomAlphaOfLength(5))); + bulkRequest.add(new DeleteRequest(randomAlphaOfLength(5))); + bulkRequest.add(new UpdateRequest(randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5))); + // Test emulating auto_create_index=false + indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, null); + // Test emulating auto_create_index=true + indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> true); + // Test emulating all indices already created + indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> false); + // Test emulating auto_create_index=true with some indices already created. + indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> randomBoolean()); + } + + public void testAllFail() { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("no")); + bulkRequest.add(new IndexRequest("can't")); + bulkRequest.add(new DeleteRequest("do")); + bulkRequest.add(new UpdateRequest("nothin", randomAlphaOfLength(5), randomAlphaOfLength(5))); + indicesThatCannotBeCreatedTestCase(new HashSet<>(Arrays.asList("no", "can't", "do", "nothin")), bulkRequest, index -> { + throw new IndexNotFoundException("Can't make it because I say so"); + }); + } + + public void testSomeFail() { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("ok")); + bulkRequest.add(new IndexRequest("bad")); + // Emulate auto_create_index=-bad,+* + indicesThatCannotBeCreatedTestCase(singleton("bad"), bulkRequest, index -> { + if (index.equals("bad")) { + throw new IndexNotFoundException("Can't make it because I say so"); + } + return true; + }); + // Emulate auto_create_index=false but the "ok" index already exists + indicesThatCannotBeCreatedTestCase(singleton("bad"), bulkRequest, index -> { + if (index.equals("bad")) { + throw new IndexNotFoundException("Can't make it because I say so"); + } + return false; + }); + } + + + private void indicesThatCannotBeCreatedTestCase(Set expected, + BulkRequest bulkRequest, Function shouldAutoCreate) { + TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class), + null, null, null, mock(ActionFilters.class), null, null) { + @Override + void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, + AtomicArray responses, Map indicesThatCannotBeCreated) { + assertEquals(expected, indicesThatCannotBeCreated.keySet()); + } + + @Override + boolean needToCheck() { + return null != shouldAutoCreate; // Use "null" to mean "no indices can be created so don't bother checking" + } + + @Override + boolean shouldAutoCreate(String index, ClusterState state) { + return shouldAutoCreate.apply(index); + } + + @Override + void createIndex(String index, TimeValue timeout, ActionListener listener) { + // If we try to create an index just immediately assume it worked + listener.onResponse(new CreateIndexResponse(true, true) {}); + } + }; + action.doExecute(null, bulkRequest, null); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index b2483bd9306..e6e18fb567d 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -28,15 +28,13 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.tasks.Task; @@ -51,10 +49,10 @@ import org.mockito.MockitoAnnotations; import java.util.Collections; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Supplier; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; @@ -114,8 +112,8 @@ public class TransportBulkActionIngestTests extends ESTestCase { return false; } @Override - void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, - final ActionListener listener, final AtomicArray responses) { + void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, + final AtomicArray responses, Map indicesThatCannotBeCreated) { isExecuted = true; } } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 0e3d96b5e8e..e35f98e220e 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; @@ -49,6 +50,7 @@ import org.junit.BeforeClass; import java.nio.charset.StandardCharsets; import java.util.HashSet; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; @@ -125,9 +127,10 @@ public class TransportBulkActionTookTests extends ESTestCase { BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, - AtomicArray responses) { + AtomicArray responses, + Map indicesThatCannotBeCreated) { expected.set(1000000); - super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses); + super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated); } }; } else { @@ -149,10 +152,11 @@ public class TransportBulkActionTookTests extends ESTestCase { BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, - AtomicArray responses) { + AtomicArray responses, + Map indicesThatCannotBeCreated) { long elapsed = spinForAtLeastOneMillisecond(); expected.set(elapsed); - super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses); + super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated); } }; } diff --git a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java index 46749e792ed..3928dc78c84 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; @@ -114,22 +115,24 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase { request.source(Requests.INDEX_CONTENT_TYPE, "foo", 3); BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(request); - final AtomicBoolean onFailureCalled = new AtomicBoolean(); + final AtomicBoolean gotResponse = new AtomicBoolean(); transportBulkAction.execute(bulkRequest, new ActionListener() { @Override public void onResponse(BulkResponse bulkResponse) { - fail("onResponse shouldn't be called"); + BulkItemResponse itemResponse = bulkResponse.getItems()[0]; + assertTrue(itemResponse.isFailed()); + assertThat(itemResponse.getFailure().getCause(), instanceOf(IndexNotFoundException.class)); + assertEquals("no such index and [index.mapper.dynamic] is [false]", itemResponse.getFailure().getCause().getMessage()); + gotResponse.set(true); } @Override public void onFailure(Exception e) { - onFailureCalled.set(true); - assertThat(e, instanceOf(IndexNotFoundException.class)); - assertEquals("no such index and [index.mapper.dynamic] is [false]", e.getMessage()); + fail("unexpected failure in bulk action, expected failed bulk item"); } }); - assertTrue(onFailureCalled.get()); + assertTrue(gotResponse.get()); } } diff --git a/settings.gradle b/settings.gradle index 2f1fcfaed35..c67c44dd2ff 100644 --- a/settings.gradle +++ b/settings.gradle @@ -57,11 +57,12 @@ List projects = [ 'plugins:repository-s3', 'plugins:jvm-example', 'plugins:store-smb', + 'qa:auto-create-index', 'qa:backwards-5.0', 'qa:evil-tests', + 'qa:multi-cluster-search', 'qa:no-bootstrap-tests', 'qa:rolling-upgrade', - 'qa:multi-cluster-search', 'qa:smoke-test-client', 'qa:smoke-test-http', 'qa:smoke-test-ingest-with-all-dependencies',