From 61c21f9a0e148db3e450c87c4690bef173ccc821 Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Fri, 19 Sep 2014 10:54:22 +0100 Subject: [PATCH] Bulk API: Do not fail whole request on closed index The bulk API request was marked as completely failed, in case a request with a closed index was referred in any of the requests inside of a bulk one. Implementation Note: Currently the implementation is a bit more verbose in order to prevent an instanceof check and another cast - if that is fast enough, we could execute that logic only once at the beginning of the loop (thinking this might be a bit overoptimization here). Closes #6410 --- .../elasticsearch/action/DocumentRequest.java | 66 +++++++++++++ .../action/bulk/TransportBulkAction.java | 99 ++++++++++++------- .../action/delete/DeleteRequest.java | 3 +- .../action/index/IndexRequest.java | 3 +- .../action/update/UpdateRequest.java | 3 +- .../org/elasticsearch/document/BulkTests.java | 24 +++++ 6 files changed, 157 insertions(+), 41 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/DocumentRequest.java diff --git a/src/main/java/org/elasticsearch/action/DocumentRequest.java b/src/main/java/org/elasticsearch/action/DocumentRequest.java new file mode 100644 index 00000000000..8d26006ffa5 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/DocumentRequest.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action; + +import org.elasticsearch.action.support.IndicesOptions; + +/** + * Generic interface to group ActionRequest, which work on single document level + * + * Forces this class return index/type/id getters + */ +public interface DocumentRequest { + + /** + * Get the index that this request operates on + * @return the index + */ + String index(); + + /** + * Get the type that this request operates on + * @return the type + */ + String type(); + + /** + * Get the id of the document for this request + * @return the id + */ + String id(); + + /** + * Get the options for this request + * @return the indices options + */ + IndicesOptions indicesOptions(); + + /** + * Set the routing for this request + * @param routing + * @return the Request + */ + T routing(String routing); + + /** + * Get the routing for this request + * @return the Routing + */ + String routing(); +} diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index b47c9694d0e..3fa5366e248 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; @@ -40,6 +41,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -47,8 +49,10 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -96,26 +100,15 @@ public class TransportBulkAction extends HandledTransportAction indices = Sets.newHashSet(); for (ActionRequest request : bulkRequest.requests) { - if (request instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) request; - if (!indices.contains(indexRequest.index())) { - indices.add(indexRequest.index()); - } - } else if (request instanceof DeleteRequest) { - DeleteRequest deleteRequest = (DeleteRequest) request; - if (!indices.contains(deleteRequest.index())) { - indices.add(deleteRequest.index()); - } - } else if (request instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) request; - if (!indices.contains(updateRequest.index())) { - indices.add(updateRequest.index()); + if (request instanceof DocumentRequest) { + DocumentRequest req = (DocumentRequest) request; + if (!indices.contains(req.index())) { + indices.add(req.index()); } } else { throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); } } - final AtomicInteger counter = new AtomicInteger(indices.size()); ClusterState state = clusterService.state(); for (final String index : indices) { @@ -204,30 +197,33 @@ public class TransportBulkAction extends HandledTransportAction responses, int idx, + final ConcreteIndices concreteIndices, + final MetaData metaData) { + String concreteIndex = concreteIndices.getConcreteIndex(request.index()); + boolean isClosed = false; + if (concreteIndex == null) { + try { + concreteIndex = concreteIndices.resolveIfAbsent(request.index(), request.indicesOptions()); + } catch (IndexClosedException ice) { + isClosed = true; + } + } + if (!isClosed) { + IndexMetaData indexMetaData = metaData.index(concreteIndex); + isClosed = indexMetaData.getState() == IndexMetaData.State.CLOSE; + } + if (isClosed) { + BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), + new IndexClosedException(new Index(metaData.index(request.index()).getIndex()))); + BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure); + responses.set(idx, bulkItemResponse); + // make sure the request gets never processed again + bulkRequest.requests.set(idx, null); + } + return isClosed; + } + + private static class ConcreteIndices { private final Map indices = new HashMap<>(); private final MetaData metaData; diff --git a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index a0fb3e79181..cd33659f745 100644 --- a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -43,7 +44,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Requests#deleteRequest(String) */ -public class DeleteRequest extends ShardReplicationOperationRequest { +public class DeleteRequest extends ShardReplicationOperationRequest implements DocumentRequest { private String type; private String id; diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 09ae0cd6238..27d44b8b58a 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.TimestampParsingException; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -64,7 +65,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexRequest extends ShardReplicationOperationRequest { +public class IndexRequest extends ShardReplicationOperationRequest implements DocumentRequest { /** * Operation type controls if the type of the index operation. diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 670cfc2ee91..46abe672608 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.update; import com.google.common.collect.Maps; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicationType; @@ -47,7 +48,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; /** */ -public class UpdateRequest extends InstanceShardOperationRequest { +public class UpdateRequest extends InstanceShardOperationRequest implements DocumentRequest { private String type; private String id; diff --git a/src/test/java/org/elasticsearch/document/BulkTests.java b/src/test/java/org/elasticsearch/document/BulkTests.java index 2ae723f78bf..0fd27701fa5 100644 --- a/src/test/java/org/elasticsearch/document/BulkTests.java +++ b/src/test/java/org/elasticsearch/document/BulkTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.document; import com.google.common.base.Charsets; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.count.CountResponse; @@ -651,8 +652,31 @@ public class BulkTests extends ElasticsearchIntegrationTest { assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete")); } + private static String indexOrAlias() { return randomBoolean() ? "test" : "alias"; } + + @Test // issue 6410 + public void testThatMissingIndexDoesNotAbortFullBulkRequest() throws Exception{ + createIndex("bulkindex1", "bulkindex2"); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1")) + .add(new IndexRequest("bulkindex2", "index2_type", "1").source("text", "hallo2")) + .add(new IndexRequest("bulkindex2", "index2_type").source("text", "hallo2")) + .add(new UpdateRequest("bulkindex2", "index2_type", "2").doc("foo", "bar")) + .add(new DeleteRequest("bulkindex2", "index2_type", "3")) + .refresh(true); + + client().bulk(bulkRequest).get(); + SearchResponse searchResponse = client().prepareSearch("bulkindex*").get(); + assertHitCount(searchResponse, 3); + + assertAcked(client().admin().indices().prepareClose("bulkindex2")); + + BulkResponse bulkResponse = client().bulk(bulkRequest).get(); + assertThat(bulkResponse.hasFailures(), is(true)); + assertThat(bulkResponse.getItems().length, is(5)); + } }