Fix lookup support in adjacency matrix (backport of #59099) (#59108)

This request:
```
POST /_search
{
  "aggs": {
    "a": {
      "adjacency_matrix": {
        "filters": {
          "1": {
            "terms": { "t": { "index": "lookup", "id": "1", "path": "t" } }
          }
        }
      }
    }
  }
}
```

Would fail with a 500 error and a message like:
```
{
  "error": {
    "root_cause": [
      {
        "type": "illegal_state_exception",
        "reason":"async actions are left after rewrite"
      }
    ]
  }
}
```

This fixes that by moving the query rewrite phase from a synchronous
call on the data nodes into the standard aggregation rewrite phase which
can properly handle the asynchronous actions.
This commit is contained in:
Nik Everett 2020-07-07 10:28:20 -04:00 committed by GitHub
parent 7c64a1bd7b
commit eb169ae226
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 135 additions and 56 deletions

View File

@ -110,4 +110,4 @@ where examining interactions _over time_ becomes important.
==== Limitations
For N filters the matrix of buckets produced can be N²/2 and so there is a default maximum
imposed of 100 filters . This setting can be changed using the `index.max_adjacency_matrix_filters` index-level setting
(note this setting is deprecated and will be removed in 8.0+).
(note this setting is deprecated and will be repaced with `indices.query.bool.max_clause_count` in 8.0+).

View File

@ -12,36 +12,104 @@ setup:
type: integer
- do:
index:
index: test
id: 1
body: { "num": [1, 2] }
- do:
index:
bulk:
index: test
id: 2
body: { "num": [2, 3] }
- do:
index:
index: test
id: 3
body: { "num": [3, 4] }
refresh: true
body: |
{ "index": {"_id": "1"}}
{ "num": [1, 2] }
{ "index": {"_id": "2"}}
{ "num": [2, 3] }
{ "index": {"_id": "3"}}
{ "num": [3, 4] }
- do:
indices.refresh: {}
---
"Filters intersections":
- do:
search:
rest_total_hits_as_int: true
body: { "size": 0, "aggs": { "conns": { "adjacency_matrix": { "filters": { "1": { "term": { "num": 1 } }, "2": { "term": { "num": 2 } }, "4": { "term": { "num": 4 } } } } } } }
index: test
body:
size: 0
aggs:
conns:
adjacency_matrix:
filters:
1:
term:
num: 1
2:
term:
num: 2
4:
term:
num: 4
- match: { hits.total: 3 }
- match: { hits.total.value: 3 }
- length: { aggregations.conns.buckets: 4 }
- match: { aggregations.conns.buckets.0.doc_count: 1 }
- match: { aggregations.conns.buckets.0.key: "1" }
- match: { aggregations.conns.buckets.1.doc_count: 1 }
- match: { aggregations.conns.buckets.1.key: "1&2" }
- match: { aggregations.conns.buckets.2.doc_count: 2 }
- match: { aggregations.conns.buckets.2.key: "2" }
- match: { aggregations.conns.buckets.3.doc_count: 1 }
- match: { aggregations.conns.buckets.3.key: "4" }
---
"Terms lookup":
- skip:
version: " - 7.8.99"
reason: fixed in 7.9.0
- do:
bulk:
index: lookup
refresh: true
body: |
{ "index": {"_id": 1} }
{ "num": [1] }
{ "index": {"_id": 2} }
{ "num": [2] }
{ "index": {"_id": 4} }
{ "num": [4] }
- do:
search:
index: test
body:
size: 0
aggs:
conns:
adjacency_matrix:
filters:
1:
terms:
num:
index: lookup
id: "1"
path: num
2:
terms:
num:
index: lookup
id: "2"
path: num
4:
terms:
num:
index: lookup
id: "4"
path: num
- match: { hits.total.value: 3 }
- length: { aggregations.conns.buckets: 4 }

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
@ -68,31 +69,6 @@ public class AdjacencyMatrixAggregationBuilder extends AbstractAggregationBuilde
return result;
}
protected void checkConsistency() {
if ((filters == null) || (filters.size() == 0)) {
throw new IllegalStateException("[" + name + "] is missing : " + FILTERS_FIELD.getPreferredName() + " parameter");
}
}
protected void setFiltersAsMap(Map<String, QueryBuilder> filters) {
// Convert uniquely named objects into internal KeyedFilters
this.filters = new ArrayList<>(filters.size());
for (Entry<String, QueryBuilder> kv : filters.entrySet()) {
this.filters.add(new KeyedFilter(kv.getKey(), kv.getValue()));
}
// internally we want to have a fixed order of filters, regardless of
// the order of the filters in the request
Collections.sort(this.filters, Comparator.comparing(KeyedFilter::key));
}
protected void setFiltersAsList(List<KeyedFilter> filters) {
this.filters = new ArrayList<>(filters);
// internally we want to have a fixed order of filters, regardless of
// the order of the filters in the request
Collections.sort(this.filters, Comparator.comparing(KeyedFilter::key));
}
/**
* @param name
* the name of this aggregation
@ -162,6 +138,33 @@ public class AdjacencyMatrixAggregationBuilder extends AbstractAggregationBuilde
}
}
private void checkConsistency() {
if ((filters == null) || (filters.size() == 0)) {
throw new IllegalStateException("[" + name + "] is missing : " + FILTERS_FIELD.getPreferredName() + " parameter");
}
}
private void setFiltersAsMap(Map<String, QueryBuilder> filters) {
// Convert uniquely named objects into internal KeyedFilters
this.filters = new ArrayList<>(filters.size());
for (Entry<String, QueryBuilder> kv : filters.entrySet()) {
this.filters.add(new KeyedFilter(kv.getKey(), kv.getValue()));
}
// internally we want to have a fixed order of filters, regardless of
// the order of the filters in the request
Collections.sort(this.filters, Comparator.comparing(KeyedFilter::key));
}
private AdjacencyMatrixAggregationBuilder setFiltersAsList(List<KeyedFilter> filters) {
this.filters = new ArrayList<>(filters);
// internally we want to have a fixed order of filters, regardless of
// the order of the filters in the request
Collections.sort(this.filters, Comparator.comparing(KeyedFilter::key));
return this;
}
/**
* Set the separator used to join pairs of bucket keys
*/
@ -191,6 +194,20 @@ public class AdjacencyMatrixAggregationBuilder extends AbstractAggregationBuilde
return result;
}
@Override
protected AdjacencyMatrixAggregationBuilder doRewrite(QueryRewriteContext queryShardContext) throws IOException {
boolean modified = false;
List<KeyedFilter> rewrittenFilters = new ArrayList<>(filters.size());
for (KeyedFilter kf : filters) {
QueryBuilder rewritten = Rewriteable.rewrite(kf.filter(), queryShardContext);
modified = modified || rewritten != kf.filter();
rewrittenFilters.add(new KeyedFilter(kf.key(), rewritten));
}
if (modified) {
return new AdjacencyMatrixAggregationBuilder(name).separator(separator).setFiltersAsList(rewrittenFilters);
}
return this;
}
@Override
protected AggregatorFactory doBuild(QueryShardContext queryShardContext, AggregatorFactory parent, Builder subFactoriesBuilder)
@ -203,13 +220,7 @@ public class AdjacencyMatrixAggregationBuilder extends AbstractAggregationBuilde
+ "This limit can be set by changing the [" + IndexSettings.MAX_ADJACENCY_MATRIX_FILTERS_SETTING.getKey()
+ "] index level setting.");
}
List<KeyedFilter> rewrittenFilters = new ArrayList<>(filters.size());
for (KeyedFilter kf : filters) {
rewrittenFilters.add(new KeyedFilter(kf.key(), Rewriteable.rewrite(kf.filter(), queryShardContext, true)));
}
return new AdjacencyMatrixAggregatorFactory(name, rewrittenFilters, separator, queryShardContext, parent,
return new AdjacencyMatrixAggregatorFactory(name, filters, separator, queryShardContext, parent,
subFactoriesBuilder, metadata);
}

View File

@ -123,7 +123,7 @@ public class AdjacencyMatrixAggregator extends BucketsAggregator {
}
private final String[] keys;
private Weight[] filters;
private final Weight[] filters;
private final int totalNumKeys;
private final int totalNumIntersections;
private final String separator;

View File

@ -50,9 +50,9 @@ public class AdjacencyMatrixAggregatorFactory extends AggregatorFactory {
keys = new String[filters.size()];
for (int i = 0; i < filters.size(); ++i) {
KeyedFilter keyedFilter = filters.get(i);
this.keys[i] = keyedFilter.key();
keys[i] = keyedFilter.key();
Query filter = keyedFilter.filter().toQuery(queryShardContext);
this.weights[i] = contextSearcher.createWeight(contextSearcher.rewrite(filter), ScoreMode.COMPLETE_NO_SCORES, 1f);
weights[i] = contextSearcher.createWeight(contextSearcher.rewrite(filter), ScoreMode.COMPLETE_NO_SCORES, 1f);
}
}