diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index bdaeff4347e..4f874e4d123 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -31,6 +31,9 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.UUID; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Maps; @@ -154,20 +157,39 @@ public class TransportBulkAction extends BaseAction { Map> requestsByShard = Maps.newHashMap(); for (int i = 0; i < bulkRequest.requests.size(); i++) { ActionRequest request = bulkRequest.requests.get(i); - ShardId shardId = null; if (request instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) request; - shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId(); + ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId(); + List list = requestsByShard.get(shardId); + if (list == null) { + list = Lists.newArrayList(); + requestsByShard.put(shardId, list); + } + list.add(new BulkItemRequest(i, request)); } else if (request instanceof DeleteRequest) { DeleteRequest deleteRequest = (DeleteRequest) request; - shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId(); + MappingMetaData mappingMd = clusterState.metaData().index(deleteRequest.index()).mapping(deleteRequest.type()); + if (mappingMd != null && mappingMd.routing().required() && deleteRequest.routing() == null) { + // if routing is required, and no routing on the delete request, we need to broadcast it.... + GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, deleteRequest.index()); + for (ShardsIterator shardsId : groupShards) { + List list = requestsByShard.get(shardsId.shardId()); + if (list == null) { + list = Lists.newArrayList(); + requestsByShard.put(shardsId.shardId(), list); + } + list.add(new BulkItemRequest(i, request)); + } + } else { + ShardId shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId(); + List list = requestsByShard.get(shardId); + if (list == null) { + list = Lists.newArrayList(); + requestsByShard.put(shardId, list); + } + list.add(new BulkItemRequest(i, request)); + } } - List list = requestsByShard.get(shardId); - if (list == null) { - list = Lists.newArrayList(); - requestsByShard.put(shardId, list); - } - list.add(new BulkItemRequest(i, request)); } final AtomicInteger counter = new AtomicInteger(requestsByShard.size()); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java index 706984bbad6..4323fd53f6e 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.test.integration.routing; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.xcontent.QueryBuilders; import org.elasticsearch.test.integration.AbstractNodesTests; @@ -222,5 +223,17 @@ public class SimpleRoutingTests extends AbstractNodesTests { assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false)); assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false)); } + + logger.info("--> indexing with id [1], and routing [0]"); + client.prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet(); + logger.info("--> verifying get with no routing, should not find anything"); + + logger.info("--> bulk deleting with no routing, should broadcast the delete since _routing is required"); + client.prepareBulk().add(Requests.deleteRequest("test").type("type1").id("1")).execute().actionGet(); + client.admin().indices().prepareRefresh().execute().actionGet(); + for (int i = 0; i < 5; i++) { + assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false)); + assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false)); + } } }