From 405e5816b827db5823e6c2ed9c1bbe9fa4629355 Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Fri, 19 Sep 2014 09:55:53 +0100 Subject: [PATCH] Bulk API: Do not fail whole request on closed index The bulk API request was marked as completely failed, in case a request with a closed index was referred in any of the requests inside of a bulk one. Implementation Note: Currently the implementation is a bit more verbose in order to prevent an instanceof check and another cast - if that is fast enough, we could execute that logic only once at the beginning of the loop (thinking this might be a bit overoptimization here). Closes #6410 --- .../action/bulk/TransportBulkAction.java | 99 ++++++++++++------- .../action/delete/DeleteRequest.java | 3 +- .../action/index/IndexRequest.java | 3 +- .../action/update/UpdateRequest.java | 3 +- .../org/elasticsearch/document/BulkTests.java | 24 +++++ 5 files changed, 91 insertions(+), 41 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index b47c9694d0e..3fa5366e248 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; @@ -40,6 +41,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -47,8 +49,10 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -96,26 +100,15 @@ public class TransportBulkAction extends HandledTransportAction indices = Sets.newHashSet(); for (ActionRequest request : bulkRequest.requests) { - if (request instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request; - if (!indices.contains(indexRequest.index())) { - indices.add(indexRequest.index()); - } - } else if (request instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) request; - if (!indices.contains(deleteRequest.index())) { - indices.add(deleteRequest.index()); - } - } else if (request instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) request; - if (!indices.contains(updateRequest.index())) { - indices.add(updateRequest.index()); + if (request instanceof DocumentRequest) { + DocumentRequest req = (DocumentRequest) request; + if (!indices.contains(req.index())) { + indices.add(req.index()); } } else { throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); } } - final AtomicInteger counter = new AtomicInteger(indices.size()); ClusterState state = clusterService.state(); for (final String index : indices) { @@ -204,30 +197,33 @@ public class TransportBulkAction extends HandledTransportAction responses, int idx, + final ConcreteIndices concreteIndices, + final MetaData metaData) { + String concreteIndex = concreteIndices.getConcreteIndex(request.index()); + boolean isClosed = false; + if (concreteIndex == null) { + try { + concreteIndex = concreteIndices.resolveIfAbsent(request.index(), request.indicesOptions()); + } catch (IndexClosedException ice) { + isClosed = true; + } + } + if (!isClosed) { + IndexMetaData indexMetaData = metaData.index(concreteIndex); + isClosed = indexMetaData.getState() == IndexMetaData.State.CLOSE; + } + if (isClosed) { + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), + new IndexClosedException(new Index(metaData.index(request.index()).getIndex()))); + BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure); + responses.set(idx, bulkItemResponse); + // make sure the request gets never processed again + bulkRequest.requests.set(idx, null); + } + return isClosed; + } + + private static class ConcreteIndices { private final Map indices = new HashMap<>(); private final MetaData metaData; diff --git a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index a0fb3e79181..cd33659f745 100644 --- a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -43,7 +44,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Requests#deleteRequest(String) */ -public class DeleteRequest extends ShardReplicationOperationRequest { +public class DeleteRequest extends ShardReplicationOperationRequest implements DocumentRequest { private String type; private String id; diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 09ae0cd6238..27d44b8b58a 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.TimestampParsingException; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -64,7 +65,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexRequest extends ShardReplicationOperationRequest { +public class IndexRequest extends ShardReplicationOperationRequest implements DocumentRequest { /** * Operation type controls if the type of the index operation. diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 670cfc2ee91..46abe672608 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.update; import com.google.common.collect.Maps; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicationType; @@ -47,7 +48,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; /** */ -public class UpdateRequest extends InstanceShardOperationRequest { +public class UpdateRequest extends InstanceShardOperationRequest implements DocumentRequest { private String type; private String id; diff --git a/src/test/java/org/elasticsearch/document/BulkTests.java b/src/test/java/org/elasticsearch/document/BulkTests.java index 2ae723f78bf..0fd27701fa5 100644 --- a/src/test/java/org/elasticsearch/document/BulkTests.java +++ b/src/test/java/org/elasticsearch/document/BulkTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.document; import com.google.common.base.Charsets; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.count.CountResponse; @@ -651,8 +652,31 @@ public class BulkTests extends ElasticsearchIntegrationTest { assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete")); } + private static String indexOrAlias() { return randomBoolean() ? "test" : "alias"; } + + @Test // issue 6410 + public void testThatMissingIndexDoesNotAbortFullBulkRequest() throws Exception{ + createIndex("bulkindex1", "bulkindex2"); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1")) + .add(new IndexRequest("bulkindex2", "index2_type", "1").source("text", "hallo2")) + .add(new IndexRequest("bulkindex2", "index2_type").source("text", "hallo2")) + .add(new UpdateRequest("bulkindex2", "index2_type", "2").doc("foo", "bar")) + .add(new DeleteRequest("bulkindex2", "index2_type", "3")) + .refresh(true); + + client().bulk(bulkRequest).get(); + SearchResponse searchResponse = client().prepareSearch("bulkindex*").get(); + assertHitCount(searchResponse, 3); + + assertAcked(client().admin().indices().prepareClose("bulkindex2")); + + BulkResponse bulkResponse = client().bulk(bulkRequest).get(); + assertThat(bulkResponse.hasFailures(), is(true)); + assertThat(bulkResponse.getItems().length, is(5)); + } }