NIFI-5188: DruidTranquilityController does not fully support Druid aggregator

Rollback Druid 0.9.2 to 0.9.1

Fixed checkstyle error

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2696
This commit is contained in:
Dongkyu Hwangbo 2018-05-14 16:33:49 +09:00 committed by Matthew Burgess
parent 52d6b9cfa2
commit 9b461027a4
3 changed files with 33 additions and 60 deletions

View File

@ -46,6 +46,16 @@
<artifactId>tranquility-core_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-histogram</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-datasketches</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@ -53,4 +63,4 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>

View File

@ -24,11 +24,17 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.AggregatorsModule;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.query.aggregation.datasketches.theta.SketchModule;
import io.druid.query.aggregation.histogram.ApproximateHistogramDruidModule;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@ -47,7 +53,6 @@ import org.apache.nifi.controller.api.druid.DruidTranquilityService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.codehaus.jackson.map.ObjectMapper;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
@ -62,15 +67,7 @@ import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import org.joda.time.DateTime;
import org.joda.time.Period;
@ -518,56 +515,22 @@ public class DruidTranquilityController extends AbstractControllerService implem
}
private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) {
List<AggregatorFactory> aggregatorList = new LinkedList<>();
List<Map<String, String>> aggregatorInfo = parseJsonString(aggregatorJSON);
for (Map<String, String> aggregator : aggregatorInfo) {
ComponentLog log = getLogger();
ObjectMapper mapper = new ObjectMapper(null);
mapper.registerModule(new AggregatorsModule());
mapper.registerModules(Lists.newArrayList(new SketchModule().getJacksonModules()));
mapper.registerModules(Lists.newArrayList(new ApproximateHistogramDruidModule().getJacksonModules()));
if (aggregator.get("type").equalsIgnoreCase("count")) {
aggregatorList.add(getCountAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("doublesum")) {
aggregatorList.add(getDoubleSumAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("doublemax")) {
aggregatorList.add(getDoubleMaxAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("doublemin")) {
aggregatorList.add(getDoubleMinAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("longsum")) {
aggregatorList.add(getLongSumAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("longmax")) {
aggregatorList.add(getLongMaxAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("longmin")) {
aggregatorList.add(getLongMinAggregator(aggregator));
}
try {
return mapper.readValue(
aggregatorJSON,
new TypeReference<List<AggregatorFactory>>() {
}
);
} catch (IOException e) {
log.error(e.getMessage(), e);
return null;
}
return aggregatorList;
}
private AggregatorFactory getLongMinAggregator(Map<String, String> map) {
return new LongMinAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getLongMaxAggregator(Map<String, String> map) {
return new LongMaxAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getLongSumAggregator(Map<String, String> map) {
return new LongSumAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getDoubleMinAggregator(Map<String, String> map) {
return new DoubleMinAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getDoubleMaxAggregator(Map<String, String> map) {
return new DoubleMaxAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getDoubleSumAggregator(Map<String, String> map) {
return new DoubleSumAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getCountAggregator(Map<String, String> map) {
return new CountAggregatorFactory(map.get("name"));
}
private Granularity getGranularity(String granularityString) {

View File

@ -52,4 +52,4 @@
<module>nifi-druid-controller-service</module>
<module>nifi-druid-processors</module>
</modules>
</project>
</project>