Add support for filtering aliases to DeleteByQuery

This commit is contained in:
Igor Motov 2011-05-25 11:20:09 -04:00 committed by kimchy
parent d1d631794d
commit 646800cb29
12 changed files with 180 additions and 26 deletions

View File

@ -44,8 +44,9 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String routing;
@Nullable private String[] filteringAliases;
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index) {
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable String[] filteringAliases) {
this.index = index;
this.timeout = request.timeout();
this.querySource = request.querySource();
@ -54,6 +55,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
this.replicationType = request.replicationType();
this.consistencyLevel = request.consistencyLevel();
this.routing = request.routing();
this.filteringAliases = filteringAliases;
}
IndexDeleteByQueryRequest() {
@ -92,6 +94,10 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
return this.types;
}
String[] filteringAliases() {
return filteringAliases;
}
public IndexDeleteByQueryRequest queryParserName(String queryParserName) {
this.queryParserName = queryParserName;
return this;
@ -119,6 +125,13 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
if (in.readBoolean()) {
routing = in.readUTF();
}
int aliasesSize = in.readVInt();
if (aliasesSize > 0) {
filteringAliases = new String[aliasesSize];
for (int i = 0; i < aliasesSize; i++) {
filteringAliases[i] = in.readUTF();
}
}
}
public void writeTo(StreamOutput out) throws IOException {
@ -141,5 +154,13 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
out.writeBoolean(true);
out.writeUTF(routing);
}
if (filteringAliases != null) {
out.writeVInt(filteringAliases.length);
for (String alias : filteringAliases) {
out.writeUTF(alias);
}
} else {
out.writeVInt(0);
}
}
}

View File

@ -44,6 +44,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String routing;
@Nullable private String[] filteringAliases;
ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
this.index = request.index();
@ -55,6 +56,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
consistencyLevel(request.consistencyLevel());
timeout = request.timeout();
this.routing = request.routing();
filteringAliases = request.filteringAliases();
}
ShardDeleteByQueryRequest() {
@ -88,6 +90,10 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
return this.routing;
}
public String[] filteringAliases() {
return filteringAliases;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
querySource = new byte[in.readVInt()];
@ -106,6 +112,13 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
if (in.readBoolean()) {
routing = in.readUTF();
}
int aliasesSize = in.readVInt();
if (aliasesSize > 0) {
filteringAliases = new String[aliasesSize];
for (int i = 0; i < aliasesSize; i++) {
filteringAliases[i] = in.readUTF();
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -129,6 +142,14 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
out.writeBoolean(true);
out.writeUTF(routing);
}
if (filteringAliases != null) {
out.writeVInt(filteringAliases.length);
for (String alias : filteringAliases) {
out.writeUTF(alias);
}
} else {
out.writeVInt(0);
}
}
@Override public String toString() {

View File

@ -64,13 +64,14 @@ public class TransportDeleteByQueryAction extends TransportIndicesReplicationOpe
return TransportActions.DELETE_BY_QUERY;
}
@Override protected void checkBlock(DeleteByQueryRequest request, ClusterState state) {
for (String index : request.indices()) {
@Override protected void checkBlock(DeleteByQueryRequest request, String[] concreteIndices, ClusterState state) {
for (String index : concreteIndices) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, index);
}
}
@Override protected IndexDeleteByQueryRequest newIndexRequestInstance(DeleteByQueryRequest request, String index) {
return new IndexDeleteByQueryRequest(request, index);
String[] filteringAliases = clusterService.state().metaData().filteringAliases(index, request.indices());
return new IndexDeleteByQueryRequest(request, index, filteringAliases);
}
}

View File

@ -70,13 +70,13 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
@Override protected PrimaryResponse<ShardDeleteByQueryResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types());
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.filteringAliases(), request.types());
return new PrimaryResponse<ShardDeleteByQueryResponse>(new ShardDeleteByQueryResponse(), null);
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types());
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.filteringAliases(), request.types());
}
@Override protected ShardIterator shards(ClusterState clusterState, ShardDeleteByQueryRequest request) {

View File

@ -61,18 +61,17 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
@Override protected void doExecute(final Request request, final ActionListener<Response> listener) {
ClusterState clusterState = clusterService.state();
// update to actual indices
request.indices(clusterState.metaData().concreteIndices(request.indices()));
// get actual indices
checkBlock(request, clusterState);
String[] concreteIndices = clusterState.metaData().concreteIndices(request.indices());
String[] indices = request.indices();
checkBlock(request, concreteIndices, clusterState);
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(indices.length);
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<Object>(indices.length);
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<Object>(concreteIndices.length);
for (final String index : indices) {
for (final String index : concreteIndices) {
IndexRequest indexRequest = newIndexRequestInstance(request, index);
// no threading needed, all is done on the index replication one
indexRequest.listenerThreaded(false);
@ -108,7 +107,7 @@ public abstract class TransportIndicesReplicationOperationAction<Request extends
protected abstract boolean accumulateExceptions();
protected void checkBlock(Request request, ClusterState state) {
protected void checkBlock(Request request, String[] concreteIndices, ClusterState state) {
}

View File

@ -24,6 +24,7 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.index.ExtendedIndexSearcher;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.Nullable;
@ -553,13 +554,17 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final Query query;
private final String queryParserName;
private final byte[] source;
private final String[] filteringAliases;
private final Filter aliasFilter;
private final String[] types;
public DeleteByQuery(Query query, byte[] source, @Nullable String queryParserName, String... types) {
public DeleteByQuery(Query query, byte[] source, @Nullable String queryParserName, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, String... types) {
this.query = query;
this.source = source;
this.queryParserName = queryParserName;
this.types = types;
this.filteringAliases = filteringAliases;
this.aliasFilter = aliasFilter;
}
public String queryParserName() {
@ -577,5 +582,13 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public String[] types() {
return this.types;
}
public String[] filteringAliases() {
return filteringAliases;
}
public Filter aliasFilter() {
return aliasFilter;
}
}
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.index.engine.robin;
import org.apache.lucene.index.*;
import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchException;
@ -591,7 +593,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (writer == null) {
throw new EngineClosedException(shardId);
}
writer.deleteDocuments(delete.query());
Query query;
if (delete.aliasFilter() == null) {
query = delete.query();
} else {
query = new FilteredQuery(delete.query(), delete.aliasFilter());
}
writer.deleteDocuments(query);
translog.add(new Translog.DeleteByQuery(delete));
dirty = true;
possibleMergeNeeded = true;

View File

@ -59,7 +59,7 @@ public interface IndexShard extends IndexShardComponent {
void delete(Engine.Delete delete) throws ElasticSearchException;
void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
void deleteByQuery(byte[] querySource, @Nullable String queryParserName, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;
byte[] get(String type, String id) throws ElasticSearchException;

View File

@ -316,15 +316,15 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.delete(delete);
}
@Override public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
@Override public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException {
writeAllowed();
if (types == null) {
types = Strings.EMPTY_ARRAY;
}
innerDeleteByQuery(querySource, queryParserName, types);
innerDeleteByQuery(querySource, queryParserName, filteringAliases, types);
}
private void innerDeleteByQuery(byte[] querySource, String queryParserName, String... types) {
private void innerDeleteByQuery(byte[] querySource, String queryParserName, String[] filteringAliases, String... types) {
IndexQueryParser queryParser = queryParserService.defaultIndexQueryParser();
if (queryParserName != null) {
queryParser = queryParserService.indexQueryParser(queryParserName);
@ -335,11 +335,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
Query query = queryParser.parse(querySource).query();
query = filterByTypesIfNeeded(query, types);
Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases);
if (logger.isTraceEnabled()) {
logger.trace("delete_by_query [{}]", query);
}
engine.delete(new Engine.DeleteByQuery(query, querySource, queryParserName, types));
engine.delete(new Engine.DeleteByQuery(query, querySource, queryParserName, filteringAliases, aliasFilter, types));
}
@Override public byte[] get(String type, String id) throws ElasticSearchException {
@ -538,7 +540,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
break;
case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
innerDeleteByQuery(deleteByQuery.source(), deleteByQuery.queryParserName(), deleteByQuery.types());
innerDeleteByQuery(deleteByQuery.source(), deleteByQuery.queryParserName(), deleteByQuery.filteringAliases(), deleteByQuery.types());
break;
default:
throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]");

View File

@ -451,6 +451,7 @@ public interface Translog extends IndexShardComponent {
static class DeleteByQuery implements Operation {
private byte[] source;
@Nullable private String queryParserName;
@Nullable private String[] filteringAliases;
private String[] types = Strings.EMPTY_ARRAY;
public DeleteByQuery() {
@ -460,10 +461,11 @@ public interface Translog extends IndexShardComponent {
this(deleteByQuery.source(), deleteByQuery.queryParserName(), deleteByQuery.types());
}
public DeleteByQuery(byte[] source, @Nullable String queryParserName, String... types) {
public DeleteByQuery(byte[] source, @Nullable String queryParserName, String[] filteringAliases, String... types) {
this.queryParserName = queryParserName;
this.source = source;
this.types = types;
this.filteringAliases = filteringAliases;
}
@Override public Type opType() {
@ -482,12 +484,16 @@ public interface Translog extends IndexShardComponent {
return this.source;
}
public String[] filteringAliases() {
return filteringAliases;
}
public String[] types() {
return this.types;
}
@Override public void readFrom(StreamInput in) throws IOException {
in.readVInt(); // version
int version = in.readVInt(); // version
source = new byte[in.readVInt()];
in.readFully(source);
if (in.readBoolean()) {
@ -500,10 +506,19 @@ public interface Translog extends IndexShardComponent {
types[i] = in.readUTF();
}
}
if (version >= 1) {
int aliasesSize = in.readVInt();
if (aliasesSize > 0) {
filteringAliases = new String[aliasesSize];
for (int i = 0; i < aliasesSize; i++) {
filteringAliases[i] = in.readUTF();
}
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(0); // version
out.writeVInt(1); // version
out.writeVInt(source.length);
out.writeBytes(source);
if (queryParserName == null) {
@ -516,6 +531,14 @@ public interface Translog extends IndexShardComponent {
for (String type : types) {
out.writeUTF(type);
}
if (filteringAliases != null) {
out.writeVInt(filteringAliases.length);
for (String alias : filteringAliases) {
out.writeUTF(alias);
}
} else {
out.writeVInt(0);
}
}
}
}

View File

@ -106,7 +106,7 @@ public abstract class AbstractSimpleTranslogTests {
assertThat(snapshot.estimatedTotalOperations(), equalTo(3));
snapshot.release();
translog.add(new Translog.DeleteByQuery(new byte[]{4}, null));
translog.add(new Translog.DeleteByQuery(new byte[]{4}, null, null));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(4));
assertThat(snapshot.estimatedTotalOperations(), equalTo(4));

View File

@ -324,6 +324,72 @@ public class IndexAliasesTests extends AbstractNodesTests {
}
@Test public void testDeletingByQueryFilteringAliases() throws Exception {
logger.info("--> creating index [test1]");
client1.admin().indices().create(createIndexRequest("test1")).actionGet();
logger.info("--> creating index [test2]");
client1.admin().indices().create(createIndexRequest("test2")).actionGet();
logger.info("--> running cluster_health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("--> done cluster_health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
logger.info("--> adding filtering aliases to index [test1]");
client1.admin().indices().prepareAliases().addAlias("test1", "aliasToTest1").execute().actionGet();
client1.admin().indices().prepareAliases().addAlias("test1", "aliasToTests").execute().actionGet();
client1.admin().indices().prepareAliases().addAlias("test1", "foos", termFilter("name", "foo")).execute().actionGet();
client1.admin().indices().prepareAliases().addAlias("test1", "bars", termFilter("name", "bar")).execute().actionGet();
client1.admin().indices().prepareAliases().addAlias("test1", "tests", termFilter("name", "test")).execute().actionGet();
logger.info("--> adding filtering aliases to index [test2]");
client1.admin().indices().prepareAliases().addAlias("test2", "aliasToTest2").execute().actionGet();
client1.admin().indices().prepareAliases().addAlias("test2", "aliasToTests").execute().actionGet();
client1.admin().indices().prepareAliases().addAlias("test2", "foos", termFilter("name", "foo")).execute().actionGet();
client1.admin().indices().prepareAliases().addAlias("test2", "tests", termFilter("name", "test")).execute().actionGet();
Thread.sleep(300);
logger.info("--> indexing against [test1]");
client1.index(indexRequest("test1").type("type1").id("1").source(source("1", "foo test")).refresh(true)).actionGet();
client1.index(indexRequest("test1").type("type1").id("2").source(source("2", "bar test")).refresh(true)).actionGet();
client1.index(indexRequest("test1").type("type1").id("3").source(source("3", "baz test")).refresh(true)).actionGet();
client1.index(indexRequest("test1").type("type1").id("4").source(source("4", "something else")).refresh(true)).actionGet();
logger.info("--> indexing against [test2]");
client1.index(indexRequest("test2").type("type1").id("5").source(source("5", "foo test")).refresh(true)).actionGet();
client1.index(indexRequest("test2").type("type1").id("6").source(source("6", "bar test")).refresh(true)).actionGet();
client1.index(indexRequest("test2").type("type1").id("7").source(source("7", "baz test")).refresh(true)).actionGet();
client1.index(indexRequest("test2").type("type1").id("8").source(source("8", "something else")).refresh(true)).actionGet();
logger.info("--> checking counts before delete");
assertThat(client1.prepareCount("bars").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(1L));
logger.info("--> delete by query from a single alias");
client1.prepareDeleteByQuery("bars").setQuery(QueryBuilders.termQuery("name", "test")).execute().actionGet();
client1.admin().indices().prepareRefresh().execute().actionGet();
logger.info("--> verify that only one record was deleted");
assertThat(client1.prepareCount("test1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(3L));
logger.info("--> delete by query from an aliases pointing to two indices");
client1.prepareDeleteByQuery("foos").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
client1.admin().indices().prepareRefresh().execute().actionGet();
logger.info("--> verify that proper records were deleted");
SearchResponse searchResponse = client1.prepareSearch("aliasToTests").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "3", "4", "6", "7", "8");
logger.info("--> delete by query from an aliases and an index");
client1.prepareDeleteByQuery("tests", "test2").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
client1.admin().indices().prepareRefresh().execute().actionGet();
logger.info("--> verify that proper records were deleted");
searchResponse = client1.prepareSearch("aliasToTests").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
assertHits(searchResponse.hits(), "4");
}
private void assertHits(SearchHits hits, String... ids) {
assertThat(hits.totalHits(), equalTo((long) ids.length));
Set<String> hitIds = newHashSet();