From 0fe73b51f9234148c373a02bb85c5e320b28ae33 Mon Sep 17 00:00:00 2001 From: javanna Date: Mon, 15 Feb 2016 14:34:58 +0100 Subject: [PATCH] Bulk api: fail deletes when routing is required but not specified As part of #10136 we removed the transport action for broadcast deletes in case routing is required but not specified. Bulk api worked differently though and kept on doing the broadcast delete internally in that case. This commit makes sure that delete items are marked as failed in such cases. Also the check has been moved up in the code together with the existing check for the update api, and we now make sure that the exception is the same as the one thrown for single document apis (delete/update). Note that the failure for the update api contained the wrong optype (the type of the document rather than "update"), that's been fixed too and tested. Closes #16645 --- .../action/bulk/TransportBulkAction.java | 110 +++++++++-------- .../action/delete/TransportDeleteAction.java | 12 +- .../action/update/TransportUpdateAction.java | 15 ++- .../cluster/routing/OperationRouting.java | 5 - .../routing/SimpleRoutingIT.java | 113 ++++++++++++++---- 5 files changed, 162 insertions(+), 93 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 52336ccae0b..b2d58203954 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -30,10 +30,12 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -42,12 +44,9 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexAlreadyExistsException; @@ -197,10 +196,10 @@ public class TransportBulkAction extends HandledTransportAction listener) { final long startTime = System.currentTimeMillis(); - executeBulk(bulkRequest, startTime, listener, new AtomicArray(bulkRequest.requests.size())); + executeBulk(bulkRequest, startTime, listener, new AtomicArray<>(bulkRequest.requests.size())); } - private final long buildTookInMillis(long startTime) { + private long buildTookInMillis(long startTime) { // protect ourselves against time going backwards return Math.max(1, System.currentTimeMillis() - startTime); } @@ -214,33 +213,53 @@ public class TransportBulkAction extends HandledTransportAction list = requestsByShard.get(shardIt.shardId()); - if (list == null) { - list = new ArrayList<>(); - requestsByShard.put(shardIt.shardId(), list); - } - list.add(new BulkItemRequest(i, deleteRequest)); - } - } else { - ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId(); - List list = requestsByShard.get(shardId); - if (list == null) { - list = new ArrayList<>(); - requestsByShard.put(shardId, list); - } - list.add(new BulkItemRequest(i, request)); + ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId(); + List list = requestsByShard.get(shardId); + if (list == null) { + list = new ArrayList<>(); + requestsByShard.put(shardId, list); } + list.add(new BulkItemRequest(i, request)); } else if (request instanceof UpdateRequest) { UpdateRequest updateRequest = (UpdateRequest) request; String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index()); - MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(updateRequest.type()); - if (mappingMd != null && mappingMd.routing().required() && updateRequest.routing() == null) { - BulkItemResponse.Failure failure = new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), - updateRequest.id(), new IllegalArgumentException("routing is required for this item")); - responses.set(i, new BulkItemResponse(i, updateRequest.type(), failure)); - continue; - } ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId(); List list = requestsByShard.get(shardId); if (list == null) { diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 3a0e7aeec21..3ded0ed8e83 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -96,23 +96,27 @@ public class TransportDeleteAction extends TransportReplicationAction listener) { diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index a4053ce857e..e605479d554 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.support.single.instance.TransportInstanceSingleO import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; @@ -100,14 +101,18 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio @Override protected boolean resolveRequest(ClusterState state, UpdateRequest request, ActionListener listener) { - request.routing((state.metaData().resolveIndexRouting(request.parent(), request.routing(), request.index()))); - // Fail fast on the node that received the request, rather than failing when translating on the index or delete request. - if (request.routing() == null && state.getMetaData().routingRequired(request.concreteIndex(), request.type())) { - throw new RoutingMissingException(request.concreteIndex(), request.type(), request.id()); - } + resolveAndValidateRouting(state.metaData(), request.concreteIndex(), request); return true; } + public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) { + request.routing((metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()))); + // Fail fast on the node that received the request, rather than failing when translating on the index or delete request. + if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) { + throw new RoutingMissingException(concreteIndex, request.type(), request.id()); + } + } + @Override protected void doExecute(final UpdateRequest request, final ActionListener listener) { // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 184db017c10..2b67b9f1820 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -29,7 +29,6 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -67,10 +66,6 @@ public class OperationRouting extends AbstractComponent { return preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference); } - public GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) { - return indexRoutingTable(clusterState, index).groupByShardsIt(); - } - public int searchShardsCount(ClusterState clusterState, String[] concreteIndices, @Nullable Map> routing) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); return shards.size(); diff --git a/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java b/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java index 88d4cbb2200..42b8c1556df 100644 --- a/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java +++ b/core/src/test/java/org/elasticsearch/routing/SimpleRoutingIT.java @@ -20,24 +20,26 @@ package org.elasticsearch.routing; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.explain.ExplainResponse; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.termvectors.MultiTermVectorsResponse; import org.elasticsearch.action.termvectors.TermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsResponse; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; @@ -156,8 +158,7 @@ public class SimpleRoutingIT extends ESIntegTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/16645") - public void testRequiredRoutingMapping() throws Exception { + public void testRequiredRoutingCrudApis() throws Exception { client().admin().indices().prepareCreate("test").addAlias(new Alias("alias")) .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject()) .execute().actionGet(); @@ -203,9 +204,31 @@ public class SimpleRoutingIT extends ESIntegTestCase { client().prepareIndex(indexOrAlias(), "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet(); logger.info("--> verifying get with no routing, should not find anything"); - logger.info("--> bulk deleting with no routing, should broadcast the delete since _routing is required"); - client().prepareBulk().add(Requests.deleteRequest(indexOrAlias()).type("type1").id("1")).execute().actionGet(); + try { + client().prepareUpdate(indexOrAlias(), "type1", "1").setDoc("field", "value2").execute().actionGet(); + fail("update with missing routing when routing is required should fail"); + } catch(ElasticsearchException e) { + assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class)); + } + + client().prepareUpdate(indexOrAlias(), "type1", "1").setRouting("0").setDoc("field", "value2").execute().actionGet(); client().admin().indices().prepareRefresh().execute().actionGet(); + + for (int i = 0; i < 5; i++) { + try { + client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists(); + fail(); + } catch (RoutingMissingException e) { + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), equalTo("routing is required for [test]/[type1]/[1]")); + } + GetResponse getResponse = client().prepareGet(indexOrAlias(), "type1", "1").setRouting("0").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getSourceAsMap().get("field"), equalTo("value2")); + } + + client().prepareDelete(indexOrAlias(), "type1", "1").setRouting("0").setRefresh(true).execute().actionGet(); + for (int i = 0; i < 5; i++) { try { client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists(); @@ -227,28 +250,72 @@ public class SimpleRoutingIT extends ESIntegTestCase { .execute().actionGet(); ensureGreen(); - logger.info("--> indexing with id [1], and routing [0]"); - client().prepareBulk().add( - client().prepareIndex(indexOrAlias(), "type1", "1").setRouting("0").setSource("field", "value1")).execute().actionGet(); - client().admin().indices().prepareRefresh().execute().actionGet(); + { + BulkResponse bulkResponse = client().prepareBulk().add(Requests.indexRequest(indexOrAlias()).type("type1").id("1") + .source("field", "value")).execute().actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(1)); + assertThat(bulkResponse.hasFailures(), equalTo(true)); - logger.info("--> verifying get with no routing, should fail"); - for (int i = 0; i < 5; i++) { - try { - client().prepareGet(indexOrAlias(), "type1", "1").execute().actionGet().isExists(); - fail(); - } catch (RoutingMissingException e) { - assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); - assertThat(e.getMessage(), equalTo("routing is required for [test]/[type1]/[1]")); + for (BulkItemResponse bulkItemResponse : bulkResponse) { + assertThat(bulkItemResponse.isFailed(), equalTo(true)); + assertThat(bulkItemResponse.getOpType(), equalTo("index")); + assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); + assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); } } - logger.info("--> verifying get with routing, should find"); - for (int i = 0; i < 5; i++) { - assertThat(client().prepareGet(indexOrAlias(), "type1", "1").setRouting("0").execute().actionGet().isExists(), equalTo(true)); + + { + BulkResponse bulkResponse = client().prepareBulk().add(Requests.indexRequest(indexOrAlias()).type("type1").id("1").routing("0") + .source("field", "value")).execute().actionGet(); + assertThat(bulkResponse.hasFailures(), equalTo(false)); + } + + { + BulkResponse bulkResponse = client().prepareBulk().add(new UpdateRequest(indexOrAlias(), "type1", "1").doc("field", "value2")) + .execute().actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(1)); + assertThat(bulkResponse.hasFailures(), equalTo(true)); + + for (BulkItemResponse bulkItemResponse : bulkResponse) { + assertThat(bulkItemResponse.isFailed(), equalTo(true)); + assertThat(bulkItemResponse.getOpType(), equalTo("update")); + assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); + assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); + } + } + + { + BulkResponse bulkResponse = client().prepareBulk().add(new UpdateRequest(indexOrAlias(), "type1", "1").doc("field", "value2") + .routing("0")).execute().actionGet(); + assertThat(bulkResponse.hasFailures(), equalTo(false)); + } + + { + BulkResponse bulkResponse = client().prepareBulk().add(Requests.deleteRequest(indexOrAlias()).type("type1").id("1")) + .execute().actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(1)); + assertThat(bulkResponse.hasFailures(), equalTo(true)); + + for (BulkItemResponse bulkItemResponse : bulkResponse) { + assertThat(bulkItemResponse.isFailed(), equalTo(true)); + assertThat(bulkItemResponse.getOpType(), equalTo("delete")); + assertThat(bulkItemResponse.getFailure().getStatus(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(bulkItemResponse.getFailure().getCause(), instanceOf(RoutingMissingException.class)); + assertThat(bulkItemResponse.getFailureMessage(), containsString("routing is required for [test]/[type1]/[1]")); + } + } + + { + BulkResponse bulkResponse = client().prepareBulk().add(Requests.deleteRequest(indexOrAlias()).type("type1").id("1") + .routing("0")).execute().actionGet(); + assertThat(bulkResponse.getItems().length, equalTo(1)); + assertThat(bulkResponse.hasFailures(), equalTo(false)); } } - public void testRequiredRoutingMapping_variousAPIs() throws Exception { + public void testRequiredRoutingMappingVariousAPIs() throws Exception { client().admin().indices().prepareCreate("test").addAlias(new Alias("alias")) .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject()) .execute().actionGet();