First (rough) pass at dependancy resolution for reducers

uses the depth-first algorithm from http://en.wikipedia.org/wiki/Topological_sorting#Algorithms

Needs some cleaning up
This commit is contained in:
Colin Goodheart-Smithe 2015-02-16 10:42:08 +00:00
parent 63f3281f12
commit 3ab3ffa989
2 changed files with 78 additions and 1 deletions

View File

@ -19,14 +19,18 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -156,8 +160,73 @@ public class AggregatorFactories {
if (factories.isEmpty()) {
return EMPTY;
}
List<ReducerFactory> orderedReducers = resolveReducerOrder(this.reducerFactories, this.factories);
// NOCOMMIT work out dependency order of reducer factories
return new AggregatorFactories(factories.toArray(new AggregatorFactory[factories.size()]), this.reducerFactories);
return new AggregatorFactories(factories.toArray(new AggregatorFactory[factories.size()]), orderedReducers);
}
/*
* L Empty list that will contain the sorted nodes
* while there are unmarked nodes do
* select an unmarked node n
* visit(n)
* function visit(node n)
* if n has a temporary mark then stop (not a DAG)
* if n is not marked (i.e. has not been visited yet) then
* mark n temporarily
* for each node m with an edge from n to m do
* visit(m)
* mark n permanently
* unmark n temporarily
* add n to head of L
*/
private List<ReducerFactory> resolveReducerOrder(List<ReducerFactory> reducerFactories, List<AggregatorFactory> aggFactories) {
Map<String, ReducerFactory> reducerFactoriesMap = new HashMap<>();
for (ReducerFactory factory : reducerFactories) {
reducerFactoriesMap.put(factory.getName(), factory);
}
Set<String> aggFactoryNames = new HashSet<>();
for (AggregatorFactory aggFactory : aggFactories) {
aggFactoryNames.add(aggFactory.name);
}
List<ReducerFactory> orderedReducers = new LinkedList<>();
List<ReducerFactory> unmarkedFactories = new ArrayList<ReducerFactory>(reducerFactories);
Set<ReducerFactory> temporarilyMarked = new HashSet<ReducerFactory>();
while (!unmarkedFactories.isEmpty()) {
ReducerFactory factory = unmarkedFactories.get(0);
resolveReducerOrder(aggFactoryNames, reducerFactoriesMap, orderedReducers, unmarkedFactories, temporarilyMarked, factory);
}
List<String> orderedReducerNames = new ArrayList<>();
for (ReducerFactory reducerFactory : orderedReducers) {
orderedReducerNames.add(reducerFactory.getName());
}
System.out.println("ORDERED REDUCERS: " + orderedReducerNames);
return orderedReducers;
}
private void resolveReducerOrder(Set<String> aggFactoryNames, Map<String, ReducerFactory> reducerFactoriesMap,
List<ReducerFactory> orderedReducers, List<ReducerFactory> unmarkedFactories, Set<ReducerFactory> temporarilyMarked,
ReducerFactory factory) {
if (temporarilyMarked.contains(factory)) {
throw new ElasticsearchIllegalStateException("Cyclical dependancy found with reducer [" + factory.getName() + "]"); // NOCOMMIT is this the right Exception to throw?
} else if (unmarkedFactories.contains(factory)) {
temporarilyMarked.add(factory);
String[] bucketsPaths = factory.getBucketsPaths();
for (String bucketsPath : bucketsPaths) {
ReducerFactory matchingFactory = reducerFactoriesMap.get(bucketsPath);
if (aggFactoryNames.contains(bucketsPath)) {
continue;
} else if (matchingFactory != null) {
resolveReducerOrder(aggFactoryNames, reducerFactoriesMap, orderedReducers, unmarkedFactories, temporarilyMarked,
matchingFactory);
} else {
throw new ElasticsearchIllegalStateException("No reducer found for path [" + bucketsPath + "]"); // NOCOMMIT is this the right Exception to throw?
}
}
unmarkedFactories.remove(factory);
temporarilyMarked.remove(factory);
orderedReducers.add(factory);
}
}
}
}

View File

@ -86,4 +86,12 @@ public abstract class ReducerFactory {
this.metaData = metaData;
}
public String getName() {
return name;
}
public String[] getBucketsPaths() {
return bucketsPaths;
}
}