diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 3fa5366e248..b47c9694d0e 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; @@ -41,7 +40,6 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.GroupShardsIterator; @@ -49,10 +47,8 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -100,15 +96,26 @@ public class TransportBulkAction extends HandledTransportAction indices = Sets.newHashSet(); for (ActionRequest request : bulkRequest.requests) { - if (request instanceof DocumentRequest) { - DocumentRequest req = (DocumentRequest) request; - if (!indices.contains(req.index())) { - indices.add(req.index()); + if (request instanceof IndexRequest) { + IndexRequest indexRequest = (IndexRequest) request; + if (!indices.contains(indexRequest.index())) { + indices.add(indexRequest.index()); + } + } else if (request instanceof DeleteRequest) { + DeleteRequest deleteRequest = (DeleteRequest) request; + if (!indices.contains(deleteRequest.index())) { + indices.add(deleteRequest.index()); + } + } else if (request instanceof UpdateRequest) { + UpdateRequest updateRequest = (UpdateRequest) request; + if (!indices.contains(updateRequest.index())) { + indices.add(updateRequest.index()); } } else { throw new ElasticsearchException("Parsed unknown request in bulk actions: " + request.getClass().getSimpleName()); } } + final AtomicInteger counter = new AtomicInteger(indices.size()); ClusterState state = clusterService.state(); for (final String index : indices) { @@ -197,33 +204,30 @@ public class TransportBulkAction extends HandledTransportAction responses, int idx, - final ConcreteIndices concreteIndices, - final MetaData metaData) { - String concreteIndex = concreteIndices.getConcreteIndex(request.index()); - boolean isClosed = false; - if (concreteIndex == null) { - try { - concreteIndex = concreteIndices.resolveIfAbsent(request.index(), request.indicesOptions()); - } catch (IndexClosedException ice) { - isClosed = true; - } - } - if (!isClosed) { - IndexMetaData indexMetaData = metaData.index(concreteIndex); - isClosed = indexMetaData.getState() == IndexMetaData.State.CLOSE; - } - if (isClosed) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(), - new IndexClosedException(new Index(metaData.index(request.index()).getIndex()))); - BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, "index", failure); - responses.set(idx, bulkItemResponse); - // make sure the request gets never processed again - bulkRequest.requests.set(idx, null); - } - return isClosed; - } - - private static class ConcreteIndices { + private final Map indices = new HashMap<>(); private final MetaData metaData; diff --git a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index cd33659f745..a0fb3e79181 100644 --- a/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.delete; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -44,7 +43,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Client#delete(DeleteRequest) * @see org.elasticsearch.client.Requests#deleteRequest(String) */ -public class DeleteRequest extends ShardReplicationOperationRequest implements DocumentRequest { +public class DeleteRequest extends ShardReplicationOperationRequest { private String type; private String id; diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 27d44b8b58a..09ae0cd6238 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.TimestampParsingException; -import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -65,7 +64,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * @see org.elasticsearch.client.Requests#indexRequest(String) * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexRequest extends ShardReplicationOperationRequest implements DocumentRequest { +public class IndexRequest extends ShardReplicationOperationRequest { /** * Operation type controls if the type of the index operation. diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 46abe672608..670cfc2ee91 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.update; import com.google.common.collect.Maps; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.DocumentRequest; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicationType; @@ -48,7 +47,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; /** */ -public class UpdateRequest extends InstanceShardOperationRequest implements DocumentRequest { +public class UpdateRequest extends InstanceShardOperationRequest { private String type; private String id; diff --git a/src/test/java/org/elasticsearch/document/BulkTests.java b/src/test/java/org/elasticsearch/document/BulkTests.java index 0fd27701fa5..2ae723f78bf 100644 --- a/src/test/java/org/elasticsearch/document/BulkTests.java +++ b/src/test/java/org/elasticsearch/document/BulkTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.document; import com.google.common.base.Charsets; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.count.CountResponse; @@ -652,31 +651,8 @@ public class BulkTests extends ElasticsearchIntegrationTest { assertThat(bulkItemResponse.getItems()[5].getOpType(), is("delete")); } - private static String indexOrAlias() { return randomBoolean() ? "test" : "alias"; } - - @Test // issue 6410 - public void testThatMissingIndexDoesNotAbortFullBulkRequest() throws Exception{ - createIndex("bulkindex1", "bulkindex2"); - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(new IndexRequest("bulkindex1", "index1_type", "1").source("text", "hallo1")) - .add(new IndexRequest("bulkindex2", "index2_type", "1").source("text", "hallo2")) - .add(new IndexRequest("bulkindex2", "index2_type").source("text", "hallo2")) - .add(new UpdateRequest("bulkindex2", "index2_type", "2").doc("foo", "bar")) - .add(new DeleteRequest("bulkindex2", "index2_type", "3")) - .refresh(true); - - client().bulk(bulkRequest).get(); - SearchResponse searchResponse = client().prepareSearch("bulkindex*").get(); - assertHitCount(searchResponse, 3); - - assertAcked(client().admin().indices().prepareClose("bulkindex2")); - - BulkResponse bulkResponse = client().bulk(bulkRequest).get(); - assertThat(bulkResponse.hasFailures(), is(true)); - assertThat(bulkResponse.getItems().length, is(5)); - } }