Fixed delete by query issue with index aliases and nested mappings.

The issue was that under these circumstances the delete by query operation would run forever.
What also is fixed is that during shard recovery when delete by query is replayed nested docs
are also deleted. Closes #2302
This commit is contained in:
Martijn van Groningen 2012-10-03 16:55:19 +02:00
parent 246dc1d992
commit ee5df74a6b
5 changed files with 146 additions and 66 deletions

View File

@ -736,16 +736,18 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final String[] filteringAliases;
private final Filter aliasFilter;
private final String[] types;
private final Filter parentFilter;
private long startTime;
private long endTime;
public DeleteByQuery(Query query, BytesReference source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, String... types) {
public DeleteByQuery(Query query, BytesReference source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, Filter parentFilter, String... types) {
this.query = query;
this.source = source;
this.types = types;
this.filteringAliases = filteringAliases;
this.aliasFilter = aliasFilter;
this.parentFilter = parentFilter;
}
public Query query() {
@ -768,6 +770,14 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return aliasFilter;
}
public boolean nested() {
return parentFilter != null;
}
public Filter parentFilter() {
return parentFilter;
}
public DeleteByQuery startTime(long startTime) {
this.startTime = startTime;
return this;

View File

@ -52,6 +52,7 @@ import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.policy.EnableMergePolicy;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
@ -702,12 +703,18 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (writer == null) {
throw new EngineClosedException(shardId);
}
Query query;
if (delete.aliasFilter() == null) {
query = delete.query();
} else {
if (delete.nested() && delete.aliasFilter() != null) {
query = new IncludeNestedDocsQuery(new FilteredQuery(delete.query(), delete.aliasFilter()), delete.parentFilter());
} else if (delete.nested()) {
query = new IncludeNestedDocsQuery(delete.query(), delete.parentFilter());
} else if (delete.aliasFilter() != null) {
query = new FilteredQuery(delete.query(), delete.aliasFilter());
} else {
query = delete.query();
}
writer.deleteDocuments(query);
translog.add(new Translog.DeleteByQuery(delete));
dirty = true;

View File

@ -10,11 +10,10 @@ import java.io.IOException;
import java.util.Set;
/**
* A special query that accepts a top level parent matching query, and returns all the children of that parent as
* well. This is handy when deleting by query.
* A special query that accepts a top level parent matching query, and returns the nested docs of the matching parent
* doc as well. This is handy when deleting by query.
*/
public class IncludeAllChildrenQuery extends Query {
public class IncludeNestedDocsQuery extends Query {
private final Filter parentFilter;
private final Query parentQuery;
@ -27,30 +26,39 @@ public class IncludeAllChildrenQuery extends Query {
private final Query origParentQuery;
public IncludeAllChildrenQuery(Query parentQuery, Filter parentFilter) {
public IncludeNestedDocsQuery(Query parentQuery, Filter parentFilter) {
this.origParentQuery = parentQuery;
this.parentQuery = parentQuery;
this.parentFilter = parentFilter;
}
IncludeAllChildrenQuery(Query origParentQuery, Query parentQuery, Filter parentFilter) {
this.origParentQuery = origParentQuery;
this.parentQuery = parentQuery;
this.parentFilter = parentFilter;
// For rewritting
IncludeNestedDocsQuery(Query rewrite, Query originalQuery, IncludeNestedDocsQuery previousInstance) {
this.origParentQuery = originalQuery;
this.parentQuery = rewrite;
this.parentFilter = previousInstance.parentFilter;
setBoost(previousInstance.getBoost());
}
// For cloning
IncludeNestedDocsQuery(Query originalQuery, IncludeNestedDocsQuery previousInstance) {
this.origParentQuery = originalQuery;
this.parentQuery = originalQuery;
this.parentFilter = previousInstance.parentFilter;
}
@Override
public Weight createWeight(Searcher searcher) throws IOException {
return new IncludeAllChildrenWeight(parentQuery, parentQuery.createWeight(searcher), parentFilter);
return new IncludeNestedDocsWeight(parentQuery, parentQuery.createWeight(searcher), parentFilter);
}
static class IncludeAllChildrenWeight extends Weight {
static class IncludeNestedDocsWeight extends Weight {
private final Query parentQuery;
private final Weight parentWeight;
private final Filter parentsFilter;
IncludeAllChildrenWeight(Query parentQuery, Weight parentWeight, Filter parentsFilter) {
IncludeNestedDocsWeight(Query parentQuery, Weight parentWeight, Filter parentsFilter) {
this.parentQuery = parentQuery;
this.parentWeight = parentWeight;
this.parentsFilter = parentsFilter;
@ -85,12 +93,6 @@ public class IncludeAllChildrenQuery extends Query {
return null;
}
final int firstParentDoc = parentScorer.nextDoc();
if (firstParentDoc == DocIdSetIterator.NO_MORE_DOCS) {
// No matches
return null;
}
DocIdSet parents = parentsFilter.getDocIdSet(reader);
if (parents == null) {
// No matches
@ -100,11 +102,15 @@ public class IncludeAllChildrenQuery extends Query {
parents = ((FixedBitDocSet) parents).set();
}
if (!(parents instanceof FixedBitSet)) {
throw new IllegalStateException("parentFilter must return OpenBitSet; got " + parents);
throw new IllegalStateException("parentFilter must return FixedBitSet; got " + parents);
}
return new IncludeAllChildrenScorer(this, parentScorer, (FixedBitSet) parents, firstParentDoc);
int firstParentDoc = parentScorer.nextDoc();
if (firstParentDoc == DocIdSetIterator.NO_MORE_DOCS) {
// No matches
return null;
}
return new IncludeNestedDocsScorer(this, parentScorer, (FixedBitSet) parents, firstParentDoc);
}
@Override
@ -118,17 +124,16 @@ public class IncludeAllChildrenQuery extends Query {
}
}
static class IncludeAllChildrenScorer extends Scorer {
static class IncludeNestedDocsScorer extends Scorer {
private final Scorer parentScorer;
private final FixedBitSet parentBits;
final Scorer parentScorer;
final FixedBitSet parentBits;
private int currentChildPointer = -1;
private int currentParentPointer = -1;
int currentChildPointer = -1;
int currentParentPointer = -1;
int currentDoc = -1;
private int currentDoc = -1;
IncludeAllChildrenScorer(Weight weight, Scorer parentScorer, FixedBitSet parentBits, int currentParentPointer) {
IncludeNestedDocsScorer(Weight weight, Scorer parentScorer, FixedBitSet parentBits, int currentParentPointer) {
super(weight);
this.parentScorer = parentScorer;
this.parentBits = parentBits;
@ -192,33 +197,33 @@ public class IncludeAllChildrenQuery extends Query {
return nextDoc();
}
currentParentPointer = parentScorer.advance(target);
if (currentParentPointer == NO_MORE_DOCS) {
return (currentDoc = NO_MORE_DOCS);
}
if (currentParentPointer == 0) {
currentChildPointer = 0;
} else {
currentChildPointer = parentBits.prevSetBit(currentParentPointer - 1);
if (currentChildPointer == -1) {
// no previous set parent, just set the child to 0 to delete all up to the parent
if (target < currentParentPointer) {
currentDoc = currentParentPointer = parentScorer.advance(target);
if (currentParentPointer == NO_MORE_DOCS) {
return (currentDoc = NO_MORE_DOCS);
}
if (currentParentPointer == 0) {
currentChildPointer = 0;
} else {
currentChildPointer++; // we only care about children
currentChildPointer = parentBits.prevSetBit(currentParentPointer - 1);
if (currentChildPointer == -1) {
// no previous set parent, just set the child to 0 to delete all up to the parent
currentChildPointer = 0;
} else {
currentChildPointer++; // we only care about children
}
}
} else {
currentDoc = currentChildPointer++;
}
currentDoc = currentChildPointer;
return currentDoc;
}
@Override
public float score() throws IOException {
return parentScorer.score();
}
@Override
public int docID() {
return currentDoc;
}
@ -233,9 +238,7 @@ public class IncludeAllChildrenQuery extends Query {
public Query rewrite(IndexReader reader) throws IOException {
final Query parentRewrite = parentQuery.rewrite(reader);
if (parentRewrite != parentQuery) {
Query rewritten = new IncludeAllChildrenQuery(parentQuery, parentRewrite, parentFilter);
rewritten.setBoost(getBoost());
return rewritten;
return new IncludeNestedDocsQuery(parentRewrite, parentQuery, this);
} else {
return this;
}
@ -243,15 +246,14 @@ public class IncludeAllChildrenQuery extends Query {
@Override
public String toString(String field) {
return "IncludeAllChildrenQuery (" + parentQuery.toString() + ")";
return "IncludeNestedDocsQuery (" + parentQuery.toString() + ")";
}
@Override
public boolean equals(Object _other) {
if (_other instanceof IncludeAllChildrenQuery) {
final IncludeAllChildrenQuery other = (IncludeAllChildrenQuery) _other;
return origParentQuery.equals(other.origParentQuery) &&
parentFilter.equals(other.parentFilter);
if (_other instanceof IncludeNestedDocsQuery) {
final IncludeNestedDocsQuery other = (IncludeNestedDocsQuery) _other;
return origParentQuery.equals(other.origParentQuery) && parentFilter.equals(other.parentFilter);
} else {
return false;
}
@ -268,7 +270,7 @@ public class IncludeAllChildrenQuery extends Query {
@Override
public Object clone() {
return new IncludeAllChildrenQuery((Query) origParentQuery.clone(),
parentFilter);
Query clonedQuery = (Query) origParentQuery.clone();
return new IncludeNestedDocsQuery(clonedQuery, this);
}
}

View File

@ -53,7 +53,6 @@ import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.nested.IncludeAllChildrenQuery;
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchService;
@ -372,18 +371,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
query = filterQueryIfNeeded(query, types);
Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases);
return new Engine.DeleteByQuery(query, querySource, filteringAliases, aliasFilter, types).startTime(startTime);
Filter parentFilter = mapperService.hasNested() ? indexCache.filter().cache(NonNestedDocsFilter.INSTANCE) : null;
return new Engine.DeleteByQuery(query, querySource, filteringAliases, aliasFilter, parentFilter, types).startTime(startTime);
}
@Override
public void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticSearchException {
writeAllowed();
if (mapperService.hasNested()) {
// we need to wrap it to delete nested docs as well...
IncludeAllChildrenQuery nestedQuery = new IncludeAllChildrenQuery(deleteByQuery.query(), indexCache.filter().cache(NonNestedDocsFilter.INSTANCE));
deleteByQuery = new Engine.DeleteByQuery(nestedQuery, deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.aliasFilter(), deleteByQuery.types());
}
if (logger.isTraceEnabled()) {
logger.trace("delete_by_query [{}]", deleteByQuery.query());
}

View File

@ -25,6 +25,8 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.FilteredQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.facet.FacetBuilders;
import org.elasticsearch.search.facet.termsstats.TermsStatsFacet;
@ -450,4 +452,69 @@ public class SimpleNestedTests extends AbstractNodesTests {
assertThat(termsStatsFacet.entries().get(0).count(), equalTo(3l));
assertThat(termsStatsFacet.entries().get(0).total(), equalTo(8d));
}
@Test
// When IncludeNestedDocsQuery is wrapped in a FilteredQuery then a in-finite loop occurs b/c of a bug in IncludeNestedDocsQuery#advance()
// This IncludeNestedDocsQuery also needs to be aware of the filter from alias
public void testDeleteNestedDocsWithAlias() throws Exception {
client.admin().indices().prepareDelete().execute().actionGet();
client.admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("index.number_of_shards", 1).put("index.referesh_interval", -1).build())
.addMapping("type1", jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("nested1")
.field("type", "nested")
.endObject()
.endObject().endObject().endObject())
.execute().actionGet();
client.admin().indices().prepareAliases()
.addAlias("test", "alias1", FilterBuilders.termFilter("field1", "value1")).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client.prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject()
.field("field1", "value1")
.startArray("nested1")
.startObject()
.field("n_field1", "n_value1_1")
.field("n_field2", "n_value2_1")
.endObject()
.startObject()
.field("n_field1", "n_value1_2")
.field("n_field2", "n_value2_2")
.endObject()
.endArray()
.endObject()).execute().actionGet();
client.prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject()
.field("field1", "value2")
.startArray("nested1")
.startObject()
.field("n_field1", "n_value1_1")
.field("n_field2", "n_value2_1")
.endObject()
.startObject()
.field("n_field1", "n_value1_2")
.field("n_field2", "n_value2_2")
.endObject()
.endArray()
.endObject()).execute().actionGet();
client.admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
IndicesStatusResponse statusResponse = client.admin().indices().prepareStatus().execute().actionGet();
assertThat(statusResponse.index("test").docs().numDocs(), equalTo(6l));
client.prepareDeleteByQuery("alias1").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
client.admin().indices().prepareFlush().setRefresh(true).execute().actionGet();
statusResponse = client.admin().indices().prepareStatus().execute().actionGet();
// This must be 3, otherwise child docs aren't deleted.
// If this is 5 then only the parent has been removed
assertThat(statusResponse.index("test").docs().numDocs(), equalTo(3l));
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
}
}