diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a9b82c514a6..16c22524e2d 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -52,19 +52,19 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -75,6 +75,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; + /** * Groups bulk request items by shard, optionally creating non-existent indices and * delegates to {@link TransportShardBulkAction} for shard-level bulk execution @@ -139,30 +141,46 @@ public class TransportBulkAction extends HandledTransportAction responses = new AtomicArray<>(bulkRequest.requests.size()); if (needToCheck()) { - // Keep track of all unique indices and all unique types per index for the create index requests: - final Set autoCreateIndices = bulkRequest.requests.stream() + // Attempt to create all the indices that we're going to need during the bulk before we start. + // Step 1: collect all the indices in the request + final Set indices = bulkRequest.requests.stream() .map(DocWriteRequest::index) .collect(Collectors.toSet()); - final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); + /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create + * that we'll use when we try to run the requests. */ + final Map indicesThatCannotBeCreated = new HashMap<>(); + Set autoCreateIndices = new HashSet<>(); ClusterState state = clusterService.state(); - for (String index : autoCreateIndices) { - if (shouldAutoCreate(index, state)) { - CreateIndexRequest createIndexRequest = new CreateIndexRequest(); - createIndexRequest.index(index); - createIndexRequest.cause("auto(bulk api)"); - createIndexRequest.masterNodeTimeout(bulkRequest.timeout()); - createIndexAction.execute(createIndexRequest, new ActionListener() { + for (String index : indices) { + boolean shouldAutoCreate; + try { + shouldAutoCreate = shouldAutoCreate(index, state); + } catch (IndexNotFoundException e) { + shouldAutoCreate = false; + indicesThatCannotBeCreated.put(index, e); + } + if (shouldAutoCreate) { + autoCreateIndices.add(index); + } + } + // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back. + if (autoCreateIndices.isEmpty()) { + executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); + } else { + final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size()); + for (String index : autoCreateIndices) { + createIndex(index, bulkRequest.timeout(), new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { if (counter.decrementAndGet() == 0) { - executeBulk(task, bulkRequest, startTime, listener, responses); + executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated); } } @Override public void onFailure(Exception e) { if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) { - // fail all requests involving this index, if create didnt work + // fail all requests involving this index, if create didn't work for (int i = 0; i < bulkRequest.requests.size(); i++) { DocWriteRequest request = bulkRequest.requests.get(i); if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) { @@ -174,18 +192,14 @@ public class TransportBulkAction extends HandledTransportAction { inner.addSuppressed(e); listener.onFailure(inner); - }), responses); + }), responses, indicesThatCannotBeCreated); } } }); - } else { - if (counter.decrementAndGet() == 0) { - executeBulk(task, bulkRequest, startTime, listener, responses); - } } } } else { - executeBulk(task, bulkRequest, startTime, listener, responses); + executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap()); } } @@ -197,6 +211,14 @@ public class TransportBulkAction extends HandledTransportAction listener) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(); + createIndexRequest.index(index); + createIndexRequest.cause("auto(bulk api)"); + createIndexRequest.masterNodeTimeout(timeout); + createIndexAction.execute(createIndexRequest, listener); + } + private boolean setResponseFailureIfIndexMatches(AtomicArray responses, int idx, DocWriteRequest request, String index, Exception e) { if (index.equals(request.index())) { responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e))); @@ -220,14 +242,16 @@ public class TransportBulkAction extends HandledTransportAction responses; private final long startTimeNanos; private final ClusterStateObserver observer; + private final Map indicesThatCannotBeCreated; - BulkOperation(Task task, BulkRequest bulkRequest, ActionListener listener, - AtomicArray responses, long startTimeNanos) { + BulkOperation(Task task, BulkRequest bulkRequest, ActionListener listener, AtomicArray responses, + long startTimeNanos, Map indicesThatCannotBeCreated) { this.task = task; this.bulkRequest = bulkRequest; this.listener = listener; this.responses = responses; this.startTimeNanos = startTimeNanos; + this.indicesThatCannotBeCreated = indicesThatCannotBeCreated; this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()); } @@ -250,7 +274,7 @@ public class TransportBulkAction extends HandledTransportAction listener, final AtomicArray responses ) { - new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos).run(); - } - - private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, BulkRequest bulkRequest, AtomicArray responses, int idx, - final ConcreteIndices concreteIndices, - final MetaData metaData) { - Index concreteIndex = concreteIndices.getConcreteIndex(request.index()); - Exception unavailableException = null; - if (concreteIndex == null) { - try { - concreteIndex = concreteIndices.resolveIfAbsent(request); - } catch (IndexClosedException | IndexNotFoundException ex) { - // Fix for issue where bulk request references an index that - // cannot be auto-created see issue #8125 - unavailableException = ex; + private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, int idx, final ConcreteIndices concreteIndices, + final MetaData metaData) { + IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index()); + if (cannotCreate != null) { + addFailure(request, idx, cannotCreate); + return true; + } + Index concreteIndex = concreteIndices.getConcreteIndex(request.index()); + if (concreteIndex == null) { + try { + concreteIndex = concreteIndices.resolveIfAbsent(request); + } catch (IndexClosedException | IndexNotFoundException ex) { + addFailure(request, idx, ex); + return true; + } } - } - if (unavailableException == null) { IndexMetaData indexMetaData = metaData.getIndexSafe(concreteIndex); if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - unavailableException = new IndexClosedException(concreteIndex); + addFailure(request, idx, new IndexClosedException(concreteIndex)); + return true; } + return false; } - if (unavailableException != null) { + + private void addFailure(DocWriteRequest request, int idx, Exception unavailableException) { BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), unavailableException); BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure); responses.set(idx, bulkItemResponse); // make sure the request gets never processed again bulkRequest.requests.set(idx, null); - return true; } - return false; + } + + void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, + final AtomicArray responses, Map indicesThatCannotBeCreated) { + new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run(); } private static class ConcreteIndices { diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java new file mode 100644 index 00000000000..a02939ad206 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportService; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static org.mockito.Mockito.mock; + +public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase { + public void testNonExceptional() { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(randomAlphaOfLength(5))); + bulkRequest.add(new IndexRequest(randomAlphaOfLength(5))); + bulkRequest.add(new DeleteRequest(randomAlphaOfLength(5))); + bulkRequest.add(new UpdateRequest(randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5))); + // Test emulating auto_create_index=false + indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, null); + // Test emulating auto_create_index=true + indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> true); + // Test emulating all indices already created + indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> false); + // Test emulating auto_create_index=true with some indices already created. + indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> randomBoolean()); + } + + public void testAllFail() { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("no")); + bulkRequest.add(new IndexRequest("can't")); + bulkRequest.add(new DeleteRequest("do")); + bulkRequest.add(new UpdateRequest("nothin", randomAlphaOfLength(5), randomAlphaOfLength(5))); + indicesThatCannotBeCreatedTestCase(new HashSet<>(Arrays.asList("no", "can't", "do", "nothin")), bulkRequest, index -> { + throw new IndexNotFoundException("Can't make it because I say so"); + }); + } + + public void testSomeFail() { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("ok")); + bulkRequest.add(new IndexRequest("bad")); + // Emulate auto_create_index=-bad,+* + indicesThatCannotBeCreatedTestCase(singleton("bad"), bulkRequest, index -> { + if (index.equals("bad")) { + throw new IndexNotFoundException("Can't make it because I say so"); + } + return true; + }); + // Emulate auto_create_index=false but the "ok" index already exists + indicesThatCannotBeCreatedTestCase(singleton("bad"), bulkRequest, index -> { + if (index.equals("bad")) { + throw new IndexNotFoundException("Can't make it because I say so"); + } + return false; + }); + } + + + private void indicesThatCannotBeCreatedTestCase(Set expected, + BulkRequest bulkRequest, Function shouldAutoCreate) { + TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class), + null, null, null, mock(ActionFilters.class), null, null) { + @Override + void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, + AtomicArray responses, Map indicesThatCannotBeCreated) { + assertEquals(expected, indicesThatCannotBeCreated.keySet()); + } + + @Override + boolean needToCheck() { + return null != shouldAutoCreate; // Use "null" to mean "no indices can be created so don't bother checking" + } + + @Override + boolean shouldAutoCreate(String index, ClusterState state) { + return shouldAutoCreate.apply(index); + } + + @Override + void createIndex(String index, TimeValue timeout, ActionListener listener) { + // If we try to create an index just immediately assume it worked + listener.onResponse(new CreateIndexResponse(true, true) {}); + } + }; + action.doExecute(null, bulkRequest, null); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index b2483bd9306..e6e18fb567d 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -28,15 +28,13 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.tasks.Task; @@ -51,10 +49,10 @@ import org.mockito.MockitoAnnotations; import java.util.Collections; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Supplier; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; @@ -114,8 +112,8 @@ public class TransportBulkActionIngestTests extends ESTestCase { return false; } @Override - void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, - final ActionListener listener, final AtomicArray responses) { + void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener, + final AtomicArray responses, Map indicesThatCannotBeCreated) { isExecuted = true; } } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 0e3d96b5e8e..e35f98e220e 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; @@ -49,6 +50,7 @@ import org.junit.BeforeClass; import java.nio.charset.StandardCharsets; import java.util.HashSet; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; @@ -125,9 +127,10 @@ public class TransportBulkActionTookTests extends ESTestCase { BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, - AtomicArray responses) { + AtomicArray responses, + Map indicesThatCannotBeCreated) { expected.set(1000000); - super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses); + super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated); } }; } else { @@ -149,10 +152,11 @@ public class TransportBulkActionTookTests extends ESTestCase { BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, - AtomicArray responses) { + AtomicArray responses, + Map indicesThatCannotBeCreated) { long elapsed = spinForAtLeastOneMillisecond(); expected.set(elapsed); - super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses); + super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated); } }; } diff --git a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java index 46749e792ed..3928dc78c84 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/DynamicMappingDisabledTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; @@ -114,22 +115,24 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase { request.source(Requests.INDEX_CONTENT_TYPE, "foo", 3); BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(request); - final AtomicBoolean onFailureCalled = new AtomicBoolean(); + final AtomicBoolean gotResponse = new AtomicBoolean(); transportBulkAction.execute(bulkRequest, new ActionListener() { @Override public void onResponse(BulkResponse bulkResponse) { - fail("onResponse shouldn't be called"); + BulkItemResponse itemResponse = bulkResponse.getItems()[0]; + assertTrue(itemResponse.isFailed()); + assertThat(itemResponse.getFailure().getCause(), instanceOf(IndexNotFoundException.class)); + assertEquals("no such index and [index.mapper.dynamic] is [false]", itemResponse.getFailure().getCause().getMessage()); + gotResponse.set(true); } @Override public void onFailure(Exception e) { - onFailureCalled.set(true); - assertThat(e, instanceOf(IndexNotFoundException.class)); - assertEquals("no such index and [index.mapper.dynamic] is [false]", e.getMessage()); + fail("unexpected failure in bulk action, expected failed bulk item"); } }); - assertTrue(onFailureCalled.get()); + assertTrue(gotResponse.get()); } } diff --git a/settings.gradle b/settings.gradle index 2f1fcfaed35..c67c44dd2ff 100644 --- a/settings.gradle +++ b/settings.gradle @@ -57,11 +57,12 @@ List projects = [ 'plugins:repository-s3', 'plugins:jvm-example', 'plugins:store-smb', + 'qa:auto-create-index', 'qa:backwards-5.0', 'qa:evil-tests', + 'qa:multi-cluster-search', 'qa:no-bootstrap-tests', 'qa:rolling-upgrade', - 'qa:multi-cluster-search', 'qa:smoke-test-client', 'qa:smoke-test-http', 'qa:smoke-test-ingest-with-all-dependencies',