Fix _bulk response when it can't create an index (#24048)

Before #22488 when an index couldn't be created during a `_bulk`
operation we'd do all the *other* actions and return the index
creation error on each failing action. In #22488 we accidentally
changed it so that we now reject the entire bulk request if a single
action cannot create an index that it must create to run. This
gets reverts to the old behavior while still keeping the nicer
error messages. Instead of failing the entire request we now only
fail the portions of the request that can't work because the index
doesn't exist.

Closes #24028
This commit is contained in:
Nik Everett 2017-04-21 18:56:04 -04:00 committed by GitHub
parent 108d8905e2
commit 447f307ebb
6 changed files with 220 additions and 63 deletions

View File

@ -52,19 +52,19 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -75,6 +75,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
/**
* Groups bulk request items by shard, optionally creating non-existent indices and
* delegates to {@link TransportShardBulkAction} for shard-level bulk execution
@ -139,30 +141,46 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
if (needToCheck()) {
// Keep track of all unique indices and all unique types per index for the create index requests:
final Set<String> autoCreateIndices = bulkRequest.requests.stream()
// Attempt to create all the indices that we're going to need during the bulk before we start.
// Step 1: collect all the indices in the request
final Set<String> indices = bulkRequest.requests.stream()
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
/* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
* that we'll use when we try to run the requests. */
final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
Set<String> autoCreateIndices = new HashSet<>();
ClusterState state = clusterService.state();
for (String index : autoCreateIndices) {
if (shouldAutoCreate(index, state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(bulkRequest.timeout());
createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
for (String index : indices) {
boolean shouldAutoCreate;
try {
shouldAutoCreate = shouldAutoCreate(index, state);
} catch (IndexNotFoundException e) {
shouldAutoCreate = false;
indicesThatCannotBeCreated.put(index, e);
}
if (shouldAutoCreate) {
autoCreateIndices.add(index);
}
}
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, listener, responses);
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}
}
@Override
public void onFailure(Exception e) {
if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
// fail all requests involving this index, if create didnt work
// fail all requests involving this index, if create didn't work
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
@ -174,18 +192,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses);
}), responses, indicesThatCannotBeCreated);
}
}
});
} else {
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, listener, responses);
}
}
}
} else {
executeBulk(task, bulkRequest, startTime, listener, responses);
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
}
}
@ -197,6 +211,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return autoCreateIndex.shouldAutoCreate(index, state);
}
void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
createIndexAction.execute(createIndexRequest, listener);
}
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest request, String index, Exception e) {
if (index.equals(request.index())) {
responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e)));
@ -220,14 +242,16 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final AtomicArray<BulkItemResponse> responses;
private final long startTimeNanos;
private final ClusterStateObserver observer;
private final Map<String, IndexNotFoundException> indicesThatCannotBeCreated;
BulkOperation(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses, long startTimeNanos) {
BulkOperation(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, AtomicArray<BulkItemResponse> responses,
long startTimeNanos, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
this.task = task;
this.bulkRequest = bulkRequest;
this.listener = listener;
this.responses = responses;
this.startTimeNanos = startTimeNanos;
this.indicesThatCannotBeCreated = indicesThatCannotBeCreated;
this.observer = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext());
}
@ -250,7 +274,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
if (docWriteRequest == null) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, bulkRequest, responses, i, concreteIndices, metaData)) {
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
continue;
}
Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
@ -393,42 +417,44 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
});
}
}
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos).run();
}
private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, BulkRequest bulkRequest, AtomicArray<BulkItemResponse> responses, int idx,
final ConcreteIndices concreteIndices,
final MetaData metaData) {
Index concreteIndex = concreteIndices.getConcreteIndex(request.index());
Exception unavailableException = null;
if (concreteIndex == null) {
try {
concreteIndex = concreteIndices.resolveIfAbsent(request);
} catch (IndexClosedException | IndexNotFoundException ex) {
// Fix for issue where bulk request references an index that
// cannot be auto-created see issue #8125
unavailableException = ex;
private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, int idx, final ConcreteIndices concreteIndices,
final MetaData metaData) {
IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index());
if (cannotCreate != null) {
addFailure(request, idx, cannotCreate);
return true;
}
Index concreteIndex = concreteIndices.getConcreteIndex(request.index());
if (concreteIndex == null) {
try {
concreteIndex = concreteIndices.resolveIfAbsent(request);
} catch (IndexClosedException | IndexNotFoundException ex) {
addFailure(request, idx, ex);
return true;
}
}
}
if (unavailableException == null) {
IndexMetaData indexMetaData = metaData.getIndexSafe(concreteIndex);
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
unavailableException = new IndexClosedException(concreteIndex);
addFailure(request, idx, new IndexClosedException(concreteIndex));
return true;
}
return false;
}
if (unavailableException != null) {
private void addFailure(DocWriteRequest request, int idx, Exception unavailableException) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
unavailableException);
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure);
responses.set(idx, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(idx, null);
return true;
}
return false;
}
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}
private static class ConcreteIndices {

View File

@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportService;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.mockito.Mockito.mock;
public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase {
public void testNonExceptional() {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest(randomAlphaOfLength(5)));
bulkRequest.add(new IndexRequest(randomAlphaOfLength(5)));
bulkRequest.add(new DeleteRequest(randomAlphaOfLength(5)));
bulkRequest.add(new UpdateRequest(randomAlphaOfLength(5), randomAlphaOfLength(5), randomAlphaOfLength(5)));
// Test emulating auto_create_index=false
indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, null);
// Test emulating auto_create_index=true
indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> true);
// Test emulating all indices already created
indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> false);
// Test emulating auto_create_index=true with some indices already created.
indicesThatCannotBeCreatedTestCase(emptySet(), bulkRequest, index -> randomBoolean());
}
public void testAllFail() {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("no"));
bulkRequest.add(new IndexRequest("can't"));
bulkRequest.add(new DeleteRequest("do"));
bulkRequest.add(new UpdateRequest("nothin", randomAlphaOfLength(5), randomAlphaOfLength(5)));
indicesThatCannotBeCreatedTestCase(new HashSet<>(Arrays.asList("no", "can't", "do", "nothin")), bulkRequest, index -> {
throw new IndexNotFoundException("Can't make it because I say so");
});
}
public void testSomeFail() {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("ok"));
bulkRequest.add(new IndexRequest("bad"));
// Emulate auto_create_index=-bad,+*
indicesThatCannotBeCreatedTestCase(singleton("bad"), bulkRequest, index -> {
if (index.equals("bad")) {
throw new IndexNotFoundException("Can't make it because I say so");
}
return true;
});
// Emulate auto_create_index=false but the "ok" index already exists
indicesThatCannotBeCreatedTestCase(singleton("bad"), bulkRequest, index -> {
if (index.equals("bad")) {
throw new IndexNotFoundException("Can't make it because I say so");
}
return false;
});
}
private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
BulkRequest bulkRequest, Function<String, Boolean> shouldAutoCreate) {
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class),
null, null, null, mock(ActionFilters.class), null, null) {
@Override
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
assertEquals(expected, indicesThatCannotBeCreated.keySet());
}
@Override
boolean needToCheck() {
return null != shouldAutoCreate; // Use "null" to mean "no indices can be created so don't bother checking"
}
@Override
boolean shouldAutoCreate(String index, ClusterState state) {
return shouldAutoCreate.apply(index);
}
@Override
void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
// If we try to create an index just immediately assume it worked
listener.onResponse(new CreateIndexResponse(true, true) {});
}
};
action.doExecute(null, bulkRequest, null);
}
}

View File

@ -28,15 +28,13 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
@ -51,10 +49,10 @@ import org.mockito.MockitoAnnotations;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
@ -114,8 +112,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
return false;
}
@Override
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses) {
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
isExecuted = true;
}
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
@ -49,6 +50,7 @@ import org.junit.BeforeClass;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
@ -125,9 +127,10 @@ public class TransportBulkActionTookTests extends ESTestCase {
BulkRequest bulkRequest,
long startTimeNanos,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses) {
AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
expected.set(1000000);
super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses);
super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
}
};
} else {
@ -149,10 +152,11 @@ public class TransportBulkActionTookTests extends ESTestCase {
BulkRequest bulkRequest,
long startTimeNanos,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses) {
AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
long elapsed = spinForAtLeastOneMillisecond();
expected.set(elapsed);
super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses);
super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
}
};
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.mapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
@ -114,22 +115,24 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
request.source(Requests.INDEX_CONTENT_TYPE, "foo", 3);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(request);
final AtomicBoolean onFailureCalled = new AtomicBoolean();
final AtomicBoolean gotResponse = new AtomicBoolean();
transportBulkAction.execute(bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
fail("onResponse shouldn't be called");
BulkItemResponse itemResponse = bulkResponse.getItems()[0];
assertTrue(itemResponse.isFailed());
assertThat(itemResponse.getFailure().getCause(), instanceOf(IndexNotFoundException.class));
assertEquals("no such index and [index.mapper.dynamic] is [false]", itemResponse.getFailure().getCause().getMessage());
gotResponse.set(true);
}
@Override
public void onFailure(Exception e) {
onFailureCalled.set(true);
assertThat(e, instanceOf(IndexNotFoundException.class));
assertEquals("no such index and [index.mapper.dynamic] is [false]", e.getMessage());
fail("unexpected failure in bulk action, expected failed bulk item");
}
});
assertTrue(onFailureCalled.get());
assertTrue(gotResponse.get());
}
}

View File

@ -57,11 +57,12 @@ List projects = [
'plugins:repository-s3',
'plugins:jvm-example',
'plugins:store-smb',
'qa:auto-create-index',
'qa:backwards-5.0',
'qa:evil-tests',
'qa:multi-cluster-search',
'qa:no-bootstrap-tests',
'qa:rolling-upgrade',
'qa:multi-cluster-search',
'qa:smoke-test-client',
'qa:smoke-test-http',
'qa:smoke-test-ingest-with-all-dependencies',