diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index a4f68b05efb..ad17c533cc0 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -19,14 +19,18 @@ package org.elasticsearch.search.aggregations; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.aggregations.reducers.ReducerFactory; import org.elasticsearch.search.aggregations.support.AggregationContext; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -156,8 +160,73 @@ public class AggregatorFactories { if (factories.isEmpty()) { return EMPTY; } + List orderedReducers = resolveReducerOrder(this.reducerFactories, this.factories); // NOCOMMIT work out dependency order of reducer factories - return new AggregatorFactories(factories.toArray(new AggregatorFactory[factories.size()]), this.reducerFactories); + return new AggregatorFactories(factories.toArray(new AggregatorFactory[factories.size()]), orderedReducers); + } + + /* + * L ← Empty list that will contain the sorted nodes + * while there are unmarked nodes do + * select an unmarked node n + * visit(n) + * function visit(node n) + * if n has a temporary mark then stop (not a DAG) + * if n is not marked (i.e. has not been visited yet) then + * mark n temporarily + * for each node m with an edge from n to m do + * visit(m) + * mark n permanently + * unmark n temporarily + * add n to head of L + */ + private List resolveReducerOrder(List reducerFactories, List aggFactories) { + Map reducerFactoriesMap = new HashMap<>(); + for (ReducerFactory factory : reducerFactories) { + reducerFactoriesMap.put(factory.getName(), factory); + } + Set aggFactoryNames = new HashSet<>(); + for (AggregatorFactory aggFactory : aggFactories) { + aggFactoryNames.add(aggFactory.name); + } + List orderedReducers = new LinkedList<>(); + List unmarkedFactories = new ArrayList(reducerFactories); + Set temporarilyMarked = new HashSet(); + while (!unmarkedFactories.isEmpty()) { + ReducerFactory factory = unmarkedFactories.get(0); + resolveReducerOrder(aggFactoryNames, reducerFactoriesMap, orderedReducers, unmarkedFactories, temporarilyMarked, factory); + } + List orderedReducerNames = new ArrayList<>(); + for (ReducerFactory reducerFactory : orderedReducers) { + orderedReducerNames.add(reducerFactory.getName()); + } + System.out.println("ORDERED REDUCERS: " + orderedReducerNames); + return orderedReducers; + } + + private void resolveReducerOrder(Set aggFactoryNames, Map reducerFactoriesMap, + List orderedReducers, List unmarkedFactories, Set temporarilyMarked, + ReducerFactory factory) { + if (temporarilyMarked.contains(factory)) { + throw new ElasticsearchIllegalStateException("Cyclical dependancy found with reducer [" + factory.getName() + "]"); // NOCOMMIT is this the right Exception to throw? + } else if (unmarkedFactories.contains(factory)) { + temporarilyMarked.add(factory); + String[] bucketsPaths = factory.getBucketsPaths(); + for (String bucketsPath : bucketsPaths) { + ReducerFactory matchingFactory = reducerFactoriesMap.get(bucketsPath); + if (aggFactoryNames.contains(bucketsPath)) { + continue; + } else if (matchingFactory != null) { + resolveReducerOrder(aggFactoryNames, reducerFactoriesMap, orderedReducers, unmarkedFactories, temporarilyMarked, + matchingFactory); + } else { + throw new ElasticsearchIllegalStateException("No reducer found for path [" + bucketsPath + "]"); // NOCOMMIT is this the right Exception to throw? + } + } + unmarkedFactories.remove(factory); + temporarilyMarked.remove(factory); + orderedReducers.add(factory); + } } } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java index f904a564dd2..05cb6fbed48 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerFactory.java @@ -86,4 +86,12 @@ public abstract class ReducerFactory { this.metaData = metaData; } + public String getName() { + return name; + } + + public String[] getBucketsPaths() { + return bucketsPaths; + } + }