diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java index d6ae87bd1a1..615da425014 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/IndexDeleteByQueryRequest.java @@ -44,8 +44,9 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest private String queryParserName; private String[] types = Strings.EMPTY_ARRAY; @Nullable private String routing; + @Nullable private String[] filteringAliases; - IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index) { + IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable String[] filteringAliases) { this.index = index; this.timeout = request.timeout(); this.querySource = request.querySource(); @@ -54,6 +55,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest this.replicationType = request.replicationType(); this.consistencyLevel = request.consistencyLevel(); this.routing = request.routing(); + this.filteringAliases = filteringAliases; } IndexDeleteByQueryRequest() { @@ -92,6 +94,10 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest return this.types; } + String[] filteringAliases() { + return filteringAliases; + } + public IndexDeleteByQueryRequest queryParserName(String queryParserName) { this.queryParserName = queryParserName; return this; @@ -119,6 +125,13 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest if (in.readBoolean()) { routing = in.readUTF(); } + int aliasesSize = in.readVInt(); + if (aliasesSize > 0) { + filteringAliases = new String[aliasesSize]; + for (int i = 0; i < aliasesSize; i++) { + filteringAliases[i] = in.readUTF(); + } + } } public void writeTo(StreamOutput out) throws IOException { @@ -141,5 +154,13 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest out.writeBoolean(true); out.writeUTF(routing); } + if (filteringAliases != null) { + out.writeVInt(filteringAliases.length); + for (String alias : filteringAliases) { + out.writeUTF(alias); + } + } else { + out.writeVInt(0); + } } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java index eeb129005e4..14b06f13848 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/ShardDeleteByQueryRequest.java @@ -44,6 +44,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest private String queryParserName; private String[] types = Strings.EMPTY_ARRAY; @Nullable private String routing; + @Nullable private String[] filteringAliases; ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) { this.index = request.index(); @@ -55,6 +56,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest consistencyLevel(request.consistencyLevel()); timeout = request.timeout(); this.routing = request.routing(); + filteringAliases = request.filteringAliases(); } ShardDeleteByQueryRequest() { @@ -88,6 +90,10 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest return this.routing; } + public String[] filteringAliases() { + return filteringAliases; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); querySource = new byte[in.readVInt()]; @@ -106,6 +112,13 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest if (in.readBoolean()) { routing = in.readUTF(); } + int aliasesSize = in.readVInt(); + if (aliasesSize > 0) { + filteringAliases = new String[aliasesSize]; + for (int i = 0; i < aliasesSize; i++) { + filteringAliases[i] = in.readUTF(); + } + } } @Override public void writeTo(StreamOutput out) throws IOException { @@ -129,6 +142,14 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest out.writeBoolean(true); out.writeUTF(routing); } + if (filteringAliases != null) { + out.writeVInt(filteringAliases.length); + for (String alias : filteringAliases) { + out.writeUTF(alias); + } + } else { + out.writeVInt(0); + } } @Override public String toString() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java index 2d287758aaa..df1a14f5414 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java @@ -64,13 +64,14 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe return TransportActions.DELETE_BY_QUERY; } - @Override protected void checkBlock(DeleteByQueryRequest request, ClusterState state) { - for (String index : request.indices()) { + @Override protected void checkBlock(DeleteByQueryRequest request, String[] concreteIndices, ClusterState state) { + for (String index : concreteIndices) { state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, index); } } @Override protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index) { - return new IndexDeleteByQueryRequest(request, index); + String[] filteringAliases = clusterService.state().metaData().filteringAliases(index, request.indices()); + return new IndexDeleteByQueryRequest(request, index, filteringAliases); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index c895b2a4fc4..fcc774850e6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -70,13 +70,13 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication @Override protected PrimaryResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) { ShardDeleteByQueryRequest request = shardRequest.request; - indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types()); + indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.filteringAliases(), request.types()); return new PrimaryResponse(new ShardDeleteByQueryResponse(), null); } @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) { ShardDeleteByQueryRequest request = shardRequest.request; - indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types()); + indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.filteringAliases(), request.types()); } @Override protected ShardIterator shards(ClusterState clusterState, ShardDeleteByQueryRequest request) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java index a0d51b79495..daa4c1b27f1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java @@ -61,18 +61,17 @@ public abstract class TransportIndicesReplicationOperationAction listener) { ClusterState clusterState = clusterService.state(); - // update to actual indices - request.indices(clusterState.metaData().concreteIndices(request.indices())); + // get actual indices - checkBlock(request, clusterState); + String[] concreteIndices = clusterState.metaData().concreteIndices(request.indices()); - String[] indices = request.indices(); + checkBlock(request, concreteIndices, clusterState); final AtomicInteger indexCounter = new AtomicInteger(); - final AtomicInteger completionCounter = new AtomicInteger(indices.length); - final AtomicReferenceArray indexResponses = new AtomicReferenceArray(indices.length); + final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length); + final AtomicReferenceArray indexResponses = new AtomicReferenceArray(concreteIndices.length); - for (final String index : indices) { + for (final String index : concreteIndices) { IndexRequest indexRequest = newIndexRequestInstance(request, index); // no threading needed, all is done on the index replication one indexRequest.listenerThreaded(false); @@ -108,7 +107,7 @@ public abstract class TransportIndicesReplicationOperationAction= 1) { + int aliasesSize = in.readVInt(); + if (aliasesSize > 0) { + filteringAliases = new String[aliasesSize]; + for (int i = 0; i < aliasesSize; i++) { + filteringAliases[i] = in.readUTF(); + } + } + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(0); // version + out.writeVInt(1); // version out.writeVInt(source.length); out.writeBytes(source); if (queryParserName == null) { @@ -516,6 +531,14 @@ public interface Translog extends IndexShardComponent { for (String type : types) { out.writeUTF(type); } + if (filteringAliases != null) { + out.writeVInt(filteringAliases.length); + for (String alias : filteringAliases) { + out.writeUTF(alias); + } + } else { + out.writeVInt(0); + } } } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java index cd9d6dcc9e3..0ccef9797f1 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java @@ -106,7 +106,7 @@ public abstract class AbstractSimpleTranslogTests { assertThat(snapshot.estimatedTotalOperations(), equalTo(3)); snapshot.release(); - translog.add(new Translog.DeleteByQuery(new byte[]{4}, null)); + translog.add(new Translog.DeleteByQuery(new byte[]{4}, null, null)); snapshot = translog.snapshot(); assertThat(snapshot, translogSize(4)); assertThat(snapshot.estimatedTotalOperations(), equalTo(4)); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/aliases/IndexAliasesTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/aliases/IndexAliasesTests.java index 416d54da374..6ecddd7ba55 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/aliases/IndexAliasesTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/aliases/IndexAliasesTests.java @@ -324,6 +324,72 @@ public class IndexAliasesTests extends AbstractNodesTests { } + @Test public void testDeletingByQueryFilteringAliases() throws Exception { + logger.info("--> creating index [test1]"); + client1.admin().indices().create(createIndexRequest("test1")).actionGet(); + + logger.info("--> creating index [test2]"); + client1.admin().indices().create(createIndexRequest("test2")).actionGet(); + + logger.info("--> running cluster_health"); + ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + logger.info("--> done cluster_health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + + logger.info("--> adding filtering aliases to index [test1]"); + client1.admin().indices().prepareAliases().addAlias("test1", "aliasToTest1").execute().actionGet(); + client1.admin().indices().prepareAliases().addAlias("test1", "aliasToTests").execute().actionGet(); + client1.admin().indices().prepareAliases().addAlias("test1", "foos", termFilter("name", "foo")).execute().actionGet(); + client1.admin().indices().prepareAliases().addAlias("test1", "bars", termFilter("name", "bar")).execute().actionGet(); + client1.admin().indices().prepareAliases().addAlias("test1", "tests", termFilter("name", "test")).execute().actionGet(); + + logger.info("--> adding filtering aliases to index [test2]"); + client1.admin().indices().prepareAliases().addAlias("test2", "aliasToTest2").execute().actionGet(); + client1.admin().indices().prepareAliases().addAlias("test2", "aliasToTests").execute().actionGet(); + client1.admin().indices().prepareAliases().addAlias("test2", "foos", termFilter("name", "foo")).execute().actionGet(); + client1.admin().indices().prepareAliases().addAlias("test2", "tests", termFilter("name", "test")).execute().actionGet(); + Thread.sleep(300); + + logger.info("--> indexing against [test1]"); + client1.index(indexRequest("test1").type("type1").id("1").source(source("1", "foo test")).refresh(true)).actionGet(); + client1.index(indexRequest("test1").type("type1").id("2").source(source("2", "bar test")).refresh(true)).actionGet(); + client1.index(indexRequest("test1").type("type1").id("3").source(source("3", "baz test")).refresh(true)).actionGet(); + client1.index(indexRequest("test1").type("type1").id("4").source(source("4", "something else")).refresh(true)).actionGet(); + + logger.info("--> indexing against [test2]"); + client1.index(indexRequest("test2").type("type1").id("5").source(source("5", "foo test")).refresh(true)).actionGet(); + client1.index(indexRequest("test2").type("type1").id("6").source(source("6", "bar test")).refresh(true)).actionGet(); + client1.index(indexRequest("test2").type("type1").id("7").source(source("7", "baz test")).refresh(true)).actionGet(); + client1.index(indexRequest("test2").type("type1").id("8").source(source("8", "something else")).refresh(true)).actionGet(); + + logger.info("--> checking counts before delete"); + assertThat(client1.prepareCount("bars").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1L)); + + logger.info("--> delete by query from a single alias"); + client1.prepareDeleteByQuery("bars").setQuery(QueryBuilders.termQuery("name", "test")).execute().actionGet(); + client1.admin().indices().prepareRefresh().execute().actionGet(); + + logger.info("--> verify that only one record was deleted"); + assertThat(client1.prepareCount("test1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(3L)); + + logger.info("--> delete by query from an aliases pointing to two indices"); + client1.prepareDeleteByQuery("foos").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); + client1.admin().indices().prepareRefresh().execute().actionGet(); + + logger.info("--> verify that proper records were deleted"); + SearchResponse searchResponse = client1.prepareSearch("aliasToTests").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); + assertHits(searchResponse.hits(), "3", "4", "6", "7", "8"); + + logger.info("--> delete by query from an aliases and an index"); + client1.prepareDeleteByQuery("tests", "test2").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); + client1.admin().indices().prepareRefresh().execute().actionGet(); + + logger.info("--> verify that proper records were deleted"); + searchResponse = client1.prepareSearch("aliasToTests").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); + assertHits(searchResponse.hits(), "4"); + } + private void assertHits(SearchHits hits, String... ids) { assertThat(hits.totalHits(), equalTo((long) ids.length)); Set hitIds = newHashSet();