Merge pull request #269 from metamx/fix-task

Refactor Hadoop-based indexing to repair broken Hadoop Index Task
fjy 2013-10-16 11:38:21 -07:00
commit 98dcd4468b
19 changed files with 948 additions and 721 deletions

View File

@ -24,8 +24,10 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
```json
{
"dataSource": "the_data_source",
"timestampColumn": "ts",
"timestampFormat": "<iso, millis, posix, auto or any Joda time format>",
"timestampSpec" : {
"timestampColumn": "ts",
"timestampFormat": "<iso, millis, posix, auto or any Joda time format>"
},
"dataSpec": {
"format": "<csv, tsv, or json>",
"columns": [
@ -95,18 +97,17 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim
|property|description|required?|
|--------|-----------|---------|
|dataSource|name of the dataSource the data will belong to|yes|
|timestampColumn|the column that is to be used as the timestamp column|yes|
|timestampFormat|the format of timestamps; auto = either iso or millis, Joda time formats:http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html|yes|
|dataSpec|a specification of the data format and an array that names all of the columns in the input data|yes|
|dimensions|the columns that are to be used as dimensions|yes|
|granularitySpec|the time granularity and interval to chunk segments up into|yes|
|timestampSpec|includes the column that is to be used as the timestamp column and the format of the timestamps (auto = either iso or millis; Joda time formats can be found [here](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html)).|yes|
|dataSpec|a specification of the data format and an array that names all of the columns in the input data.|yes|
|dimensions|the columns that are to be used as dimensions.|yes|
|granularitySpec|the time granularity and interval to chunk segments up into.|yes|
|pathSpec|a specification of where to pull the data in from|yes|
|rollupSpec|a specification of the rollup to perform while processing the data|yes|
|workingPath|the working path to use for intermediate results (results between Hadoop jobs)|yes|
|segmentOutputPath|the path to dump segments into|yes|
|leaveIntermediate|leave behind files in the workingPath when job completes or fails (debugging tool)|no|
|partitionsSpec|a specification of how to partition each time bucket into segments, absence of this property means no partitioning will occur|no|
|updaterJobSpec|a specification of how to update the metadata for the druid cluster these segments belong to|yes|
|workingPath|the working path to use for intermediate results (results between Hadoop jobs).|yes|
|segmentOutputPath|the path to dump segments into.|yes|
|leaveIntermediate|leave behind files in the workingPath when job completes or fails (debugging tool).|no|
|partitionsSpec|a specification of how to partition each time bucket into segments, absence of this property means no partitioning will occur.|no|
|updaterJobSpec|a specification of how to update the metadata for the druid cluster these segments belong to.|yes|
### Path specification
@ -118,9 +119,9 @@ Is a type of data loader that expects data to be laid out in a specific path for
|property|description|required?|
|--------|-----------|---------|
|dataGranularity|specifies the granularity to expect the data at, e.g. hour means to expect directories `y=XXXX/m=XX/d=XX/H=XX`|yes|
|inputPath|Base path to append the expected time path to|yes|
|filePattern|Pattern that files should match to be included|yes|
|dataGranularity|specifies the granularity to expect the data at, e.g. hour means to expect directories `y=XXXX/m=XX/d=XX/H=XX`.|yes|
|inputPath|Base path to append the expected time path to.|yes|
|filePattern|Pattern that files should match to be included.|yes|
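As an illustration (separate from the sample config at the top of this page), a granularity-type path specification might look like the sketch below; the input path and file pattern are placeholder values.

```json
"pathSpec": {
  "type": "granularity",
  "dataGranularity": "hour",
  "inputPath": "s3n://example-bucket/path/to/data",
  "filePattern": ".*"
}
```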
For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths
@ -138,7 +139,7 @@ The indexing process has the ability to roll data up as it processes the incomin
|property|description|required?|
|--------|-----------|---------|
|aggs|specifies a list of aggregators to aggregate for each bucket (a bucket is defined by the tuple of the truncated timestamp and the dimensions). Aggregators available here are the same as available when querying.|yes|
|rollupGranularity|The granularity to use when truncating incoming timestamps for bucketization|yes|
|rollupGranularity|The granularity to use when truncating incoming timestamps for bucketization.|yes|
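As a hedged sketch, a rollupSpec using two common aggregator types might look like the following; the aggregator names, the example metric column, and the minute granularity are assumptions for illustration only.

```json
"rollupSpec": {
  "aggs": [
    { "type": "count", "name": "event_count" },
    { "type": "doubleSum", "name": "value_sum", "fieldName": "value" }
  ],
  "rollupGranularity": "minute"
}
```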
### Partitioning specification
@ -158,11 +159,11 @@ This is a specification of the properties that tell the job how to update metada
|property|description|required?|
|--------|-----------|---------|
|type|"db" is the only value available|yes|
|connectURI|a valid JDBC url to MySQL|yes|
|user|username for db|yes|
|password|password for db|yes|
|segmentTable|table to use in DB|yes|
|type|"db" is the only value available.|yes|
|connectURI|A valid JDBC url to MySQL.|yes|
|user|Username for db.|yes|
|password|Password for db.|yes|
|segmentTable|Table to use in DB.|yes|
These properties should parrot what you have configured for your [Coordinator](Coordinator.html).
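Putting those fields together, an updaterJobSpec might look like the sketch below; the connection URL, credentials, and table name are placeholders that should mirror your Coordinator's metadata storage settings.

```json
"updaterJobSpec": {
  "type": "db",
  "connectURI": "jdbc:mysql://localhost:3306/druid",
  "user": "druid",
  "password": "diurd",
  "segmentTable": "prod_segments"
}
```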
@ -177,15 +178,17 @@ java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/ov
This will start up a very simple local indexing service. For more complex deployments of the indexing service, see [here](Indexing-Service.html).
The schema of the Hadoop Index Task is very similar to the schema for the Hadoop Index Config. A sample Hadoop index task is shown below:
The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Config. A sample Hadoop index task is shown below:
```json
{
"type" : "index_hadoop",
"config": {
"dataSource" : "example",
"timestampColumn" : "timestamp",
"timestampFormat" : "auto",
"timestampSpec" : {
"timestampColumn" : "timestamp",
"timestampFormat" : "auto"
},
"dataSpec" : {
"format" : "json",
"dimensions" : ["dim1","dim2","dim3"]
@ -224,8 +227,23 @@ The schema of the Hadoop Index Task is very similar to the schema for the Hadoop
|property|description|required?|
|--------|-----------|---------|
|type|"index_hadoop"|yes|
|config|a Hadoop Index Config|yes|
|type|This should be "index_hadoop".|yes|
|config|A Hadoop Index Config.|yes|
The Hadoop Index Config submitted as part of a Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer`, except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.
To run the task:
```
curl -X 'POST' -H 'Content-Type:application/json' -d @example_index_hadoop_task.json localhost:8087/druid/indexer/v1/task
```
If the task succeeds, you should see the following in the indexing service logs:
```
2013-10-16 16:38:31,945 INFO [pool-6-thread-1] io.druid.indexing.overlord.exec.TaskConsumer - Task SUCCESS: HadoopIndexTask...
```
Having Problems?
----------------
Getting data into Druid can definitely be difficult for first-time users. Please don't hesitate to ask questions in our IRC channel or on our [Google Groups page](https://groups.google.com/forum/#!forum/druid-development).

View File

@ -12,21 +12,21 @@ S3-compatible deep storage is basically either S3 or something like riak-cs whic
S3 configuration parameters are:
```
com.metamx.aws.accessKey=<S3 access key>
com.metamx.aws.secretKey=<S3 secret_key>
druid.storage.s3.bucket=<bucket to store in>
druid.storage.s3.baseKey=<base key prefix to use, i.e. what directory>
druid.s3.accessKey=<S3 access key>
druid.s3.secretKey=<S3 secret_key>
druid.storage.bucket=<bucket to store in>
druid.storage.baseKey=<base key prefix to use, i.e. what directory>
```
## HDFS
As of 0.4.0, HDFS can be used for storage of segments as well.
In order to use hdfs for deep storage, you need to set the following configuration on your realtime nodes.
In order to use hdfs for deep storage, you need to set the following configuration on your real-time nodes.
```
druid.storage.hdfs=true
druid.storage.hdfs.storageDirectory=<directory for storing segments>
druid.storage.type=hdfs
druid.storage.storageDirectory=<directory for storing segments>
```
If you are using the Hadoop indexer, set your output directory to a location on HDFS and it will work.
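For the Hadoop indexer, this generally means pointing `segmentOutputPath` at an HDFS location, as in the sketch below; the namenode address and path are placeholders.

```json
"segmentOutputPath": "hdfs://namenode:9000/druid/segments"
```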
@ -36,13 +36,13 @@ If you are using the Hadoop indexer, set your output directory to be a location
A local mount can be used for storage of segments as well. This allows you to use just your local file system or anything else that can be mounted locally, such as NFS, Ceph, etc.
In order to use a local mount for deep storage, you need to set the following configuration on your realtime nodes.
In order to use a local mount for deep storage, you need to set the following configuration on your real-time nodes.
```
druid.storage.local=true
druid.storage.local.storageDirectory=<directory for storing segments>
druid.storage.type=local
druid.storage.storageDirectory=<directory for storing segments>
```
Note that you should generally set `druid.storage.local.storageDirectory` to something different from `druid.paths.indexCache`.
Note that you should generally set `druid.storage.storageDirectory` to something different from `druid.segmentCache.locations` and `druid.segmentCache.infoDir`.
If you are using the Hadoop indexer in local mode, then just give it a local file as your output directory and it will work.

View File

@ -81,13 +81,13 @@ import java.util.Set;
/**
* Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
* choosing the best dimension that satisfies the criteria:
*
* <p/>
* <ul>
* <li>Must have exactly one value per row.</li>
* <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
* put all those rows in the same partition, and that partition may be much larger than the target size.</li>
* <li>Must have exactly one value per row.</li>
* <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
* put all those rows in the same partition, and that partition may be much larger than the target size.</li>
* </ul>
*
* <p/>
* "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes variation of
* segment size relative to the target.
*/
@ -125,7 +125,7 @@ public class DeterminePartitionsJob implements Jobby
* in the final segment.
*/
if(!config.getPartitionsSpec().isAssumeGrouped()) {
if (!config.getPartitionsSpec().isAssumeGrouped()) {
final Job groupByJob = new Job(
new Configuration(),
String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
@ -150,7 +150,7 @@ public class DeterminePartitionsJob implements Jobby
groupByJob.submit();
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
if(!groupByJob.waitForCompletion(true)) {
if (!groupByJob.waitForCompletion(true)) {
log.error("Job failed: %s", groupByJob.getJobID());
return false;
}
@ -170,7 +170,7 @@ public class DeterminePartitionsJob implements Jobby
injectSystemProperties(dimSelectionJob);
if(!config.getPartitionsSpec().isAssumeGrouped()) {
if (!config.getPartitionsSpec().isAssumeGrouped()) {
// Read grouped data from the groupByJob.
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionPostGroupByMapper.class);
dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class);
@ -203,7 +203,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.getTrackingURL()
);
if(!dimSelectionJob.waitForCompletion(true)) {
if (!dimSelectionJob.waitForCompletion(true)) {
log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
return false;
}
@ -237,15 +237,15 @@ public class DeterminePartitionsJob implements Jobby
}
shardSpecs.put(bucket, actualSpecs);
}
else {
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
}
config.setShardSpecs(shardSpecs);
return true;
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@ -271,9 +271,9 @@ public class DeterminePartitionsJob implements Jobby
{
// Create group key, there are probably more efficient ways of doing this
final Map<String, Set<String>> dims = Maps.newTreeMap();
for(final String dim : inputRow.getDimensions()) {
for (final String dim : inputRow.getDimensions()) {
final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
if(dimValues.size() > 0) {
if (dimValues.size() > 0) {
dims.put(dim, dimValues);
}
}
@ -314,7 +314,7 @@ public class DeterminePartitionsJob implements Jobby
protected void setup(Context context)
throws IOException, InterruptedException
{
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
final String partitionDimension = config.getPartitionDimension();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
}
@ -346,7 +346,7 @@ public class DeterminePartitionsJob implements Jobby
throws IOException, InterruptedException
{
super.setup(context);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
final String partitionDimension = config.getPartitionDimension();
helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
}
@ -359,7 +359,7 @@ public class DeterminePartitionsJob implements Jobby
) throws IOException, InterruptedException
{
final Map<String, Iterable<String>> dims = Maps.newHashMap();
for(final String dim : inputRow.getDimensions()) {
for (final String dim : inputRow.getDimensions()) {
dims.put(dim, inputRow.getDimension(dim));
}
helper.emitDimValueCounts(context, new DateTime(inputRow.getTimestampFromEpoch()), dims);
@ -383,9 +383,9 @@ public class DeterminePartitionsJob implements Jobby
final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0;
for(final Interval bucketInterval: config.getGranularitySpec().bucketIntervals()) {
for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
timeIndexBuilder.put(bucketInterval.getStart(), idx);
idx ++;
idx++;
}
this.intervalIndexes = timeIndexBuilder.build();
@ -399,7 +399,7 @@ public class DeterminePartitionsJob implements Jobby
{
final Optional<Interval> maybeInterval = config.getGranularitySpec().bucketInterval(timestamp);
if(!maybeInterval.isPresent()) {
if (!maybeInterval.isPresent()) {
throw new ISE("WTF?! No bucket found for timestamp: %s", timestamp);
}
@ -414,13 +414,13 @@ public class DeterminePartitionsJob implements Jobby
// Emit row-counter value.
write(context, groupKey, new DimValueCount("", "", 1));
for(final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
final String dim = dimAndValues.getKey();
if(partitionDimension == null || partitionDimension.equals(dim)) {
if (partitionDimension == null || partitionDimension.equals(dim)) {
final Iterable<String> dimValues = dimAndValues.getValue();
if(Iterables.size(dimValues) == 1) {
if (Iterables.size(dimValues) == 1) {
// Emit this value.
write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
} else {
@ -433,7 +433,7 @@ public class DeterminePartitionsJob implements Jobby
}
public static class DeterminePartitionsDimSelectionPartitioner
extends Partitioner<BytesWritable, Text>
extends Partitioner<BytesWritable, Text>
{
@Override
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
@ -463,7 +463,7 @@ public class DeterminePartitionsJob implements Jobby
if (config == null) {
synchronized (DeterminePartitionsDimSelectionBaseReducer.class) {
if (config == null) {
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
}
}
}
@ -561,7 +561,7 @@ public class DeterminePartitionsJob implements Jobby
final DimValueCount firstDvc = iterator.next();
final int totalRows = firstDvc.numRows;
if(!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
if (!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!");
}
@ -574,10 +574,10 @@ public class DeterminePartitionsJob implements Jobby
// We'll store possible partitions in here
final Map<String, DimPartitions> dimPartitionss = Maps.newHashMap();
while(iterator.hasNext()) {
while (iterator.hasNext()) {
final DimValueCount dvc = iterator.next();
if(currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
if (currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
// Starting a new dimension! Exciting!
currentDimPartitions = new DimPartitions(dvc.dim);
currentDimPartition = new DimPartition();
@ -586,17 +586,17 @@ public class DeterminePartitionsJob implements Jobby
}
// Respect poisoning
if(!currentDimSkip && dvc.numRows < 0) {
if (!currentDimSkip && dvc.numRows < 0) {
log.info("Cannot partition on multi-valued dimension: %s", dvc.dim);
currentDimSkip = true;
}
if(currentDimSkip) {
if (currentDimSkip) {
continue;
}
// See if we need to cut a new partition ending immediately before this dimension value
if(currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
final ShardSpec shardSpec = new SingleDimensionShardSpec(
currentDimPartitions.dim,
currentDimPartitionStart,
@ -618,20 +618,20 @@ public class DeterminePartitionsJob implements Jobby
}
// Update counters
currentDimPartition.cardinality ++;
currentDimPartition.cardinality++;
currentDimPartition.rows += dvc.numRows;
if(!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
if (!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
// Finalize the current dimension
if(currentDimPartition.rows > 0) {
if (currentDimPartition.rows > 0) {
// One more shard to go
final ShardSpec shardSpec;
if (currentDimPartitions.partitions.isEmpty()) {
shardSpec = new NoneShardSpec();
} else {
if(currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) {
if (currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) {
// Combine with previous shard
final DimPartition previousDimPartition = currentDimPartitions.partitions.remove(
currentDimPartitions.partitions.size() - 1
@ -685,7 +685,7 @@ public class DeterminePartitionsJob implements Jobby
}
// Choose best dimension
if(dimPartitionss.isEmpty()) {
if (dimPartitionss.isEmpty()) {
throw new ISE("No suitable partitioning dimension found!");
}
@ -694,8 +694,8 @@ public class DeterminePartitionsJob implements Jobby
DimPartitions minDistancePartitions = null;
DimPartitions maxCardinalityPartitions = null;
for(final DimPartitions dimPartitions : dimPartitionss.values()) {
if(dimPartitions.getRows() != totalRows) {
for (final DimPartitions dimPartitions : dimPartitionss.values()) {
if (dimPartitions.getRows() != totalRows) {
log.info(
"Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
dimPartitions.dim,
@ -708,32 +708,32 @@ public class DeterminePartitionsJob implements Jobby
// Make sure none of these shards are oversized
boolean oversized = false;
for(final DimPartition partition : dimPartitions.partitions) {
if(partition.rows > config.getMaxPartitionSize()) {
for (final DimPartition partition : dimPartitions.partitions) {
if (partition.rows > config.getMaxPartitionSize()) {
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
oversized = true;
}
}
if(oversized) {
if (oversized) {
continue;
}
final int cardinality = dimPartitions.getCardinality();
final long distance = dimPartitions.getDistanceSquaredFromTarget(config.getTargetPartitionSize());
if(cardinality > maxCardinality) {
if (cardinality > maxCardinality) {
maxCardinality = cardinality;
maxCardinalityPartitions = dimPartitions;
}
if(distance < minDistance) {
if (distance < minDistance) {
minDistance = distance;
minDistancePartitions = dimPartitions;
}
}
if(maxCardinalityPartitions == null) {
if (maxCardinalityPartitions == null) {
throw new ISE("No suitable partitioning dimension found!");
}
@ -821,7 +821,7 @@ public class DeterminePartitionsJob implements Jobby
public int getCardinality()
{
int sum = 0;
for(final DimPartition dimPartition : partitions) {
for (final DimPartition dimPartition : partitions) {
sum += dimPartition.cardinality;
}
return sum;
@ -830,7 +830,7 @@ public class DeterminePartitionsJob implements Jobby
public long getDistanceSquaredFromTarget(long target)
{
long distance = 0;
for(final DimPartition dimPartition : partitions) {
for (final DimPartition dimPartition : partitions) {
distance += (dimPartition.rows - target) * (dimPartition.rows - target);
}
@ -841,7 +841,7 @@ public class DeterminePartitionsJob implements Jobby
public int getRows()
{
int sum = 0;
for(final DimPartition dimPartition : partitions) {
for (final DimPartition dimPartition : partitions) {
sum += dimPartition.rows;
}
return sum;
@ -892,7 +892,11 @@ public class DeterminePartitionsJob implements Jobby
throws IOException, InterruptedException
{
context.write(
new SortableBytes(groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(),
new SortableBytes(
groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(
HadoopDruidIndexerConfig.javaNativeCharset
)
).toBytesWritable(),
dimValueCount.toText()
);
}

View File

@ -21,7 +21,6 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@ -68,7 +67,6 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collections;
@ -80,13 +78,13 @@ import java.util.Set;
*/
public class HadoopDruidIndexerConfig
{
public static final Charset javaNativeCharset = Charset.forName("Unicode");
public static final Splitter tabSplitter = Splitter.on("\t");
public static final Joiner tabJoiner = Joiner.on("\t");
private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
private static final Injector injector;
public static final String CONFIG_PROPERTY = "druid.indexer.config";
public static final Charset javaNativeCharset = Charset.forName("Unicode");
public static final Splitter tabSplitter = Splitter.on("\t");
public static final Joiner tabJoiner = Joiner.on("\t");
public static final ObjectMapper jsonMapper;
static {
@ -113,85 +111,33 @@ public class HadoopDruidIndexerConfig
INVALID_ROW_COUNTER
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromFile(File file)
{
try {
return fromMap((Map<String, Object>) jsonMapper.readValue(file, new TypeReference<Map<String, Object>>(){}));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromString(String str)
{
try {
return fromMap(
(Map<String, Object>) jsonMapper.readValue(
str, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
{
final HadoopDruidIndexerConfig retVal = fromString(conf.get(CONFIG_PROPERTY));
retVal.verify();
return retVal;
}
private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
private static final String CONFIG_PROPERTY = "druid.indexer.config";
private volatile String dataSource;
private volatile String timestampColumnName;
private volatile String timestampFormat;
private volatile TimestampSpec timestampSpec;
private volatile DataSpec dataSpec;
@Deprecated
private volatile Granularity segmentGranularity;
private volatile GranularitySpec granularitySpec;
private volatile PathSpec pathSpec;
private volatile String jobOutputDir;
private volatile String segmentOutputDir;
private volatile String version = new DateTime().toString();
private volatile String workingPath;
private volatile String segmentOutputPath;
private volatile String version;
private volatile PartitionsSpec partitionsSpec;
private volatile boolean leaveIntermediate = false;
private volatile boolean cleanupOnFailure = true;
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs = ImmutableMap.of();
private volatile boolean overwriteFiles = false;
private volatile boolean leaveIntermediate;
private volatile boolean cleanupOnFailure;
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private volatile boolean overwriteFiles;
private volatile DataRollupSpec rollupSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows = false;
private volatile boolean ignoreInvalidRows;
@JsonCreator
public HadoopDruidIndexerConfig(
final @JsonProperty("intervals") List<Interval> intervals,
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampColumn") String timestampColumnName,
final @JsonProperty("timestampFormat") String timestampFormat,
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
final @JsonProperty("dataSpec") DataSpec dataSpec,
final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
final @JsonProperty("pathSpec") PathSpec pathSpec,
final @JsonProperty("workingPath") String jobOutputDir,
final @JsonProperty("segmentOutputPath") String segmentOutputDir,
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionDimension") String partitionDimension,
final @JsonProperty("targetPartitionSize") Long targetPartitionSize,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
@ -199,48 +145,51 @@ public class HadoopDruidIndexerConfig
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat,
final @JsonProperty("intervals") List<Interval> intervals,
final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
final @JsonProperty("partitionDimension") String partitionDimension,
final @JsonProperty("targetPartitionSize") Long targetPartitionSize
)
{
this.dataSource = dataSource;
this.timestampColumnName = (timestampColumnName == null) ? null : timestampColumnName.toLowerCase();
this.timestampFormat = timestampFormat;
this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec;
this.dataSpec = dataSpec;
this.granularitySpec = granularitySpec;
this.pathSpec = pathSpec;
this.jobOutputDir = jobOutputDir;
this.segmentOutputDir = segmentOutputDir;
this.workingPath = workingPath;
this.segmentOutputPath = segmentOutputPath;
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure);
this.shardSpecs = shardSpecs;
this.shardSpecs = (shardSpecs == null ? ImmutableMap.<DateTime, List<HadoopyShardSpec>>of() : shardSpecs);
this.overwriteFiles = overwriteFiles;
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
if(partitionsSpec != null) {
if (partitionsSpec != null) {
Preconditions.checkArgument(
partitionDimension == null && targetPartitionSize == null,
"Cannot mix partitionsSpec with partitionDimension/targetPartitionSize"
);
this.partitionsSpec = partitionsSpec;
} else {
// Backwards compatibility
this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
}
if(granularitySpec != null) {
if (granularitySpec != null) {
Preconditions.checkArgument(
segmentGranularity == null && intervals == null,
"Cannot mix granularitySpec with segmentGranularity/intervals"
);
this.granularitySpec = granularitySpec;
} else {
// Backwards compatibility
this.segmentGranularity = segmentGranularity;
if(segmentGranularity != null && intervals != null) {
if (segmentGranularity != null && intervals != null) {
this.granularitySpec = new UniformGranularitySpec(segmentGranularity, intervals);
}
}
@ -253,21 +202,6 @@ public class HadoopDruidIndexerConfig
{
}
public List<Interval> getIntervals()
{
return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
}
@Deprecated
public void setIntervals(List<Interval> intervals)
{
Preconditions.checkState(this.granularitySpec == null, "Cannot mix setIntervals with granularitySpec");
Preconditions.checkState(this.segmentGranularity != null, "Cannot use setIntervals without segmentGranularity");
// For backwards compatibility
this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, intervals);
}
@JsonProperty
public String getDataSource()
{
@ -279,31 +213,15 @@ public class HadoopDruidIndexerConfig
this.dataSource = dataSource.toLowerCase();
}
@JsonProperty("timestampColumn")
public String getTimestampColumnName()
{
return timestampColumnName;
}
public void setTimestampColumnName(String timestampColumnName)
{
this.timestampColumnName = timestampColumnName.toLowerCase();
}
@JsonProperty()
public String getTimestampFormat()
{
return timestampFormat;
}
public void setTimestampFormat(String timestampFormat)
{
this.timestampFormat = timestampFormat;
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return new TimestampSpec(timestampColumnName, timestampFormat);
return timestampSpec;
}
public void setTimestampSpec(TimestampSpec timestampSpec)
{
this.timestampSpec = timestampSpec;
}
@JsonProperty
@ -317,32 +235,6 @@ public class HadoopDruidIndexerConfig
this.dataSpec = new ToLowercaseDataSpec(dataSpec);
}
public StringInputRowParser getParser()
{
final List<String> dimensionExclusions;
if(getDataSpec().hasCustomDimensions()) {
dimensionExclusions = null;
} else {
dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(getTimestampColumnName());
dimensionExclusions.addAll(
Lists.transform(
getRollupSpec().getAggs(), new Function<AggregatorFactory, String>()
{
@Override
public String apply(AggregatorFactory aggregatorFactory)
{
return aggregatorFactory.getName();
}
}
)
);
}
return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions);
}
@JsonProperty
public GranularitySpec getGranularitySpec()
{
@ -354,17 +246,6 @@ public class HadoopDruidIndexerConfig
this.granularitySpec = granularitySpec;
}
@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}
public void setPartitionsSpec(PartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
}
@JsonProperty
public PathSpec getPathSpec()
{
@ -376,26 +257,26 @@ public class HadoopDruidIndexerConfig
this.pathSpec = pathSpec;
}
@JsonProperty("workingPath")
public String getJobOutputDir()
@JsonProperty
public String getWorkingPath()
{
return jobOutputDir;
return workingPath;
}
public void setJobOutputDir(String jobOutputDir)
public void setWorkingPath(String workingPath)
{
this.jobOutputDir = jobOutputDir;
this.workingPath = workingPath;
}
@JsonProperty("segmentOutputPath")
public String getSegmentOutputDir()
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputDir;
return segmentOutputPath;
}
public void setSegmentOutputDir(String segmentOutputDir)
public void setSegmentOutputPath(String segmentOutputPath)
{
this.segmentOutputDir = segmentOutputDir;
this.segmentOutputPath = segmentOutputPath;
}
@JsonProperty
@ -409,29 +290,15 @@ public class HadoopDruidIndexerConfig
this.version = version;
}
public String getPartitionDimension()
@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec.getPartitionDimension();
return partitionsSpec;
}
public boolean partitionByDimension()
public void setPartitionsSpec(PartitionsSpec partitionsSpec)
{
return partitionsSpec.isDeterminingPartitions();
}
public Long getTargetPartitionSize()
{
return partitionsSpec.getTargetPartitionSize();
}
public long getMaxPartitionSize()
{
return partitionsSpec.getMaxPartitionSize();
}
public boolean isUpdaterJobSpecSet()
{
return (updaterJobSpec != null);
this.partitionsSpec = partitionsSpec;
}
@JsonProperty
@ -511,6 +378,72 @@ public class HadoopDruidIndexerConfig
this.ignoreInvalidRows = ignoreInvalidRows;
}
public List<Interval> getIntervals()
{
return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
}
public String getPartitionDimension()
{
return partitionsSpec.getPartitionDimension();
}
public boolean partitionByDimension()
{
return partitionsSpec.isDeterminingPartitions();
}
public Long getTargetPartitionSize()
{
return partitionsSpec.getTargetPartitionSize();
}
public long getMaxPartitionSize()
{
return partitionsSpec.getMaxPartitionSize();
}
public boolean isUpdaterJobSpecSet()
{
return (updaterJobSpec != null);
}
public StringInputRowParser getParser()
{
final List<String> dimensionExclusions;
if (getDataSpec().hasCustomDimensions()) {
dimensionExclusions = null;
} else {
dimensionExclusions = Lists.newArrayList();
dimensionExclusions.add(timestampSpec.getTimestampColumn());
dimensionExclusions.addAll(
Lists.transform(
getRollupSpec().getAggs(), new Function<AggregatorFactory, String>()
{
@Override
public String apply(AggregatorFactory aggregatorFactory)
{
return aggregatorFactory.getName();
}
}
)
);
}
return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions);
}
public HadoopyShardSpec getShardSpec(Bucket bucket)
{
return shardSpecs.get(bucket.time).get(bucket.partitionNum);
}
public Job addInputPaths(Job job) throws IOException
{
return getPathSpec().addInputPaths(this, job);
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/
@ -590,11 +523,6 @@ public class HadoopDruidIndexerConfig
);
}
public HadoopyShardSpec getShardSpec(Bucket bucket)
{
return shardSpecs.get(bucket.time).get(bucket.partitionNum);
}
/******************************************
Path helper logic
******************************************/
@ -606,7 +534,7 @@ public class HadoopDruidIndexerConfig
*/
public Path makeIntermediatePath()
{
return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().replace(":", "")));
return new Path(String.format("%s/%s/%s", getWorkingPath(), getDataSource(), getVersion().replace(":", "")));
}
public Path makeSegmentPartitionInfoPath(Bucket bucket)
@ -638,38 +566,33 @@ public class HadoopDruidIndexerConfig
return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
}
public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
{
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
if (fileSystem instanceof DistributedFileSystem)
{
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
getSegmentOutputDir(),
dataSource,
bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
getVersion().replace(":", "_"),
bucket.partitionNum
)
);
}
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
getSegmentOutputDir(),
dataSource,
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
getVersion(),
bucket.partitionNum
));
}
public Job addInputPaths(Job job) throws IOException
public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
{
return pathSpec.addInputPaths(this, job);
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
if (fileSystem instanceof DistributedFileSystem) {
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
getSegmentOutputPath(),
getDataSource(),
bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
getVersion().replace(":", "_"),
bucket.partitionNum
)
);
}
return new Path(
String.format(
"%s/%s/%s_%s/%s/%s",
getSegmentOutputPath(),
getDataSource(),
bucketInterval.getStart().toString(),
bucketInterval.getEnd().toString(),
getVersion(),
bucket.partitionNum
)
);
}
public void intoConfiguration(Job job)
@ -677,7 +600,7 @@ public class HadoopDruidIndexerConfig
Configuration conf = job.getConfiguration();
try {
conf.set(CONFIG_PROPERTY, jsonMapper.writeValueAsString(this));
conf.set(HadoopDruidIndexerConfig.CONFIG_PROPERTY, HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(this));
}
catch (IOException e) {
throw Throwables.propagate(e);
@ -695,12 +618,11 @@ public class HadoopDruidIndexerConfig
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(dataSpec, "dataSpec");
Preconditions.checkNotNull(timestampColumnName, "timestampColumn");
Preconditions.checkNotNull(timestampFormat, "timestampFormat");
Preconditions.checkNotNull(timestampSpec, "timestampSpec");
Preconditions.checkNotNull(granularitySpec, "granularitySpec");
Preconditions.checkNotNull(pathSpec, "pathSpec");
Preconditions.checkNotNull(jobOutputDir, "workingPath");
Preconditions.checkNotNull(segmentOutputDir, "segmentOutputPath");
Preconditions.checkNotNull(workingPath, "workingPath");
Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
Preconditions.checkNotNull(version, "version");
Preconditions.checkNotNull(rollupSpec, "rollupSpec");

View File

@ -0,0 +1,278 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import org.apache.hadoop.conf.Configuration;
import org.joda.time.DateTime;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopDruidIndexerConfigBuilder
{
public static HadoopDruidIndexerConfig fromSchema(HadoopDruidIndexerSchema schema)
{
return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
}
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
{
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromFile(File file)
{
try {
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
file, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
@SuppressWarnings("unchecked")
public static HadoopDruidIndexerConfig fromString(String str)
{
try {
return fromMap(
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
str, new TypeReference<Map<String, Object>>()
{
}
)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
{
final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY));
retVal.verify();
return retVal;
}
private volatile String dataSource;
private volatile TimestampSpec timestampSpec;
private volatile DataSpec dataSpec;
private volatile GranularitySpec granularitySpec;
private volatile PathSpec pathSpec;
private volatile String workingPath;
private volatile String segmentOutputPath;
private volatile String version;
private volatile PartitionsSpec partitionsSpec;
private volatile boolean leaveIntermediate;
private volatile boolean cleanupOnFailure;
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private volatile boolean overwriteFiles;
private volatile DataRollupSpec rollupSpec;
private volatile DbUpdaterJobSpec updaterJobSpec;
private volatile boolean ignoreInvalidRows;
public HadoopDruidIndexerConfigBuilder()
{
this.dataSource = null;
this.timestampSpec = null;
this.dataSpec = null;
this.granularitySpec = null;
this.pathSpec = null;
this.workingPath = null;
this.segmentOutputPath = null;
this.version = new DateTime().toString();
this.partitionsSpec = null;
this.leaveIntermediate = false;
this.cleanupOnFailure = true;
this.shardSpecs = ImmutableMap.of();
this.overwriteFiles = false;
this.rollupSpec = null;
this.updaterJobSpec = null;
this.ignoreInvalidRows = false;
}
public HadoopDruidIndexerConfigBuilder withDataSource(String dataSource)
{
this.dataSource = dataSource;
return this;
}
public HadoopDruidIndexerConfigBuilder withTimestampSpec(TimestampSpec timestampSpec)
{
this.timestampSpec = timestampSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withDataSpec(DataSpec dataSpec)
{
this.dataSpec = dataSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withGranularitySpec(GranularitySpec granularitySpec)
{
this.granularitySpec = granularitySpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withPathSpec(PathSpec pathSpec)
{
this.pathSpec = pathSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withWorkingPath(String workingPath)
{
this.workingPath = workingPath;
return this;
}
public HadoopDruidIndexerConfigBuilder withSegmentOutputPath(String segmentOutputPath)
{
this.segmentOutputPath = segmentOutputPath;
return this;
}
public HadoopDruidIndexerConfigBuilder withVersion(String version)
{
this.version = version;
return this;
}
public HadoopDruidIndexerConfigBuilder withPartitionsSpec(PartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withLeaveIntermediate(boolean leaveIntermediate)
{
this.leaveIntermediate = leaveIntermediate;
return this;
}
public HadoopDruidIndexerConfigBuilder withCleanupOnFailure(boolean cleanupOnFailure)
{
this.cleanupOnFailure = cleanupOnFailure;
return this;
}
public HadoopDruidIndexerConfigBuilder withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> shardSpecs)
{
this.shardSpecs = shardSpecs;
return this;
}
public HadoopDruidIndexerConfigBuilder withOverwriteFiles(boolean overwriteFiles)
{
this.overwriteFiles = overwriteFiles;
return this;
}
public HadoopDruidIndexerConfigBuilder withRollupSpec(DataRollupSpec rollupSpec)
{
this.rollupSpec = rollupSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withUpdaterJobSpec(DbUpdaterJobSpec updaterJobSpec)
{
this.updaterJobSpec = updaterJobSpec;
return this;
}
public HadoopDruidIndexerConfigBuilder withIgnoreInvalidRows(boolean ignoreInvalidRows)
{
this.ignoreInvalidRows = ignoreInvalidRows;
return this;
}
public HadoopDruidIndexerConfigBuilder withSchema(HadoopDruidIndexerSchema schema)
{
this.dataSource = schema.getDataSource();
this.timestampSpec = schema.getTimestampSpec();
this.dataSpec = schema.getDataSpec();
this.granularitySpec = schema.getGranularitySpec();
this.pathSpec = HadoopDruidIndexerConfig.jsonMapper.convertValue(schema.getPathSpec(), PathSpec.class);
this.workingPath = schema.getWorkingPath();
this.segmentOutputPath = schema.getSegmentOutputPath();
this.version = schema.getVersion();
this.partitionsSpec = schema.getPartitionsSpec();
this.leaveIntermediate = schema.isLeaveIntermediate();
this.cleanupOnFailure = schema.isCleanupOnFailure();
this.shardSpecs = schema.getShardSpecs();
this.overwriteFiles = schema.isOverwriteFiles();
this.rollupSpec = schema.getRollupSpec();
this.updaterJobSpec = schema.getUpdaterJobSpec();
this.ignoreInvalidRows = schema.isIgnoreInvalidRows();
return this;
}
public HadoopDruidIndexerConfig build()
{
return new HadoopDruidIndexerConfig(
dataSource,
timestampSpec,
dataSpec,
granularitySpec,
pathSpec,
workingPath,
segmentOutputPath,
version,
partitionsSpec,
leaveIntermediate,
cleanupOnFailure,
shardSpecs,
overwriteFiles,
rollupSpec,
updaterJobSpec,
ignoreInvalidRows,
null,
null,
null,
null,
null,
null
);
}
}

View File

@ -46,6 +46,7 @@ public class HadoopDruidIndexerJob implements Jobby
private final HadoopDruidIndexerConfig config;
private final DbUpdaterJob dbUpdaterJob;
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;

View File

@ -1,123 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import java.util.List;
/**
*/
public class HadoopDruidIndexerMain
{
private static final Logger log = new Logger(HadoopDruidIndexerMain.class);
public static void main(String[] args) throws Exception
{
if (args.length != 1) {
printHelp();
System.exit(2);
}
HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build();
node.setArgumentSpec(args[0]);
Lifecycle lifecycle = new Lifecycle();
lifecycle.addManagedInstance(node);
try {
lifecycle.start();
}
catch (Throwable t) {
log.info(t, "Throwable caught at startup, committing seppuku");
Thread.sleep(500);
printHelp();
System.exit(1);
}
}
private static final List<Pair<String, String>> expectedFields =
ImmutableList.<Pair<String, String>>builder()
.add(Pair.of("dataSource", "Name of dataSource"))
.add(Pair.of("timestampColumn", "Column name of the timestamp column"))
.add(Pair.of("timestampFormat", "Format name of the timestamp column (posix or iso)"))
.add(
Pair.of(
"dataSpec",
"A JSON object with fields "
+
"format=(json, csv, tsv), "
+
"columns=JSON array of column names for the delimited text input file (only for csv and tsv formats),"
+
"dimensions=JSON array of dimensionn names (must match names in columns),"
+
"delimiter=delimiter of the data (only for tsv format)"
)
)
.add(Pair.of("granularitySpec", "A JSON object indicating the Granularity that segments should be created at."))
.add(
Pair.of(
"pathSpec",
"A JSON object with fields type=granularity, inputPath, filePattern, dataGranularity"
)
)
.add(
Pair.of(
"rollupSpec",
"JSON object with fields rollupGranularity, aggs=JSON Array of Aggregator specs"
)
)
.add(Pair.of("workingPath", "Path to store intermediate output data. Deleted when finished."))
.add(Pair.of("segmentOutputPath", "Path to store output segments."))
.add(
Pair.of(
"updaterJobSpec",
"JSON object with fields type=db, connectURI of the database, username, password, and segment table name"
)
)
.add(Pair.of("cleanupOnFailure", "Clean up intermediate files on failure? (default: true)"))
.add(Pair.of("leaveIntermediate", "Leave intermediate files. (default: false)"))
.add(Pair.of("partitionDimension", "Dimension to partition by (optional)"))
.add(
Pair.of(
"targetPartitionSize",
"Integer representing the target number of rows in a partition (required if partitionDimension != null)"
)
)
.build();
public static void printHelp()
{
System.out.println("Usage: <java invocation> <config_spec>");
System.out.println("<config_spec> is either a JSON object or the path to a file that contains a JSON object.");
System.out.println();
System.out.println("JSON object description:");
System.out.println("{");
for (Pair<String, String> expectedField : expectedFields) {
System.out.printf(" \"%s\": %s%n", expectedField.lhs, expectedField.rhs);
}
System.out.println("}");
}
}

View File

@ -38,7 +38,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
parser = config.getParser();
}

View File

@ -1,138 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
/**
*/
public class HadoopDruidIndexerNode
{
public static Builder builder()
{
return new Builder();
}
private String intervalSpec = null;
private String argumentSpec = null;
public String getIntervalSpec()
{
return intervalSpec;
}
public String getArgumentSpec()
{
return argumentSpec;
}
public HadoopDruidIndexerNode setIntervalSpec(String intervalSpec)
{
this.intervalSpec = intervalSpec;
return this;
}
public HadoopDruidIndexerNode setArgumentSpec(String argumentSpec)
{
this.argumentSpec = argumentSpec;
return this;
}
@SuppressWarnings("unchecked")
public HadoopDruidIndexerNode registerJacksonSubtype(Class<?>... clazzes)
{
HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(clazzes);
return this;
}
@SuppressWarnings("unchecked")
public HadoopDruidIndexerNode registerJacksonSubtype(NamedType... namedTypes)
{
HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(namedTypes);
return this;
}
@LifecycleStart
public void start() throws Exception
{
Preconditions.checkNotNull(argumentSpec, "argumentSpec");
final HadoopDruidIndexerConfig config;
if (argumentSpec.startsWith("{")) {
config = HadoopDruidIndexerConfig.fromString(argumentSpec);
} else if (argumentSpec.startsWith("s3://")) {
final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length())));
final FileSystem fs = s3nPath.getFileSystem(new Configuration());
String configString = CharStreams.toString(new InputSupplier<InputStreamReader>()
{
@Override
public InputStreamReader getInput() throws IOException
{
return new InputStreamReader(fs.open(s3nPath));
}
});
config = HadoopDruidIndexerConfig.fromString(configString);
} else {
config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
}
if (intervalSpec != null) {
final List<Interval> dataInterval = Lists.transform(
Arrays.asList(intervalSpec.split(",")),
new StringIntervalFunction()
);
config.setIntervals(dataInterval);
}
new HadoopDruidIndexerJob(config).run();
}
@LifecycleStop
public void stop()
{
}
public static class Builder
{
public HadoopDruidIndexerNode build()
{
return new HadoopDruidIndexerNode();
}
}
}

View File

@ -0,0 +1,194 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopDruidIndexerSchema
{
private final String dataSource;
private final TimestampSpec timestampSpec;
private final DataSpec dataSpec;
private final GranularitySpec granularitySpec;
private final Map<String, Object> pathSpec; // This cannot just be a PathSpec object
private final String workingPath;
private final String segmentOutputPath;
private final String version;
private final PartitionsSpec partitionsSpec;
private final boolean leaveIntermediate;
private final boolean cleanupOnFailure;
private final Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private final boolean overwriteFiles;
private final DataRollupSpec rollupSpec;
private final DbUpdaterJobSpec updaterJobSpec;
private final boolean ignoreInvalidRows;
@JsonCreator
public HadoopDruidIndexerSchema(
final @JsonProperty("dataSource") String dataSource,
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
final @JsonProperty("dataSpec") DataSpec dataSpec,
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
final @JsonProperty("pathSpec") Map<String, Object> pathSpec,
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
// These fields are deprecated and will be removed in the future
final @JsonProperty("timestampColumn") String timestampColumn,
final @JsonProperty("timestampFormat") String timestampFormat
)
{
this.dataSource = dataSource;
this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec;
this.dataSpec = dataSpec;
this.granularitySpec = granularitySpec;
this.pathSpec = pathSpec;
this.workingPath = workingPath;
this.segmentOutputPath = segmentOutputPath;
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure);
this.shardSpecs = (shardSpecs == null ? ImmutableMap.<DateTime, List<HadoopyShardSpec>>of() : shardSpecs);
this.overwriteFiles = overwriteFiles;
this.rollupSpec = rollupSpec;
this.updaterJobSpec = updaterJobSpec;
this.ignoreInvalidRows = ignoreInvalidRows;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return timestampSpec;
}
@JsonProperty
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public GranularitySpec getGranularitySpec()
{
return granularitySpec;
}
@JsonProperty
public Map<String, Object> getPathSpec()
{
return pathSpec;
}
@JsonProperty
public String getWorkingPath()
{
return workingPath;
}
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputPath;
}
@JsonProperty
public String getVersion()
{
return version;
}
@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}
@JsonProperty
public boolean isLeaveIntermediate()
{
return leaveIntermediate;
}
@JsonProperty
public boolean isCleanupOnFailure()
{
return cleanupOnFailure;
}
@JsonProperty
public Map<DateTime, List<HadoopyShardSpec>> getShardSpecs()
{
return shardSpecs;
}
@JsonProperty
public boolean isOverwriteFiles()
{
return overwriteFiles;
}
@JsonProperty
public DataRollupSpec getRollupSpec()
{
return rollupSpec;
}
@JsonProperty
public DbUpdaterJobSpec getUpdaterJobSpec()
{
return updaterJobSpec;
}
@JsonProperty
public boolean isIgnoreInvalidRows()
{
return ignoreInvalidRows;
}
}
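Editorial note: the constructor above keeps the deprecated `timestampColumn`/`timestampFormat` properties working by folding them into a `TimestampSpec` whenever no `timestampSpec` is supplied. Below is a minimal sketch of that fallback from a caller's point of view, assuming a plain Jackson `ObjectMapper` is sufficient for these fields (Druid's own mappers register additional modules); the class name and sample JSON values are illustrative only.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.HadoopDruidIndexerSchema;

public class LegacyTimestampFieldsSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // Legacy-style spec: no "timestampSpec", only the deprecated flat fields.
    final String legacyJson = "{"
                              + "\"dataSource\": \"example_source\","
                              + "\"timestampColumn\": \"ts\","
                              + "\"timestampFormat\": \"auto\""
                              + "}";

    final HadoopDruidIndexerSchema schema =
        mapper.readValue(legacyJson, HadoopDruidIndexerSchema.class);

    // The deprecated fields are absorbed by the @JsonCreator constructor,
    // so callers only ever see the new-style timestampSpec.
    System.out.println("timestampSpec present: " + (schema.getTimestampSpec() != null));
  }
}
```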

View File

@ -86,6 +86,7 @@ public class IndexGeneratorJob implements Jobby
private static final Logger log = new Logger(IndexGeneratorJob.class);
private final HadoopDruidIndexerConfig config;
private IndexGeneratorStats jobStats;
public IndexGeneratorJob(
@ -243,7 +244,7 @@ public class IndexGeneratorJob implements Jobby
protected void setup(Context context)
throws IOException, InterruptedException
{
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
for (AggregatorFactory factory : config.getRollupSpec().getAggs()) {
metricNames.add(factory.getName().toLowerCase());

View File

@ -60,7 +60,7 @@ public class JobHelper
final Configuration conf = groupByJob.getConfiguration();
final FileSystem fs = FileSystem.get(conf);
Path distributedClassPath = new Path(config.getJobOutputDir(), "classpath");
Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");
if (fs instanceof LocalFileSystem) {
return;

View File

@ -63,12 +63,16 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
// This PathSpec breaks so many abstractions that we might as well break some more
Preconditions.checkState(
config.getGranularitySpec() instanceof UniformGranularitySpec,
String.format("Cannot use %s without %s", GranularUnprocessedPathSpec.class.getSimpleName(), UniformGranularitySpec.class.getSimpleName())
String.format(
"Cannot use %s without %s",
GranularUnprocessedPathSpec.class.getSimpleName(),
UniformGranularitySpec.class.getSimpleName()
)
);
final Path betaInput = new Path(getInputPath());
final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
final Granularity segmentGranularity = ((UniformGranularitySpec)config.getGranularitySpec()).getGranularity();
final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getGranularity();
Map<DateTime, Long> inputModifiedTimes = new TreeMap<DateTime, Long>(
Comparators.inverse(Comparators.<Comparable>comparable())
@ -87,7 +91,11 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
DateTime timeBucket = entry.getKey();
long mTime = entry.getValue();
String bucketOutput = String.format("%s/%s", config.getSegmentOutputDir(), segmentGranularity.toPath(timeBucket));
String bucketOutput = String.format(
"%s/%s",
config.getSegmentOutputPath(),
segmentGranularity.toPath(timeBucket)
);
for (FileStatus fileStatus : FSSpideringIterator.spiderIterable(fs, new Path(bucketOutput))) {
if (fileStatus.getModificationTime() > mTime) {
bucketsToRun.add(new Interval(timeBucket, segmentGranularity.increment(timeBucket)));

View File

@ -40,7 +40,8 @@ public class HadoopDruidIndexerConfigTest
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testGranularitySpec() {
public void testGranularitySpec()
{
final HadoopDruidIndexerConfig cfg;
try {
@ -54,7 +55,8 @@ public class HadoopDruidIndexerConfigTest
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -74,7 +76,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testGranularitySpecLegacy() {
public void testGranularitySpecLegacy()
{
// Deprecated and replaced by granularitySpec, but still supported
final HadoopDruidIndexerConfig cfg;
@ -86,7 +89,8 @@ public class HadoopDruidIndexerConfigTest
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -106,40 +110,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testGranularitySpecPostConstructorIntervals() {
// Deprecated and replaced by granularitySpec, but still supported
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonMapper.readValue(
"{"
+ "\"segmentGranularity\":\"day\""
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
throw Throwables.propagate(e);
}
cfg.setIntervals(Lists.newArrayList(new Interval("2012-03-01/P1D")));
final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) cfg.getGranularitySpec();
Assert.assertEquals(
"getIntervals",
Lists.newArrayList(new Interval("2012-03-01/P1D")),
granularitySpec.getIntervals()
);
Assert.assertEquals(
"getGranularity",
"DAY",
granularitySpec.getGranularity().toString()
);
}
@Test
public void testInvalidGranularityCombination() {
public void testInvalidGranularityCombination()
{
boolean thrown = false;
try {
final HadoopDruidIndexerConfig cfg = jsonReadWriteRead(
@ -154,7 +126,8 @@ public class HadoopDruidIndexerConfigTest
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
thrown = true;
}
@ -162,7 +135,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testPartitionsSpecAutoDimension() {
public void testPartitionsSpecAutoDimension()
{
final HadoopDruidIndexerConfig cfg;
try {
@ -174,7 +148,8 @@ public class HadoopDruidIndexerConfigTest
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -200,7 +175,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testPartitionsSpecSpecificDimension() {
public void testPartitionsSpecSpecificDimension()
{
final HadoopDruidIndexerConfig cfg;
try {
@ -213,7 +189,8 @@ public class HadoopDruidIndexerConfigTest
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -245,7 +222,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testPartitionsSpecLegacy() {
public void testPartitionsSpecLegacy()
{
final HadoopDruidIndexerConfig cfg;
try {
@ -256,7 +234,8 @@ public class HadoopDruidIndexerConfigTest
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -288,7 +267,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testPartitionsSpecMaxPartitionSize() {
public void testPartitionsSpecMaxPartitionSize()
{
final HadoopDruidIndexerConfig cfg;
try {
@ -302,7 +282,8 @@ public class HadoopDruidIndexerConfigTest
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -334,7 +315,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testInvalidPartitionsCombination() {
public void testInvalidPartitionsCombination()
{
boolean thrown = false;
try {
final HadoopDruidIndexerConfig cfg = jsonReadWriteRead(
@ -346,7 +328,8 @@ public class HadoopDruidIndexerConfigTest
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
thrown = true;
}
@ -382,7 +365,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testDefaultSettings() {
public void testDefaultSettings()
{
final HadoopDruidIndexerConfig cfg;
try {
@ -390,7 +374,8 @@ public class HadoopDruidIndexerConfigTest
"{}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -414,7 +399,8 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void testNoCleanupOnFailure() {
public void testNoCleanupOnFailure()
{
final HadoopDruidIndexerConfig cfg;
try {
@ -422,7 +408,8 @@ public class HadoopDruidIndexerConfigTest
"{\"cleanupOnFailure\":false}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -435,23 +422,25 @@ public class HadoopDruidIndexerConfigTest
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath() {
public void shouldMakeHDFSCompliantSegmentOutputPath()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"source\","
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-07-10/P1D\"]"
+ " },"
+ "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\""
+ "}",
HadoopDruidIndexerConfig.class
"{"
+ "\"dataSource\": \"source\","
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-07-10/P1D\"]"
+ " },"
+ "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\""
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -466,23 +455,25 @@ public class HadoopDruidIndexerConfigTest
}
@Test
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() {
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
{
final HadoopDruidIndexerConfig cfg;
try {
cfg = jsonReadWriteRead(
"{"
+ "\"dataSource\": \"the:data:source\","
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-07-10/P1D\"]"
+ " },"
+ "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+ "}",
HadoopDruidIndexerConfig.class
"{"
+ "\"dataSource\": \"the:data:source\","
+ " \"granularitySpec\":{"
+ " \"type\":\"uniform\","
+ " \"gran\":\"hour\","
+ " \"intervals\":[\"2012-07-10/P1D\"]"
+ " },"
+ "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+ "}",
HadoopDruidIndexerConfig.class
);
} catch(Exception e) {
}
catch (Exception e) {
throw Throwables.propagate(e);
}
@ -490,7 +481,10 @@ public class HadoopDruidIndexerConfigTest
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString());
Assert.assertEquals(
"/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",
path.toString()
);
}

View File

@ -22,34 +22,50 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopDruidIndexerSchema;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.initialization.Initialization;
import io.druid.timeline.DataSegment;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;
public class HadoopIndexTask extends AbstractTask
{
@JsonIgnore
private final HadoopDruidIndexerConfig config;
private static final Logger log = new Logger(HadoopIndexTask.class);
private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";
@JsonIgnore
private final HadoopDruidIndexerSchema schema;
@JsonIgnore
private final String hadoopCoordinates;
/**
* @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
* <p/>
* Here, we will ensure that the DbConnectorConfig field of the config is set to null, such that the
* Here, we will ensure that the DbConnectorConfig field of the schema is set to null, such that the
* job does not push a list of published segments to the database. Instead, we will use the method
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
* segments, and let the indexing service report these segments to the database.
@ -58,21 +74,24 @@ public class HadoopIndexTask extends AbstractTask
@JsonCreator
public HadoopIndexTask(
@JsonProperty("id") String id,
@JsonProperty("config") HadoopDruidIndexerConfig config
@JsonProperty("config") HadoopDruidIndexerSchema schema,
@JsonProperty("hadoopCoordinates") String hadoopCoordinates
)
{
super(
id != null ? id : String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
config.getDataSource(),
JodaUtils.umbrellaInterval(config.getIntervals())
id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSource(), new DateTime()),
schema.getDataSource(),
JodaUtils.umbrellaInterval(JodaUtils.condenseIntervals(schema.getGranularitySpec().bucketIntervals()))
);
// Some HadoopDruidIndexerConfig stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(config.getSegmentOutputDir() == null, "segmentOutputPath must be absent");
Preconditions.checkArgument(config.getJobOutputDir() == null, "workingPath must be absent");
Preconditions.checkArgument(!config.isUpdaterJobSpecSet(), "updaterJobSpec must be absent");
this.config = config;
// Some HadoopDruidIndexerSchema stuff doesn't make sense in the context of the indexing service
Preconditions.checkArgument(schema.getSegmentOutputPath() == null, "segmentOutputPath must be absent");
Preconditions.checkArgument(schema.getWorkingPath() == null, "workingPath must be absent");
Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");
this.schema = schema;
this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
}
@Override
@ -81,34 +100,64 @@ public class HadoopIndexTask extends AbstractTask
return "index_hadoop";
}
@JsonProperty("config")
public HadoopDruidIndexerSchema getSchema()
{
return schema;
}
@JsonProperty
public String getHadoopCoordinates()
{
return hadoopCoordinates;
}
@SuppressWarnings("unchecked")
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Copy config so we don't needlessly modify our provided one
// Also necessary to make constructor validations work upon serde-after-run
final HadoopDruidIndexerConfig configCopy = toolbox.getObjectMapper()
.readValue(
toolbox.getObjectMapper().writeValueAsBytes(config),
HadoopDruidIndexerConfig.class
);
// setup Hadoop
final DefaultTeslaAether aetherClient = new DefaultTeslaAether();
final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
aetherClient, hadoopCoordinates
);
final URL[] urLs = ((URLClassLoader) hadoopLoader).getURLs();
final URL[] nonHadoopUrls = ((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs();
List<URL> theURLS = Lists.newArrayList();
theURLS.addAll(Arrays.asList(urLs));
theURLS.addAll(Arrays.asList(nonHadoopUrls));
final URLClassLoader loader = new URLClassLoader(theURLS.toArray(new URL[theURLS.size()]), null);
Thread.currentThread().setContextClassLoader(loader);
System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(nonHadoopUrls));
final Class<?> mainClass = loader.loadClass(HadoopIndexTaskInnerProcessing.class.getName());
final Method mainMethod = mainClass.getMethod("runTask", String[].class);
// We should have a lock from before we started running
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
log.info("Setting version to: %s", myLock.getVersion());
configCopy.setVersion(myLock.getVersion());
// Set workingPath to some reasonable default
configCopy.setJobOutputDir(toolbox.getConfig().getHadoopWorkingPath());
String[] args = new String[]{
toolbox.getObjectMapper().writeValueAsString(schema),
myLock.getVersion(),
toolbox.getConfig().getHadoopWorkingPath(),
toolbox.getSegmentPusher().getPathForHadoop(getDataSource()),
};
configCopy.setSegmentOutputDir(toolbox.getSegmentPusher().getPathForHadoop(getDataSource()));
String segments = (String) mainMethod.invoke(null, new Object[]{args});
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(configCopy);
configCopy.verify();
log.info("Starting a hadoop index generator job...");
if (job.run()) {
List<DataSegment> publishedSegments = job.getPublishedSegments();
if (segments != null) {
List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
segments, new TypeReference<List<DataSegment>>()
{
}
);
// Request segment pushes
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
@ -117,12 +166,41 @@ public class HadoopIndexTask extends AbstractTask
} else {
return TaskStatus.failure(getId());
}
}
@JsonProperty
public HadoopDruidIndexerConfig getConfig()
public static class HadoopIndexTaskInnerProcessing
{
return config;
public static String runTask(String[] args) throws Exception
{
final String schema = args[0];
final String version = args[1];
final String workingPath = args[2];
final String segmentOutputPath = args[3];
final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
.readValue(
schema,
HadoopDruidIndexerSchema.class
);
final HadoopDruidIndexerConfig config =
new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
.withVersion(version)
.withWorkingPath(
workingPath
)
.withSegmentOutputPath(
segmentOutputPath
)
.build();
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
log.info("Starting a hadoop index generator job...");
if (job.run()) {
return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(job.getPublishedSegments());
}
return null;
}
}
}
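Editorial note: to make the Hadoop dependency swappable (the default coordinates are `org.apache.hadoop:hadoop-core:1.0.3`), `run()` no longer executes the job in-process against the task's own classpath. It resolves the requested Hadoop artifacts, builds an isolated `URLClassLoader`, and invokes `HadoopIndexTaskInnerProcessing.runTask` reflectively with four string arguments (serialized schema, lock version, working path, segment output path), getting the published segments back as JSON. Below is a reduced sketch of just that classloader-isolation pattern; the aether-based resolution is replaced by an assumed array of jar URLs, and the helper class itself is illustrative.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

public class IsolatedInvokeSketch
{
  public static String invokeInIsolation(URL[] hadoopJars, URL[] ownClasspath, String[] taskArgs)
      throws Exception
  {
    // Combine the resolved Hadoop jars with the task's own classpath, as run() does.
    final URL[] combined = new URL[hadoopJars.length + ownClasspath.length];
    System.arraycopy(hadoopJars, 0, combined, 0, hadoopJars.length);
    System.arraycopy(ownClasspath, 0, combined, hadoopJars.length, ownClasspath.length);

    // Parent is null so nothing leaks in from the ambient classpath; the entry-point
    // class must therefore be reachable from the URLs above.
    final URLClassLoader loader = new URLClassLoader(combined, null);
    Thread.currentThread().setContextClassLoader(loader);

    // Load the entry point through the isolated loader and call its static runTask,
    // as HadoopIndexTask does with HadoopIndexTaskInnerProcessing.
    final Class<?> mainClass = loader.loadClass(
        "io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexTaskInnerProcessing"
    );
    final Method mainMethod = mainClass.getMethod("runTask", String[].class);
    return (String) mainMethod.invoke(null, new Object[]{taskArgs});
  }
}
```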

View File

@ -22,12 +22,13 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.JSONDataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerSchema;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.path.StaticPathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
@ -338,17 +339,12 @@ public class TaskSerdeTest
{
final HadoopIndexTask task = new HadoopIndexTask(
null,
new HadoopDruidIndexerConfig(
null,
new HadoopDruidIndexerSchema(
"foo",
"timestamp",
"auto",
new TimestampSpec("timestamp", "auto"),
new JSONDataSpec(ImmutableList.of("foo"), null),
null,
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
new StaticPathSpec("bar"),
null,
null,
ImmutableMap.<String, Object>of("paths", "bar"),
null,
null,
null,
@ -359,8 +355,11 @@ public class TaskSerdeTest
false,
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
null,
false
)
false,
null,
null
),
null
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();

View File

@ -41,7 +41,7 @@ import java.util.List;
*/
@Command(
name = "hadoop",
description = "Runs the batch Hadoop Druid Indexer, see https://github.com/metamx/druid/wiki/Batch-ingestion for a description."
description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.0/Batch-ingestion.html for a description."
)
public class CliHadoopIndexer implements Runnable
{

View File

@ -24,8 +24,8 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import java.io.File;
@ -58,9 +58,9 @@ public class CliInternalHadoopIndexer implements Runnable
{
try {
if (argumentSpec.startsWith("{")) {
return HadoopDruidIndexerConfig.fromString(argumentSpec);
return HadoopDruidIndexerConfigBuilder.fromString(argumentSpec);
} else {
return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
return HadoopDruidIndexerConfigBuilder.fromFile(new File(argumentSpec));
}
}
catch (Exception e) {
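Editorial note: after this refactor, callers obtain a runnable config through `HadoopDruidIndexerConfigBuilder`, either via the static `fromString`/`fromFile`/`fromConfiguration` helpers used above and in the mapper setup, or via the fluent path that `HadoopIndexTaskInnerProcessing` uses. Below is a minimal sketch of the fluent path; the paths, the version value, and the `schemaJson` parameter are illustrative placeholders.

```java
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopDruidIndexerSchema;
import org.joda.time.DateTime;

public class BuilderUsageSketch
{
  public static void runFromSchemaJson(String schemaJson) throws Exception
  {
    // Parse the schema with the same mapper the indexer uses internally.
    final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
        .readValue(schemaJson, HadoopDruidIndexerSchema.class);

    // Fill in the pieces the schema deliberately leaves out when submitted through
    // the indexing service: version, working path, segment output path.
    final HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfigBuilder()
        .withSchema(theSchema)
        .withVersion(new DateTime().toString())        // normally the task lock version
        .withWorkingPath("/tmp/druid-indexing")        // illustrative
        .withSegmentOutputPath("/tmp/druid-segments")  // illustrative
        .build();

    new HadoopDruidIndexerJob(config).run();
  }
}
```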

View File

@ -21,12 +21,9 @@ package io.druid.cli;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.client.util.Lists;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.multibindings.MapBinder;
@ -43,7 +40,6 @@ import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.NodeTypeConfig;
import io.druid.guice.PolyBind;
import io.druid.indexer.Utils;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskToolboxFactory;
@ -66,20 +62,15 @@ import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.OmniDataSegmentKiller;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.server.initialization.JettyServerInitializer;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.eclipse.jetty.server.Server;
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;