NIFI-4428: - Implement PutDruid Processor and Controller

update

added provenance report

added parameters for batch control

WIP
This commit is contained in:
vvaks 2017-09-27 13:23:23 -04:00 committed by joewitt
parent d93d538177
commit 7fa0a34aba
15 changed files with 1163 additions and 0 deletions

View File

@ -561,6 +561,16 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-livy-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-controller-service-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<profiles>
<profile>

View File

@ -0,0 +1,35 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-druid-controller-service-api-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-controller-service-api</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,84 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-druid-controller-service-api</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-core_2.10</artifactId>
<version>0.8.2</version>
<exclusions>
<exclusion>
<groupId>io.druid</groupId>
<artifactId>druid-console</artifactId>
</exclusion>
<exclusion>
<groupId>io.druid</groupId>
<artifactId>druid-aws-common</artifactId>
</exclusion>
<exclusion>
<groupId>com.twitter</groupId>
<artifactId>finagle-core_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
<exclusion>
<groupId>io.tesla.aether</groupId>
<artifactId>tesla-aether</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.api;
import java.util.Map;
import org.apache.nifi.controller.ControllerService;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
public interface DruidTranquilityService extends ControllerService{
Tranquilizer<Map<String,Object>> getTranquilizer();
}

View File

@ -0,0 +1,95 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-druid-controller-service</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-controller-service-api</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-core_2.10</artifactId>
<version>0.8.2</version>
<exclusions>
<exclusion>
<groupId>io.druid</groupId>
<artifactId>druid-console</artifactId>
</exclusion>
<exclusion>
<groupId>io.druid</groupId>
<artifactId>druid-aws-common</artifactId>
</exclusion>
<exclusion>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
<exclusion>
<groupId>io.tesla.aether</groupId>
<artifactId>tesla-aether</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,452 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.api.DruidTranquilityService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Period;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeamConfig;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidDimensions;
import com.metamx.tranquility.druid.DruidEnvironment;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongMaxAggregatorFactory;
import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@Tags({"Druid", "Timeseries", "OLAP", "ingest"})
@CapabilityDescription("Asynchronously sends flowfiles to Druid Indexing Task using Tranquility API. "
+ "If aggregation and roll-up of data is required, an Aggregator JSON descriptor needs to be provided."
+ "Details on how describe aggregation using JSON can be found at: http://druid.io/docs/latest/querying/aggregations.html")
public class DruidTranquilityController extends AbstractControllerService implements DruidTranquilityService {
private String firehosePattern = "druid:firehose:%s";
private int clusterPartitions = 1;
private int clusterReplication = 1;
private String indexRetryPeriod = "PT10M";
private Tranquilizer tranquilizer = null;
public static final PropertyDescriptor DATASOURCE = new PropertyDescriptor.Builder()
.name("druid-cs-data-source")
.displayName("Druid Data Source")
.description("Druid Data Source") //TODO description, example
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor CONNECT_STRING = new PropertyDescriptor.Builder()
.name("druid-cs-zk-connect-string")
.displayName("Zookeeper Connection String")
.description("ZK Connect String for Druid") //TODO example
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor DRUID_INDEX_SERVICE_PATH = new PropertyDescriptor.Builder()
.name("druid-cs-index-service-path")
.displayName("Index Service Path")
.description("Druid Index Service path as defined via the Druid Overlord druid.service property.")
.required(true)
.defaultValue("druid/overlord")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor DRUID_DISCOVERY_PATH = new PropertyDescriptor.Builder()
.name("druid-cs-discovery-path")
.displayName("Discovery Path")
.description("Druid Discovery Path as configured in Druid Common druid.discovery.curator.path property")
.required(true)
.defaultValue("/druid/discovery")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor TIMESTAMP_FIELD = new PropertyDescriptor.Builder()
.name("druid-cs-timestamp-field")
.displayName("Timestamp field")
.description("The name of the field that will be used as the timestamp. Should be in ISO format.")
.required(true)
.defaultValue("timestamp")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor AGGREGATOR_JSON = new PropertyDescriptor.Builder()
.name("druid-cs-aggregators-descriptor")
.displayName("Aggregator JSON")
.description("Tranquility-compliant JSON string that defines what aggregators to apply on ingest."
+ "Example: "
+ "["
+ "{"
+ "\t\"type\" : \"count\","
+ "\t\"name\" : \"count\","
+ "},"
+ "{"
+ "\t\"name\" : \"value_sum\","
+ "\t\"type\" : \"doubleSum\","
+ "\t\"fieldName\" : \"value\""
+ "},"
+ "{"
+ "\t\"fieldName\" : \"value\","
+ "\t\"name\" : \"value_min\","
+ "\t\"type\" : \"doubleMin\""
+ "},"
+ "{"
+ "\t\"type\" : \"doubleMax\","
+ "\t\"name\" : \"value_max\","
+ "\t\"fieldName\" : \"value\""
+ "}"
+ "]")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor DIMENSIONS_LIST = new PropertyDescriptor.Builder()
.name("druid-cs-dimensions-list")
.displayName("Dimension Fields")
.description("A comma separated list of field names that will be stored as dimensions on ingest.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor SEGMENT_GRANULARITY = new PropertyDescriptor.Builder()
.name("druid-cs-segment-granularity")
.displayName("Segment Granularity")
.description("Time unit by which to group and aggregate/rollup events.")
.required(true)
.allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", "DAY", "MONTH", "YEAR", "Use druid.segment.granularity variable")
.defaultValue("MINUTE")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor QUERY_GRANULARITY = new PropertyDescriptor.Builder()
.name("druid-cs-query-granularity")
.displayName("Query Granularity")
.description("Time unit by which to group and aggregate/rollup events. The value must be at least as large as the value of Segment Granularity.")
.required(true)
.allowableValues("NONE", "SECOND", "MINUTE", "TEN_MINUTE", "HOUR", "DAY", "MONTH", "YEAR", "Use druid.query.granularity variable")
.defaultValue("TEN_MINUTE")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor WINDOW_PERIOD = new PropertyDescriptor.Builder()
.name("druid-cs-window-period")
.displayName("Late Event Grace Period")
.description("Grace period to allow late arriving events for real time ingest.")
.required(true)
.allowableValues("PT1M", "PT10M", "PT60M")// TODO possibly friendly name
.defaultValue("PT10M")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("druid-cs-batch-size")
.displayName("Batch Size")
.description("Maximum number of messages to send at once.")
.required(true)
.defaultValue("2000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MAX_PENDING_BATCHES = new PropertyDescriptor.Builder()
.name("druid-cs-max-pending-batches")
.displayName("Max Pending Batches")
.description("Maximum number of batches that may be in flight before service blocks and waits for one to finish.")
.required(true)
.defaultValue("5")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor LINGER_MILLIS = new PropertyDescriptor.Builder()
.name("druid-cs-linger-millis")
.displayName("Linger (milliseconds)")
.description("Wait this long for batches to collect more messages (up to Batch Size) before sending them. "
+ "Set to zero to disable waiting. "
+ "Set to -1 to always wait for complete batches before sending. ")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
private static final List<PropertyDescriptor> properties;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(DATASOURCE);
props.add(CONNECT_STRING);
props.add(DRUID_INDEX_SERVICE_PATH);
props.add(DRUID_DISCOVERY_PATH);
props.add(DIMENSIONS_LIST);
props.add(AGGREGATOR_JSON);
props.add(SEGMENT_GRANULARITY);
props.add(QUERY_GRANULARITY);
props.add(WINDOW_PERIOD);
props.add(TIMESTAMP_FIELD);
props.add(MAX_BATCH_SIZE);
props.add(MAX_PENDING_BATCHES);
props.add(LINGER_MILLIS);
properties = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
ComponentLog log = getLogger();
log.info("Starting Druid Tranquility Controller Service...");
final String dataSource = context.getProperty(DATASOURCE).getValue();
final String zkConnectString = context.getProperty(CONNECT_STRING).getValue();
final String indexService = context.getProperty(DRUID_INDEX_SERVICE_PATH).getValue();
final String discoveryPath = context.getProperty(DRUID_DISCOVERY_PATH).getValue();
final String timestampField = context.getProperty(TIMESTAMP_FIELD).getValue();
final String segmentGranularity = context.getProperty(SEGMENT_GRANULARITY).getValue();
final String queryGranularity = context.getProperty(QUERY_GRANULARITY).getValue();
final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue();
final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).getValue();
final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).getValue();
final int maxBatchSize = Integer.valueOf(context.getProperty(MAX_BATCH_SIZE).getValue());
final int maxPendingBatches = Integer.valueOf(context.getProperty(MAX_PENDING_BATCHES).getValue());
final int lingerMillis = Integer.valueOf(context.getProperty(LINGER_MILLIS).getValue());
final List<String> dimensions = getDimensions(dimensionsStringList);
final List<AggregatorFactory> aggregator = getAggregatorList(aggregatorJSON);
final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() {
private static final long serialVersionUID = 1L;
@Override
public DateTime timestamp(Map<String, Object> theMap) {
return new DateTime(theMap.get(timestampField));
}
};
Iterator<AggregatorFactory> aggIterator = aggregator.iterator();
AggregatorFactory currFactory;
log.debug("Number of Aggregations Defined: " + aggregator.size());
while (aggIterator.hasNext()) {
currFactory = aggIterator.next();
log.debug("Verifying Aggregator Definition");
log.debug("Aggregator Name: " + currFactory.getName());
log.debug("Aggregator Type: " + currFactory.getTypeName());
log.debug("Aggregator Req Fields: " + currFactory.requiredFields());
}
// Tranquility uses ZooKeeper (through Curator) for coordination.
final CuratorFramework curator = CuratorFrameworkFactory
.builder()
.connectString(zkConnectString)
.retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)) // TODO expose as properties, maybe fibonacci backoff
.build();
curator.start();
// The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
// Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null);
final Beam<Map<String, Object>> beam = DruidBeams.builder(timestamper)
.curator(curator)
.discoveryPath(discoveryPath)
.location(DruidLocation.create(DruidEnvironment.create(indexService, firehosePattern), dataSource))
.timestampSpec(timestampSpec)
.rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularity.fromString(queryGranularity)))
.tuning(
ClusteredBeamTuning
.builder()
.segmentGranularity(getSegmentGranularity(segmentGranularity))
.windowPeriod(new Period(windowPeriod))
.partitions(clusterPartitions)
.replicants(clusterReplication)
.build()
)
.druidBeamConfig(
DruidBeamConfig
.builder()
.indexRetryPeriod(new Period(indexRetryPeriod))
.build())
.buildBeam();
tranquilizer = Tranquilizer.builder()
.maxBatchSize(maxBatchSize)
.maxPendingBatches(maxPendingBatches)
.lingerMillis(lingerMillis)
.blockOnFull(true)
.build(beam);
tranquilizer.start();
}
public Tranquilizer getTranquilizer() {
return tranquilizer;
}
private List<Map<String, String>> parseJsonString(String aggregatorJson) {
ObjectMapper mapper = new ObjectMapper();
List<Map<String, String>> aggSpecList = null;
try {
aggSpecList = mapper.readValue(aggregatorJson, List.class);
return aggSpecList;
} catch (IOException e) {
throw new IllegalArgumentException("Exception while parsing the aggregrator JSON");
}
}
private List<String> getDimensions(String dimensionStringList) {
List<String> dimensionList = new LinkedList(Arrays.asList(dimensionStringList.split(",")));
return dimensionList;
}
private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) {
List<AggregatorFactory> aggregatorList = new LinkedList<>();
List<Map<String, String>> aggregatorInfo = parseJsonString(aggregatorJSON);
for (Map<String, String> aggregator : aggregatorInfo) {
if (aggregator.get("type").equalsIgnoreCase("count")) {
aggregatorList.add(getCountAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("doublesum")) {
aggregatorList.add(getDoubleSumAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("doublemax")) {
aggregatorList.add(getDoubleMaxAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("doublemin")) {
aggregatorList.add(getDoubleMinAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("longsum")) {
aggregatorList.add(getLongSumAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("longmax")) {
aggregatorList.add(getLongMaxAggregator(aggregator));
} else if (aggregator.get("type").equalsIgnoreCase("longmin")) {
aggregatorList.add(getLongMinAggregator(aggregator));
}
}
return aggregatorList;
}
private AggregatorFactory getLongMinAggregator(Map<String, String> map) {
return new LongMinAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getLongMaxAggregator(Map<String, String> map) {
return new LongMaxAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getLongSumAggregator(Map<String, String> map) {
return new LongSumAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getDoubleMinAggregator(Map<String, String> map) {
return new DoubleMinAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getDoubleMaxAggregator(Map<String, String> map) {
return new DoubleMaxAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getDoubleSumAggregator(Map<String, String> map) {
return new DoubleSumAggregatorFactory(map.get("name"), map.get("fieldName"));
}
private AggregatorFactory getCountAggregator(Map<String, String> map) {
return new CountAggregatorFactory(map.get("name"));
}
private Granularity getSegmentGranularity(String segmentGranularity) {
Granularity granularity = Granularity.HOUR;
switch (segmentGranularity) {
case "SECOND":
granularity = Granularity.SECOND;
break;
case "MINUTE":
granularity = Granularity.MINUTE;
break;
case "FIVE_MINUTE":
granularity = Granularity.FIVE_MINUTE;
break;
case "TEN_MINUTE":
granularity = Granularity.TEN_MINUTE;
break;
case "FIFTEEN_MINUTE":
granularity = Granularity.FIFTEEN_MINUTE;
break;
case "HOUR":
granularity = Granularity.HOUR;
break;
case "SIX_HOUR":
granularity = Granularity.SIX_HOUR;
break;
case "DAY":
granularity = Granularity.DAY;
break;
case "WEEK":
granularity = Granularity.WEEK;
break;
case "MONTH":
granularity = Granularity.MONTH;
break;
case "YEAR":
granularity = Granularity.YEAR;
break;
default:
break;
}
return granularity;
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.controller.DruidTranquilityController

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
public class DruidTranquilityControllerTest {
}

View File

@ -0,0 +1,39 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-druid-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-controller-service</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-processors</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,95 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-druid-processors</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-controller-service-api</artifactId>
<version>1.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>tranquility-core_2.10</artifactId>
<version>0.8.2</version>
<exclusions>
<exclusion>
<groupId>io.druid</groupId>
<artifactId>druid-console</artifactId>
</exclusion>
<exclusion>
<groupId>io.druid</groupId>
<artifactId>druid-aws-common</artifactId>
</exclusion>
<exclusion>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
</exclusion>
<exclusion>
<groupId>io.tesla.aether</groupId>
<artifactId>tesla-aether</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.stream.io.StreamUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.nifi.controller.api.DruidTranquilityService;
import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.Await;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import scala.runtime.BoxedUnit;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"Druid", "Timeseries", "OLAP", "ingest"})
@CapabilityDescription("Sends events to Apache Druid for Indexing. "
+ "Leverages Druid Tranquility Controller service."
+ "Incoming flow files are expected to contain 1 or many JSON objects, one JSON object per line")
public class PutDruid extends AbstractSessionFactoryProcessor {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final Map<Object, String> messageStatus = new HashMap<>();
public static final PropertyDescriptor DRUID_TRANQUILITY_SERVICE = new PropertyDescriptor.Builder()
.name("putdruid-tranquility-service")
.displayName("Tranquility Service")
.description("Tranquility Service to use for sending events to Druid")
.required(true)
.identifiesControllerService(DruidTranquilityService.class)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Success relationship")
.build();
public static final Relationship REL_FAIL = new Relationship.Builder()
.name("failure")
.description("FlowFiles are routed to this relationship when they cannot be parsed")
.build();
public static final Relationship REL_DROPPED = new Relationship.Builder()
.name("dropped")
.description("FlowFiles are routed to this relationship when they are outside of the configured time window, timestamp format is invalid, ect...")
.build();
public void init(final ProcessorInitializationContext context) {
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(DRUID_TRANQUILITY_SERVICE);
this.properties = Collections.unmodifiableList(properties);
Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_DROPPED);
relationships.add(REL_FAIL);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
//Method breaks down incoming flow file and sends it to Druid Indexing service
private void processFlowFile(ProcessContext context, ProcessSession session) {
final ComponentLog log = getLogger();
//Get handle on Druid Tranquility session
DruidTranquilityService tranquilityController = context.getProperty(DRUID_TRANQUILITY_SERVICE)
.asControllerService(DruidTranquilityService.class);
Tranquilizer<Map<String, Object>> tranquilizer = tranquilityController.getTranquilizer();
final FlowFile flowFile = session.get();
if (flowFile == null || flowFile.getSize() == 0) {
return;
}
//Get data from flow file body
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
String contentString = new String(buffer, StandardCharsets.UTF_8);
Map<String, Object> contentMap = null;
//Create payload array from flow file content, one element per line
String[] messageArray = contentString.split("\\R");
//Convert each array element from JSON to HashMap and send to Druid
for (String message : messageArray) {
try {
contentMap = new ObjectMapper().readValue(message, HashMap.class);
} catch (IOException e1) {
log.error("Error parsing incoming message array in the flowfile body");
}
log.debug("Tranquilizer Status: " + tranquilizer.status().toString());
messageStatus.put(flowFile, "pending");
//Send data element to Druid, Asynch
Future<BoxedUnit> future = tranquilizer.send(contentMap);
log.debug(" Sent Payload to Druid: " + contentMap);
//Wait for Druid to call back with status
future.addEventListener(new FutureEventListener<Object>() {
@Override
public void onFailure(Throwable cause) {
if (cause instanceof MessageDroppedException) {
//This happens when event timestamp targets a Druid Indexing task that has closed (Late Arriving Data)
log.error(" FlowFile Dropped due to MessageDroppedException: " + cause.getMessage() + " : " + cause);
cause.getStackTrace();
log.error(" Transferring FlowFile to DROPPED relationship");
session.transfer(flowFile, REL_DROPPED);
} else {
log.error("FlowFile Processing Failed due to: {} : {}", new Object[]{cause.getMessage(), cause});
cause.printStackTrace();
log.error(" Transferring FlowFile to FAIL relationship");
session.transfer(flowFile, REL_FAIL);
}
}
@Override
public void onSuccess(Object value) {
log.debug(" FlowFile Processing Success : " + value.toString());
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().send(flowFile, "Druid Tranquility Service");
}
});
try {
//Wait for result from Druid
//This method will be asynch since this is a SessionFactoryProcessor and OnTrigger will create a new Thread
Await.result(future);
} catch (Exception e) {
getLogger().error("Caught exception while waiting for result of put request: " + e.getMessage());
}
}
//session.transfer(flowFile, REL_SUCCESS);
session.commit();
}
public void onTrigger(ProcessContext context, ProcessSessionFactory factory) throws ProcessException {
final ProcessSession session = factory.createSession();
//Create new Thread to ensure that waiting for callback does not reduce throughput
new Thread(() -> {
processFlowFile(context, session);
}).start();
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.PutDruid

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors;
public class PutDruidTest {
}

View File

@ -0,0 +1,41 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-druid-bundle</artifactId>
<version>1.5.0-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<modules>
<module>nifi-druid-nar</module>
<module>nifi-druid-controller-service-api-nar</module>
<module>nifi-druid-controller-service-api</module>
<module>nifi-druid-controller-service</module>
<module>nifi-druid-processors</module>
</modules>
</project>

12
pom.xml
View File

@ -1491,6 +1491,18 @@
<artifactId>nifi-metrics-reporting-nar</artifactId>
<version>1.6.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-controller-service-api-nar</artifactId>
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-druid-nar</artifactId>
<version>1.5.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>