Bulk API: Ensure that specific failures do not affect whole request

Before a bulk request is executed, missing indices are being created by default.
If this fails, the whole request is failed.

This patch changes the behaviour to not fail the whole request, but rather all
requests using the index where the creation has failed.

Closes #4987
This commit is contained in:
Alexander Reelsen 2014-03-07 10:54:04 +01:00
parent 3f37a0ff5c
commit 056ad0a8d3
2 changed files with 81 additions and 33 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -88,7 +89,10 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
@Override @Override
protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) { protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
Set<String> indices = Sets.newHashSet(); final AtomicArray<BulkItemResponse> responses = new AtomicArray<BulkItemResponse>(bulkRequest.requests.size());
if (autoCreateIndex.needToCheck()) {
final Set<String> indices = Sets.newHashSet();
for (ActionRequest request : bulkRequest.requests) { for (ActionRequest request : bulkRequest.requests) {
if (request instanceof IndexRequest) { if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request; IndexRequest indexRequest = (IndexRequest) request;
@ -105,54 +109,81 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
if (!indices.contains(updateRequest.index())) { if (!indices.contains(updateRequest.index())) {
indices.add(updateRequest.index()); indices.add(updateRequest.index());
} }
} else {
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
} }
} }
if (autoCreateIndex.needToCheck()) {
final AtomicInteger counter = new AtomicInteger(indices.size()); final AtomicInteger counter = new AtomicInteger(indices.size());
final AtomicBoolean failed = new AtomicBoolean();
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
for (String index : indices) { for (final String index : indices) {
if (autoCreateIndex.shouldAutoCreate(index, state)) { if (autoCreateIndex.shouldAutoCreate(index, state)) {
createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)"), new ActionListener<CreateIndexResponse>() { createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)"), new ActionListener<CreateIndexResponse>() {
@Override @Override
public void onResponse(CreateIndexResponse result) { public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, startTime, listener); executeBulk(bulkRequest, startTime, listener, responses);
} }
} }
@Override @Override
public void onFailure(Throwable e) { public void onFailure(Throwable e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) { if (!(ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException)) {
// we have the index, do it // fail all requests involving this index, if create didnt work
if (counter.decrementAndGet() == 0) { for (int i = 0; i < bulkRequest.requests.size(); i++) {
executeBulk(bulkRequest, startTime, listener); ActionRequest request = bulkRequest.requests.get(i);
if (setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
bulkRequest.requests.set(i, null);
} }
} else if (failed.compareAndSet(false, true)) { }
listener.onFailure(e); }
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, startTime, listener, responses);
} }
} }
}); });
} else { } else {
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, startTime, listener); executeBulk(bulkRequest, startTime, listener, responses);
} }
} }
} }
} else { } else {
executeBulk(bulkRequest, startTime, listener); executeBulk(bulkRequest, startTime, listener, responses);
} }
} }
private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener) { private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, ActionRequest request, String index, Throwable e) {
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
if (index.equals(indexRequest.index())) {
responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e)));
return true;
}
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
if (index.equals(deleteRequest.index())) {
responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), e)));
return true;
}
} else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request;
if (index.equals(updateRequest.index())) {
responses.set(idx, new BulkItemResponse(idx, "index", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), e)));
return true;
}
} else {
throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName());
}
return false;
}
private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses) {
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
// TODO use timeout to wait here if its blocked... // TODO use timeout to wait here if its blocked...
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE); clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
MetaData metaData = clusterState.metaData(); MetaData metaData = clusterState.metaData();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
for (int i = 0; i < bulkRequest.requests.size(); i++) { for (int i = 0; i < bulkRequest.requests.size(); i++) {
ActionRequest request = bulkRequest.requests.get(i); ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) { if (request instanceof IndexRequest) {

View File

@ -608,4 +608,21 @@ public class BulkTests extends ElasticsearchIntegrationTest {
} }
} }
@Test // issue 4987
public void testThatInvalidIndexNamesShouldNotBreakCompleteBulkRequest() {
int bulkEntryCount = randomIntBetween(10, 50);
BulkRequestBuilder builder = client().prepareBulk();
boolean[] expectedFailures = new boolean[bulkEntryCount];
for (int i = 0; i < bulkEntryCount; i++) {
expectedFailures[i] = randomBoolean();
builder.add(client().prepareIndex().setIndex(expectedFailures[i] ? "INVALID.NAME" : "test").setType("type1").setId("1").setSource("field", 1));
}
BulkResponse bulkResponse = builder.get();
assertThat(bulkResponse.hasFailures(), is(true));
assertThat(bulkResponse.getItems().length, is(bulkEntryCount));
for (int i = 0; i < bulkEntryCount; i++) {
assertThat(bulkResponse.getItems()[i].isFailed(), is(expectedFailures[i]));
}
}
} }