diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java index 330fb094c6a..50d0b270f64 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java @@ -48,6 +48,8 @@ import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.count.TransportCountAction; import org.elasticsearch.action.delete.TransportDeleteAction; +import org.elasticsearch.action.delete.index.TransportIndexDeleteAction; +import org.elasticsearch.action.delete.index.TransportShardDeleteAction; import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction; import org.elasticsearch.action.deletebyquery.TransportIndexDeleteByQueryAction; import org.elasticsearch.action.deletebyquery.TransportShardDeleteByQueryAction; @@ -99,6 +101,8 @@ public class TransportActionModule extends AbstractModule { bind(TransportIndexAction.class).asEagerSingleton(); bind(TransportGetAction.class).asEagerSingleton(); bind(TransportDeleteAction.class).asEagerSingleton(); + bind(TransportIndexDeleteAction.class).asEagerSingleton(); + bind(TransportShardDeleteAction.class).asEagerSingleton(); bind(TransportCountAction.class).asEagerSingleton(); bind(TransportBulkAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 4717344a7fc..ea49b92b5ca 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -25,11 +25,15 @@ import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.delete.index.IndexDeleteRequest; +import org.elasticsearch.action.delete.index.IndexDeleteResponse; +import org.elasticsearch.action.delete.index.TransportIndexDeleteAction; import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -51,11 +55,14 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct private final TransportCreateIndexAction createIndexAction; + private final TransportIndexDeleteAction indexDeleteAction; + @Inject public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - TransportCreateIndexAction createIndexAction) { + TransportCreateIndexAction createIndexAction, TransportIndexDeleteAction indexDeleteAction) { super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction); this.createIndexAction = createIndexAction; + this.indexDeleteAction = indexDeleteAction; this.autoCreateIndex = settings.getAsBoolean("action.auto_create_index", true); } @@ -63,23 +70,47 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(deleteRequest.index())) { createIndexAction.execute(new CreateIndexRequest(deleteRequest.index()), new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { - TransportDeleteAction.super.doExecute(deleteRequest, listener); + innerExecute(deleteRequest, listener); } @Override public void onFailure(Throwable e) { if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) { // we have the index, do it - TransportDeleteAction.super.doExecute(deleteRequest, listener); + innerExecute(deleteRequest, listener); } else { listener.onFailure(e); } } }); } else { - super.doExecute(deleteRequest, listener); + innerExecute(deleteRequest, listener); } } + private void innerExecute(final DeleteRequest request, final ActionListener listener) { + ClusterState clusterState = clusterService.state(); + request.index(clusterState.metaData().concreteIndex(request.index())); // we need to get the concrete index here... + if (clusterState.metaData().hasIndex(request.index())) { + // check if routing is required, if so, do a broadcast delete + MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(request.type()); + if (mappingMd != null && mappingMd.routing().required()) { + if (request.routing() == null) { + indexDeleteAction.execute(new IndexDeleteRequest(request), new ActionListener() { + @Override public void onResponse(IndexDeleteResponse indexDeleteResponse) { + listener.onResponse(new DeleteResponse(request.index(), request.type(), request.id())); + } + + @Override public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + return; + } + } + } + super.doExecute(request, listener); + } + @Override protected boolean checkWriteConsistency() { return true; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteRequest.java new file mode 100644 index 00000000000..692392dbf29 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteRequest.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.delete.index; + +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * @author kimchy (shay.banon) + */ +public class IndexDeleteRequest extends IndexReplicationOperationRequest { + + private String type; + + private String id; + + private boolean refresh = false; + + IndexDeleteRequest() { + } + + public IndexDeleteRequest(DeleteRequest request) { + this.timeout = request.timeout(); + this.consistencyLevel = request.consistencyLevel(); + this.replicationType = request.replicationType(); + this.index = request.index(); + this.type = request.type(); + this.id = request.id(); + this.refresh = request.refresh(); + } + + public String type() { + return this.type; + } + + public String id() { + return this.id; + } + + public boolean refresh() { + return this.refresh; + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + type = in.readUTF(); + id = in.readUTF(); + refresh = in.readBoolean(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeUTF(type); + out.writeUTF(id); + out.writeBoolean(refresh); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteResponse.java new file mode 100644 index 00000000000..8ea14db1cee --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/IndexDeleteResponse.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.delete.index; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; + +import java.io.IOException; + +/** + * Delete by query response executed on a specific index. + * + * @author kimchy (shay.banon) + */ +public class IndexDeleteResponse implements ActionResponse, Streamable { + + private String index; + + private int successfulShards; + + private int failedShards; + + IndexDeleteResponse(String index, int successfulShards, int failedShards) { + this.index = index; + this.successfulShards = successfulShards; + this.failedShards = failedShards; + } + + IndexDeleteResponse() { + + } + + /** + * The index the delete by query operation was executed against. + */ + public String index() { + return this.index; + } + + /** + * The index the delete by query operation was executed against. + */ + public String getIndex() { + return index; + } + + /** + * The total number of shards the delete by query was executed on. + */ + public int totalShards() { + return failedShards + successfulShards; + } + + /** + * The total number of shards the delete by query was executed on. + */ + public int getTotalShards() { + return totalShards(); + } + + /** + * The successful number of shards the delete by query was executed on. + */ + public int successfulShards() { + return successfulShards; + } + + /** + * The successful number of shards the delete by query was executed on. + */ + public int getSuccessfulShards() { + return successfulShards; + } + + /** + * The failed number of shards the delete by query was executed on. + */ + public int failedShards() { + return failedShards; + } + + /** + * The failed number of shards the delete by query was executed on. + */ + public int getFailedShards() { + return failedShards; + } + + @Override public void readFrom(StreamInput in) throws IOException { + index = in.readUTF(); + successfulShards = in.readVInt(); + failedShards = in.readVInt(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(index); + out.writeVInt(successfulShards); + out.writeVInt(failedShards); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteRequest.java new file mode 100644 index 00000000000..26e775b0bdc --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteRequest.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.delete.index; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.Actions.*; + +/** + * Delete by query request to execute on a specific shard. + * + * @author kimchy (shay.banon) + */ +public class ShardDeleteRequest extends ShardReplicationOperationRequest { + + private int shardId; + private String type; + private String id; + private boolean refresh = false; + + ShardDeleteRequest(IndexDeleteRequest request, int shardId) { + this.index = request.index(); + this.shardId = shardId; + this.type = request.type(); + this.id = request.id(); + replicationType(request.replicationType()); + consistencyLevel(request.consistencyLevel()); + timeout = request.timeout(); + this.refresh = request.refresh(); + } + + ShardDeleteRequest() { + } + + @Override public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = super.validate(); + if (type == null) { + addValidationError("type is missing", validationException); + } + if (id == null) { + addValidationError("id is missing", validationException); + } + return validationException; + } + + public int shardId() { + return this.shardId; + } + + public String type() { + return this.type; + } + + public String id() { + return this.id; + } + + public boolean refresh() { + return this.refresh; + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = in.readVInt(); + type = in.readUTF(); + id = in.readUTF(); + refresh = in.readBoolean(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shardId); + out.writeUTF(type); + out.writeUTF(id); + out.writeBoolean(refresh); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteResponse.java new file mode 100644 index 00000000000..96b6a5e370a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/ShardDeleteResponse.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.delete.index; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; + +import java.io.IOException; + +/** + * Delete response executed on a specific shard. + * + * @author kimchy (shay.banon) + */ +public class ShardDeleteResponse implements ActionResponse, Streamable { + + @Override public void readFrom(StreamInput in) throws IOException { + } + + @Override public void writeTo(StreamOutput out) throws IOException { + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java new file mode 100644 index 00000000000..015a0936ea4 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.delete.index; + +import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * @author kimchy (shay.banon) + */ +public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction { + + @Inject public TransportIndexDeleteAction(Settings settings, ClusterService clusterService, TransportService transportService, + ThreadPool threadPool, TransportShardDeleteAction deleteAction) { + super(settings, transportService, clusterService, threadPool, deleteAction); + } + + @Override protected IndexDeleteRequest newRequestInstance() { + return new IndexDeleteRequest(); + } + + @Override protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, AtomicReferenceArray shardsResponses) { + int successfulShards = 0; + int failedShards = 0; + for (int i = 0; i < shardsResponses.length(); i++) { + if (shardsResponses.get(i) == null) { + failedShards++; + } else { + successfulShards++; + } + } + return new IndexDeleteResponse(request.index(), successfulShards, failedShards); + } + + @Override protected boolean accumulateExceptions() { + return false; + } + + @Override protected String transportAction() { + return "indices/index/delete"; + } + + @Override protected void checkBlock(IndexDeleteRequest request, ClusterState state) { + state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + } + + @Override protected GroupShardsIterator shards(IndexDeleteRequest request) { + return clusterService.operationRouting().broadcastDeleteShards(clusterService.state(), request.index()); + } + + @Override protected ShardDeleteRequest newShardRequestInstance(IndexDeleteRequest request, int shardId) { + return new ShardDeleteRequest(request, shardId); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java new file mode 100644 index 00000000000..08355a258bc --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.delete.index; + +import org.elasticsearch.ElasticSearchIllegalStateException; +import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * @author kimchy (Shay Banon) + */ +public class TransportShardDeleteAction extends TransportShardReplicationOperationAction { + + @Inject public TransportShardDeleteAction(Settings settings, TransportService transportService, + ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, + ShardStateAction shardStateAction) { + super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction); + } + + @Override protected boolean checkWriteConsistency() { + return true; + } + + @Override protected ShardDeleteRequest newRequestInstance() { + return new ShardDeleteRequest(); + } + + @Override protected ShardDeleteResponse newResponseInstance() { + return new ShardDeleteResponse(); + } + + @Override protected String transportAction() { + return "indices/index/b_shard/delete"; + } + + @Override protected void checkBlock(ShardDeleteRequest request, ClusterState state) { + state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + } + + @Override protected ShardDeleteResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) { + ShardDeleteRequest request = shardRequest.request; + IndexShard indexShard = indexShard(shardRequest); + Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id()); + delete.refresh(request.refresh()); + indexShard.delete(delete); + return new ShardDeleteResponse(); + } + + @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) { + ShardDeleteRequest request = shardRequest.request; + IndexShard indexShard = indexShard(shardRequest); + Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id()); + delete.refresh(request.refresh()); + indexShard.delete(delete); + } + + @Override protected ShardsIterator shards(ClusterState clusterState, ShardDeleteRequest request) { + GroupShardsIterator group = clusterService.operationRouting().broadcastDeleteShards(clusterService.state(), request.index()); + for (ShardsIterator shards : group) { + if (shards.shardId().id() == request.shardId()) { + return shards; + } + } + throw new ElasticSearchIllegalStateException("No shards iterator found for shard [" + request.shardId() + "]"); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 4c98fb8bac8..6ab3d57a0d7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -135,7 +135,7 @@ public abstract class TransportShardReplicationOperationAction { + class OperationTransportHandler extends BaseTransportRequestHandler { @Override public Request newInstance() { return newRequestInstance(); @@ -170,7 +170,7 @@ public abstract class TransportShardReplicationOperationAction { + class ReplicaOperationTransportHandler extends BaseTransportRequestHandler { @Override public ShardOperationRequest newInstance() { return new ShardOperationRequest(); @@ -215,7 +215,7 @@ public abstract class TransportShardReplicationOperationAction listener; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRouting.java index b5efea9a784..d736c46d2db 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/OperationRouting.java @@ -36,6 +36,8 @@ public interface OperationRouting { ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException; + GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) throws IndexMissingException, IndexShardMissingException; + ShardsIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException; GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable String routing) throws IndexMissingException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java index ed982f81c0b..4512d0092da 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java @@ -69,6 +69,10 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio return shards(clusterState, index, type, id, routing).shardsRandomIt(); } + @Override public GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) throws IndexMissingException { + return indexRoutingTable(clusterState, index).groupByShardsIt(); + } + @Override public GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable String routing) throws IndexMissingException { if (routing == null) { return indexRoutingTable(clusterState, index).groupByShardsIt(); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java index d5fc701681e..706984bbad6 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java @@ -210,5 +210,17 @@ public class SimpleRoutingTests extends AbstractNodesTests { } catch (ElasticSearchException e) { assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class)); } + + logger.info("--> verifying get with routing, should find"); + for (int i = 0; i < 5; i++) { + assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(true)); + } + + logger.info("--> deleting with no routing, should broadcast the delete since _routing is required"); + client.prepareDelete("test", "type1", "1").setRefresh(true).execute().actionGet(); + for (int i = 0; i < 5; i++) { + assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false)); + assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false)); + } } }