mirror of https://github.com/apache/druid.git
make the hadoop index task work again
parent 9796a40b92
commit a1c09df17f
@@ -24,8 +24,10 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim

```json
{
  "dataSource": "the_data_source",
  "timestampColumn": "ts",
  "timestampFormat": "<iso, millis, posix, auto or any Joda time format>",
  "timestampSpec" : {
    "timestampColumn": "ts",
    "timestampFormat": "<iso, millis, posix, auto or any Joda time format>"
  },
  "dataSpec": {
    "format": "<csv, tsv, or json>",
    "columns": [
@@ -95,18 +97,17 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim

|property|description|required?|
|--------|-----------|---------|
|dataSource|name of the dataSource the data will belong to|yes|
|timestampColumn|the column that is to be used as the timestamp column|yes|
|timestampFormat|the format of timestamps; auto = either iso or millis, Joda time formats:http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html|yes|
|dataSpec|a specification of the data format and an array that names all of the columns in the input data|yes|
|dimensions|the columns that are to be used as dimensions|yes|
|granularitySpec|the time granularity and interval to chunk segments up into|yes|
|timestampSpec|includes the column that is to be used as the timestamp column and the format of the timestamps; auto = either iso or millis, Joda time formats can be found [here](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html).|yes|
|dataSpec|a specification of the data format and an array that names all of the columns in the input data.|yes|
|dimensions|the columns that are to be used as dimensions.|yes|
|granularitySpec|the time granularity and interval to chunk segments up into.|yes|
|pathSpec|a specification of where to pull the data in from|yes|
|rollupSpec|a specification of the rollup to perform while processing the data|yes|
|workingPath|the working path to use for intermediate results (results between Hadoop jobs)|yes|
|segmentOutputPath|the path to dump segments into|yes|
|leaveIntermediate|leave behind files in the workingPath when job completes or fails (debugging tool)|no|
|partitionsSpec|a specification of how to partition each time bucket into segments, absence of this property means no partitioning will occur|no|
|updaterJobSpec|a specification of how to update the metadata for the druid cluster these segments belong to|yes|
|workingPath|the working path to use for intermediate results (results between Hadoop jobs).|yes|
|segmentOutputPath|the path to dump segments into.|yes|
|leaveIntermediate|leave behind files in the workingPath when job completes or fails (debugging tool).|no|
|partitionsSpec|a specification of how to partition each time bucket into segments, absence of this property means no partitioning will occur.|no|
|updaterJobSpec|a specification of how to update the metadata for the druid cluster these segments belong to.|yes|

### Path specification

@@ -118,9 +119,9 @@ Is a type of data loader that expects data to be laid out in a specific path for

|property|description|required?|
|--------|-----------|---------|
|dataGranularity|specifies the granularity to expect the data at, e.g. hour means to expect directories `y=XXXX/m=XX/d=XX/H=XX`|yes|
|inputPath|Base path to append the expected time path to|yes|
|filePattern|Pattern that files should match to be included|yes|
|dataGranularity|specifies the granularity to expect the data at, e.g. hour means to expect directories `y=XXXX/m=XX/d=XX/H=XX`.|yes|
|inputPath|Base path to append the expected time path to.|yes|
|filePattern|Pattern that files should match to be included.|yes|

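A granularity-type pathSpec matching the fields above might look like the following sketch; the input path and file pattern are placeholders, not values taken from this commit:

```json
{
  "type" : "granularity",
  "dataGranularity" : "hour",
  "inputPath" : "hdfs://some-namenode/example/input",
  "filePattern" : ".*\\.json"
}
```
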
For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths

@@ -138,7 +139,7 @@ The indexing process has the ability to roll data up as it processes the incomin

|property|description|required?|
|--------|-----------|---------|
|aggs|specifies a list of aggregators to aggregate for each bucket (a bucket is defined by the tuple of the truncated timestamp and the dimensions). Aggregators available here are the same as available when querying.|yes|
|rollupGranularity|The granularity to use when truncating incoming timestamps for bucketization|yes|
|rollupGranularity|The granularity to use when truncating incoming timestamps for bucketization.|yes|

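As a sketch only, a rollupSpec with these two fields could look like the following; the count aggregator and minute granularity are illustrative choices, not part of this change:

```json
{
  "aggs" : [
    { "type" : "count", "name" : "rows" }
  ],
  "rollupGranularity" : "minute"
}
```
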
### Partitioning specification

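The hunk above is cut off before the body of this section, but judging from the PartitionsSpec fields used elsewhere in this commit (partition dimension, target and max partition size, assumeGrouped), a partitionsSpec is roughly of this shape; the property names are inferred from the Java getters and the values are placeholders:

```json
{
  "partitionDimension" : "some_dimension",
  "targetPartitionSize" : 5000000,
  "assumeGrouped" : false
}
```
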
@@ -158,11 +159,11 @@ This is a specification of the properties that tell the job how to update metada

|property|description|required?|
|--------|-----------|---------|
|type|"db" is the only value available|yes|
|connectURI|a valid JDBC url to MySQL|yes|
|user|username for db|yes|
|password|password for db|yes|
|segmentTable|table to use in DB|yes|
|type|"db" is the only value available.|yes|
|connectURI|A valid JDBC url to MySQL.|yes|
|user|Username for db.|yes|
|password|password for db.|yes|
|segmentTable|Table to use in DB.|yes|

These properties should parrot what you have configured for your [Coordinator](Coordinator.html).

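For orientation, an updaterJobSpec with these fields might look like the sketch below; the connection details are placeholders and should parrot your own Coordinator configuration:

```json
{
  "type" : "db",
  "connectURI" : "jdbc:mysql://localhost:3306/druid",
  "user" : "druid",
  "password" : "diurd",
  "segmentTable" : "druid_segments"
}
```
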
@@ -177,15 +178,17 @@ java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/ov

This will start up a very simple local indexing service. For more complex deployments of the indexing service, see [here](Indexing-Service.html).

The schema of the Hadoop Index Task is very similar to the schema for the Hadoop Index Config. A sample Hadoop index task is shown below:
The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Config. A sample Hadoop index task is shown below:

```json
{
  "type" : "index_hadoop",
  "config": {
    "dataSource" : "example",
    "timestampColumn" : "timestamp",
    "timestampFormat" : "auto",
    "timestampSpec" : {
      "timestampColumn" : "timestamp",
      "timestampFormat" : "auto"
    },
    "dataSpec" : {
      "format" : "json",
      "dimensions" : ["dim1","dim2","dim3"]
@@ -224,8 +227,23 @@ The schema of the Hadoop Index Task is very similar to the schema for the Hadoop

|property|description|required?|
|--------|-----------|---------|
|type|"index_hadoop"|yes|
|config|a Hadoop Index Config|yes|
|type|This should be "index_hadoop".|yes|
|config|A Hadoop Index Config.|yes|

The Hadoop Index Config submitted as part of a Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally.

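To make the omission concrete, a trimmed-down task of roughly this shape (a sketch reusing the field names from the sample task above; the granularity, path, and rollup values are illustrative) carries no `segmentOutputPath`, `workingPath`, or `updaterJobSpec`:

```json
{
  "type" : "index_hadoop",
  "config" : {
    "dataSource" : "example",
    "timestampSpec" : { "timestampColumn" : "timestamp", "timestampFormat" : "auto" },
    "dataSpec" : { "format" : "json", "dimensions" : ["dim1", "dim2", "dim3"] },
    "granularitySpec" : { "type" : "uniform", "gran" : "DAY", "intervals" : ["2013-10-09/2013-10-10"] },
    "pathSpec" : { "type" : "static", "paths" : "/tmp/example_data.json" },
    "rollupSpec" : { "aggs" : [ { "type" : "count", "name" : "rows" } ], "rollupGranularity" : "none" }
  }
}
```
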
To run the task:

```
curl -X 'POST' -H 'Content-Type:application/json' -d @example_index_hadoop_task.json localhost:8087/druid/indexer/v1/task
```

If the task succeeds, you should see in the logs of the indexing service:

```
2013-10-16 16:38:31,945 INFO [pool-6-thread-1] io.druid.indexing.overlord.exec.TaskConsumer - Task SUCCESS: HadoopIndexTask...
```

Having Problems?
----------------
Getting data into Druid can definitely be difficult for first time users. Please don't hesitate to ask questions in our IRC channel or on our [google groups page](https://groups.google.com/forum/#!forum/druid-development).

@@ -81,13 +81,13 @@ import java.util.Set;

/**
 * Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
 * choosing the best dimension that satisfies the criteria:
 *
 * <p/>
 * <ul>
 * <li>Must have exactly one value per row.</li>
 * <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
 * put all those rows in the same partition, and that partition may be much larger than the target size.</li>
 * <li>Must have exactly one value per row.</li>
 * <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
 * put all those rows in the same partition, and that partition may be much larger than the target size.</li>
 * </ul>
 *
 * <p/>
 * "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes variation of
 * segment size relative to the target.
 */
@ -125,7 +125,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
* in the final segment.
|
||||
*/
|
||||
|
||||
if(!config.getPartitionsSpec().isAssumeGrouped()) {
|
||||
if (!config.getPartitionsSpec().isAssumeGrouped()) {
|
||||
final Job groupByJob = new Job(
|
||||
new Configuration(),
|
||||
String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
|
||||
|
@ -150,7 +150,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
groupByJob.submit();
|
||||
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());
|
||||
|
||||
if(!groupByJob.waitForCompletion(true)) {
|
||||
if (!groupByJob.waitForCompletion(true)) {
|
||||
log.error("Job failed: %s", groupByJob.getJobID());
|
||||
return false;
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
|
||||
injectSystemProperties(dimSelectionJob);
|
||||
|
||||
if(!config.getPartitionsSpec().isAssumeGrouped()) {
|
||||
if (!config.getPartitionsSpec().isAssumeGrouped()) {
|
||||
// Read grouped data from the groupByJob.
|
||||
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionPostGroupByMapper.class);
|
||||
dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class);
|
||||
|
@ -203,7 +203,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
dimSelectionJob.getTrackingURL()
|
||||
);
|
||||
|
||||
if(!dimSelectionJob.waitForCompletion(true)) {
|
||||
if (!dimSelectionJob.waitForCompletion(true)) {
|
||||
log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
|
||||
return false;
|
||||
}
|
||||
|
@ -237,15 +237,15 @@ public class DeterminePartitionsJob implements Jobby
|
|||
}
|
||||
|
||||
shardSpecs.put(bucket, actualSpecs);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
log.info("Path[%s] didn't exist!?", partitionInfoPath);
|
||||
}
|
||||
}
|
||||
config.setShardSpecs(shardSpecs);
|
||||
|
||||
return true;
|
||||
} catch(Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
@ -271,9 +271,9 @@ public class DeterminePartitionsJob implements Jobby
|
|||
{
|
||||
// Create group key, there are probably more efficient ways of doing this
|
||||
final Map<String, Set<String>> dims = Maps.newTreeMap();
|
||||
for(final String dim : inputRow.getDimensions()) {
|
||||
for (final String dim : inputRow.getDimensions()) {
|
||||
final Set<String> dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim));
|
||||
if(dimValues.size() > 0) {
|
||||
if (dimValues.size() > 0) {
|
||||
dims.put(dim, dimValues);
|
||||
}
|
||||
}
|
||||
|
@ -314,7 +314,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
protected void setup(Context context)
|
||||
throws IOException, InterruptedException
|
||||
{
|
||||
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
|
||||
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
|
||||
final String partitionDimension = config.getPartitionDimension();
|
||||
helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
|
||||
}
|
||||
|
@ -346,7 +346,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
throws IOException, InterruptedException
|
||||
{
|
||||
super.setup(context);
|
||||
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
|
||||
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
|
||||
final String partitionDimension = config.getPartitionDimension();
|
||||
helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension);
|
||||
}
|
||||
|
@ -359,7 +359,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
) throws IOException, InterruptedException
|
||||
{
|
||||
final Map<String, Iterable<String>> dims = Maps.newHashMap();
|
||||
for(final String dim : inputRow.getDimensions()) {
|
||||
for (final String dim : inputRow.getDimensions()) {
|
||||
dims.put(dim, inputRow.getDimension(dim));
|
||||
}
|
||||
helper.emitDimValueCounts(context, new DateTime(inputRow.getTimestampFromEpoch()), dims);
|
||||
|
@ -383,9 +383,9 @@ public class DeterminePartitionsJob implements Jobby
|
|||
|
||||
final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
|
||||
int idx = 0;
|
||||
for(final Interval bucketInterval: config.getGranularitySpec().bucketIntervals()) {
|
||||
for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) {
|
||||
timeIndexBuilder.put(bucketInterval.getStart(), idx);
|
||||
idx ++;
|
||||
idx++;
|
||||
}
|
||||
|
||||
this.intervalIndexes = timeIndexBuilder.build();
|
||||
|
@ -399,7 +399,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
{
|
||||
final Optional<Interval> maybeInterval = config.getGranularitySpec().bucketInterval(timestamp);
|
||||
|
||||
if(!maybeInterval.isPresent()) {
|
||||
if (!maybeInterval.isPresent()) {
|
||||
throw new ISE("WTF?! No bucket found for timestamp: %s", timestamp);
|
||||
}
|
||||
|
||||
|
@ -414,13 +414,13 @@ public class DeterminePartitionsJob implements Jobby
|
|||
// Emit row-counter value.
|
||||
write(context, groupKey, new DimValueCount("", "", 1));
|
||||
|
||||
for(final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
|
||||
for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
|
||||
final String dim = dimAndValues.getKey();
|
||||
|
||||
if(partitionDimension == null || partitionDimension.equals(dim)) {
|
||||
if (partitionDimension == null || partitionDimension.equals(dim)) {
|
||||
final Iterable<String> dimValues = dimAndValues.getValue();
|
||||
|
||||
if(Iterables.size(dimValues) == 1) {
|
||||
if (Iterables.size(dimValues) == 1) {
|
||||
// Emit this value.
|
||||
write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
|
||||
} else {
|
||||
|
@ -433,7 +433,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
}
|
||||
|
||||
public static class DeterminePartitionsDimSelectionPartitioner
|
||||
extends Partitioner<BytesWritable, Text>
|
||||
extends Partitioner<BytesWritable, Text>
|
||||
{
|
||||
@Override
|
||||
public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions)
|
||||
|
@ -463,7 +463,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
if (config == null) {
|
||||
synchronized (DeterminePartitionsDimSelectionBaseReducer.class) {
|
||||
if (config == null) {
|
||||
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
|
||||
config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -561,7 +561,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
final DimValueCount firstDvc = iterator.next();
|
||||
final int totalRows = firstDvc.numRows;
|
||||
|
||||
if(!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
|
||||
if (!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
|
||||
throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!");
|
||||
}
|
||||
|
||||
|
@ -574,10 +574,10 @@ public class DeterminePartitionsJob implements Jobby
|
|||
// We'll store possible partitions in here
|
||||
final Map<String, DimPartitions> dimPartitionss = Maps.newHashMap();
|
||||
|
||||
while(iterator.hasNext()) {
|
||||
while (iterator.hasNext()) {
|
||||
final DimValueCount dvc = iterator.next();
|
||||
|
||||
if(currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
|
||||
if (currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
|
||||
// Starting a new dimension! Exciting!
|
||||
currentDimPartitions = new DimPartitions(dvc.dim);
|
||||
currentDimPartition = new DimPartition();
|
||||
|
@ -586,17 +586,17 @@ public class DeterminePartitionsJob implements Jobby
|
|||
}
|
||||
|
||||
// Respect poisoning
|
||||
if(!currentDimSkip && dvc.numRows < 0) {
|
||||
if (!currentDimSkip && dvc.numRows < 0) {
|
||||
log.info("Cannot partition on multi-valued dimension: %s", dvc.dim);
|
||||
currentDimSkip = true;
|
||||
}
|
||||
|
||||
if(currentDimSkip) {
|
||||
if (currentDimSkip) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// See if we need to cut a new partition ending immediately before this dimension value
|
||||
if(currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
|
||||
if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
|
||||
final ShardSpec shardSpec = new SingleDimensionShardSpec(
|
||||
currentDimPartitions.dim,
|
||||
currentDimPartitionStart,
|
||||
|
@ -618,20 +618,20 @@ public class DeterminePartitionsJob implements Jobby
|
|||
}
|
||||
|
||||
// Update counters
|
||||
currentDimPartition.cardinality ++;
|
||||
currentDimPartition.cardinality++;
|
||||
currentDimPartition.rows += dvc.numRows;
|
||||
|
||||
if(!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
|
||||
if (!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
|
||||
// Finalize the current dimension
|
||||
|
||||
if(currentDimPartition.rows > 0) {
|
||||
if (currentDimPartition.rows > 0) {
|
||||
// One more shard to go
|
||||
final ShardSpec shardSpec;
|
||||
|
||||
if (currentDimPartitions.partitions.isEmpty()) {
|
||||
shardSpec = new NoneShardSpec();
|
||||
} else {
|
||||
if(currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) {
|
||||
if (currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) {
|
||||
// Combine with previous shard
|
||||
final DimPartition previousDimPartition = currentDimPartitions.partitions.remove(
|
||||
currentDimPartitions.partitions.size() - 1
|
||||
|
@ -685,7 +685,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
}
|
||||
|
||||
// Choose best dimension
|
||||
if(dimPartitionss.isEmpty()) {
|
||||
if (dimPartitionss.isEmpty()) {
|
||||
throw new ISE("No suitable partitioning dimension found!");
|
||||
}
|
||||
|
||||
|
@ -694,8 +694,8 @@ public class DeterminePartitionsJob implements Jobby
|
|||
DimPartitions minDistancePartitions = null;
|
||||
DimPartitions maxCardinalityPartitions = null;
|
||||
|
||||
for(final DimPartitions dimPartitions : dimPartitionss.values()) {
|
||||
if(dimPartitions.getRows() != totalRows) {
|
||||
for (final DimPartitions dimPartitions : dimPartitionss.values()) {
|
||||
if (dimPartitions.getRows() != totalRows) {
|
||||
log.info(
|
||||
"Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
|
||||
dimPartitions.dim,
|
||||
|
@ -708,32 +708,32 @@ public class DeterminePartitionsJob implements Jobby
|
|||
|
||||
// Make sure none of these shards are oversized
|
||||
boolean oversized = false;
|
||||
for(final DimPartition partition : dimPartitions.partitions) {
|
||||
if(partition.rows > config.getMaxPartitionSize()) {
|
||||
for (final DimPartition partition : dimPartitions.partitions) {
|
||||
if (partition.rows > config.getMaxPartitionSize()) {
|
||||
log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
|
||||
oversized = true;
|
||||
}
|
||||
}
|
||||
|
||||
if(oversized) {
|
||||
if (oversized) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final int cardinality = dimPartitions.getCardinality();
|
||||
final long distance = dimPartitions.getDistanceSquaredFromTarget(config.getTargetPartitionSize());
|
||||
|
||||
if(cardinality > maxCardinality) {
|
||||
if (cardinality > maxCardinality) {
|
||||
maxCardinality = cardinality;
|
||||
maxCardinalityPartitions = dimPartitions;
|
||||
}
|
||||
|
||||
if(distance < minDistance) {
|
||||
if (distance < minDistance) {
|
||||
minDistance = distance;
|
||||
minDistancePartitions = dimPartitions;
|
||||
}
|
||||
}
|
||||
|
||||
if(maxCardinalityPartitions == null) {
|
||||
if (maxCardinalityPartitions == null) {
|
||||
throw new ISE("No suitable partitioning dimension found!");
|
||||
}
|
||||
|
||||
|
@ -821,7 +821,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
public int getCardinality()
|
||||
{
|
||||
int sum = 0;
|
||||
for(final DimPartition dimPartition : partitions) {
|
||||
for (final DimPartition dimPartition : partitions) {
|
||||
sum += dimPartition.cardinality;
|
||||
}
|
||||
return sum;
|
||||
|
@ -830,7 +830,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
public long getDistanceSquaredFromTarget(long target)
|
||||
{
|
||||
long distance = 0;
|
||||
for(final DimPartition dimPartition : partitions) {
|
||||
for (final DimPartition dimPartition : partitions) {
|
||||
distance += (dimPartition.rows - target) * (dimPartition.rows - target);
|
||||
}
|
||||
|
||||
|
@ -841,7 +841,7 @@ public class DeterminePartitionsJob implements Jobby
|
|||
public int getRows()
|
||||
{
|
||||
int sum = 0;
|
||||
for(final DimPartition dimPartition : partitions) {
|
||||
for (final DimPartition dimPartition : partitions) {
|
||||
sum += dimPartition.rows;
|
||||
}
|
||||
return sum;
|
||||
|
@ -892,7 +892,11 @@ public class DeterminePartitionsJob implements Jobby
|
|||
throws IOException, InterruptedException
|
||||
{
|
||||
context.write(
|
||||
new SortableBytes(groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(),
|
||||
new SortableBytes(
|
||||
groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(
|
||||
HadoopDruidIndexerConfig.javaNativeCharset
|
||||
)
|
||||
).toBytesWritable(),
|
||||
dimValueCount.toText()
|
||||
);
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package io.druid.indexer;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Joiner;
|
||||
|
@ -68,7 +67,6 @@ import org.joda.time.DateTime;
|
|||
import org.joda.time.Interval;
|
||||
import org.joda.time.format.ISODateTimeFormat;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Collections;
|
||||
|
@ -80,13 +78,13 @@ import java.util.Set;
|
|||
*/
|
||||
public class HadoopDruidIndexerConfig
|
||||
{
|
||||
public static final Charset javaNativeCharset = Charset.forName("Unicode");
|
||||
|
||||
public static final Splitter tabSplitter = Splitter.on("\t");
|
||||
public static final Joiner tabJoiner = Joiner.on("\t");
|
||||
|
||||
private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
|
||||
private static final Injector injector;
|
||||
|
||||
public static final String CONFIG_PROPERTY = "druid.indexer.config";
|
||||
public static final Charset javaNativeCharset = Charset.forName("Unicode");
|
||||
public static final Splitter tabSplitter = Splitter.on("\t");
|
||||
public static final Joiner tabJoiner = Joiner.on("\t");
|
||||
public static final ObjectMapper jsonMapper;
|
||||
|
||||
static {
|
||||
|
@ -113,85 +111,33 @@ public class HadoopDruidIndexerConfig
|
|||
INVALID_ROW_COUNTER
|
||||
}
|
||||
|
||||
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
|
||||
{
|
||||
return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static HadoopDruidIndexerConfig fromFile(File file)
|
||||
{
|
||||
try {
|
||||
return fromMap((Map<String, Object>) jsonMapper.readValue(file, new TypeReference<Map<String, Object>>(){}));
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static HadoopDruidIndexerConfig fromString(String str)
|
||||
{
|
||||
try {
|
||||
return fromMap(
|
||||
(Map<String, Object>) jsonMapper.readValue(
|
||||
str, new TypeReference<Map<String, Object>>()
|
||||
{
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
|
||||
{
|
||||
final HadoopDruidIndexerConfig retVal = fromString(conf.get(CONFIG_PROPERTY));
|
||||
retVal.verify();
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private static final Logger log = new Logger(HadoopDruidIndexerConfig.class);
|
||||
|
||||
private static final String CONFIG_PROPERTY = "druid.indexer.config";
|
||||
|
||||
private volatile String dataSource;
|
||||
private volatile String timestampColumnName;
|
||||
private volatile String timestampFormat;
|
||||
private volatile TimestampSpec timestampSpec;
|
||||
private volatile DataSpec dataSpec;
|
||||
@Deprecated
|
||||
private volatile Granularity segmentGranularity;
|
||||
private volatile GranularitySpec granularitySpec;
|
||||
private volatile PathSpec pathSpec;
|
||||
private volatile String jobOutputDir;
|
||||
private volatile String segmentOutputDir;
|
||||
private volatile String version = new DateTime().toString();
|
||||
private volatile String workingPath;
|
||||
private volatile String segmentOutputPath;
|
||||
private volatile String version;
|
||||
private volatile PartitionsSpec partitionsSpec;
|
||||
private volatile boolean leaveIntermediate = false;
|
||||
private volatile boolean cleanupOnFailure = true;
|
||||
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs = ImmutableMap.of();
|
||||
private volatile boolean overwriteFiles = false;
|
||||
private volatile boolean leaveIntermediate;
|
||||
private volatile boolean cleanupOnFailure;
|
||||
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
|
||||
private volatile boolean overwriteFiles;
|
||||
private volatile DataRollupSpec rollupSpec;
|
||||
private volatile DbUpdaterJobSpec updaterJobSpec;
|
||||
private volatile boolean ignoreInvalidRows = false;
|
||||
private volatile boolean ignoreInvalidRows;
|
||||
|
||||
@JsonCreator
|
||||
public HadoopDruidIndexerConfig(
|
||||
final @JsonProperty("intervals") List<Interval> intervals,
|
||||
final @JsonProperty("dataSource") String dataSource,
|
||||
final @JsonProperty("timestampColumn") String timestampColumnName,
|
||||
final @JsonProperty("timestampFormat") String timestampFormat,
|
||||
final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
|
||||
final @JsonProperty("dataSpec") DataSpec dataSpec,
|
||||
final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
|
||||
final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
|
||||
final @JsonProperty("pathSpec") PathSpec pathSpec,
|
||||
final @JsonProperty("workingPath") String jobOutputDir,
|
||||
final @JsonProperty("segmentOutputPath") String segmentOutputDir,
|
||||
final @JsonProperty("workingPath") String workingPath,
|
||||
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
|
||||
final @JsonProperty("version") String version,
|
||||
final @JsonProperty("partitionDimension") String partitionDimension,
|
||||
final @JsonProperty("targetPartitionSize") Long targetPartitionSize,
|
||||
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
|
||||
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
|
||||
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
|
||||
|
@ -199,48 +145,51 @@ public class HadoopDruidIndexerConfig
|
|||
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
|
||||
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
|
||||
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
|
||||
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
|
||||
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
|
||||
// These fields are deprecated and will be removed in the future
|
||||
final @JsonProperty("timestampColumn") String timestampColumn,
|
||||
final @JsonProperty("timestampFormat") String timestampFormat,
|
||||
final @JsonProperty("intervals") List<Interval> intervals,
|
||||
final @JsonProperty("segmentGranularity") Granularity segmentGranularity,
|
||||
final @JsonProperty("partitionDimension") String partitionDimension,
|
||||
final @JsonProperty("targetPartitionSize") Long targetPartitionSize
|
||||
)
|
||||
{
|
||||
this.dataSource = dataSource;
|
||||
this.timestampColumnName = (timestampColumnName == null) ? null : timestampColumnName.toLowerCase();
|
||||
this.timestampFormat = timestampFormat;
|
||||
this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec;
|
||||
this.dataSpec = dataSpec;
|
||||
this.granularitySpec = granularitySpec;
|
||||
this.pathSpec = pathSpec;
|
||||
this.jobOutputDir = jobOutputDir;
|
||||
this.segmentOutputDir = segmentOutputDir;
|
||||
this.workingPath = workingPath;
|
||||
this.segmentOutputPath = segmentOutputPath;
|
||||
this.version = version == null ? new DateTime().toString() : version;
|
||||
this.partitionsSpec = partitionsSpec;
|
||||
this.leaveIntermediate = leaveIntermediate;
|
||||
this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure);
|
||||
this.shardSpecs = shardSpecs;
|
||||
this.shardSpecs = (shardSpecs == null ? ImmutableMap.<DateTime, List<HadoopyShardSpec>>of() : shardSpecs);
|
||||
this.overwriteFiles = overwriteFiles;
|
||||
this.rollupSpec = rollupSpec;
|
||||
this.updaterJobSpec = updaterJobSpec;
|
||||
this.ignoreInvalidRows = ignoreInvalidRows;
|
||||
|
||||
if(partitionsSpec != null) {
|
||||
if (partitionsSpec != null) {
|
||||
Preconditions.checkArgument(
|
||||
partitionDimension == null && targetPartitionSize == null,
|
||||
"Cannot mix partitionsSpec with partitionDimension/targetPartitionSize"
|
||||
);
|
||||
|
||||
this.partitionsSpec = partitionsSpec;
|
||||
} else {
|
||||
// Backwards compatibility
|
||||
this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false);
|
||||
}
|
||||
|
||||
if(granularitySpec != null) {
|
||||
if (granularitySpec != null) {
|
||||
Preconditions.checkArgument(
|
||||
segmentGranularity == null && intervals == null,
|
||||
"Cannot mix granularitySpec with segmentGranularity/intervals"
|
||||
);
|
||||
this.granularitySpec = granularitySpec;
|
||||
} else {
|
||||
// Backwards compatibility
|
||||
this.segmentGranularity = segmentGranularity;
|
||||
if(segmentGranularity != null && intervals != null) {
|
||||
if (segmentGranularity != null && intervals != null) {
|
||||
this.granularitySpec = new UniformGranularitySpec(segmentGranularity, intervals);
|
||||
}
|
||||
}
|
||||
|
@ -253,21 +202,6 @@ public class HadoopDruidIndexerConfig
|
|||
{
|
||||
}
|
||||
|
||||
public List<Interval> getIntervals()
|
||||
{
|
||||
return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setIntervals(List<Interval> intervals)
|
||||
{
|
||||
Preconditions.checkState(this.granularitySpec == null, "Cannot mix setIntervals with granularitySpec");
|
||||
Preconditions.checkState(this.segmentGranularity != null, "Cannot use setIntervals without segmentGranularity");
|
||||
|
||||
// For backwards compatibility
|
||||
this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, intervals);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getDataSource()
|
||||
{
|
||||
|
@ -279,31 +213,15 @@ public class HadoopDruidIndexerConfig
|
|||
this.dataSource = dataSource.toLowerCase();
|
||||
}
|
||||
|
||||
@JsonProperty("timestampColumn")
|
||||
public String getTimestampColumnName()
|
||||
{
|
||||
return timestampColumnName;
|
||||
}
|
||||
|
||||
public void setTimestampColumnName(String timestampColumnName)
|
||||
{
|
||||
this.timestampColumnName = timestampColumnName.toLowerCase();
|
||||
}
|
||||
|
||||
@JsonProperty()
|
||||
public String getTimestampFormat()
|
||||
{
|
||||
return timestampFormat;
|
||||
}
|
||||
|
||||
public void setTimestampFormat(String timestampFormat)
|
||||
{
|
||||
this.timestampFormat = timestampFormat;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public TimestampSpec getTimestampSpec()
|
||||
{
|
||||
return new TimestampSpec(timestampColumnName, timestampFormat);
|
||||
return timestampSpec;
|
||||
}
|
||||
|
||||
public void setTimestampSpec(TimestampSpec timestampSpec)
|
||||
{
|
||||
this.timestampSpec = timestampSpec;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -317,32 +235,6 @@ public class HadoopDruidIndexerConfig
|
|||
this.dataSpec = new ToLowercaseDataSpec(dataSpec);
|
||||
}
|
||||
|
||||
public StringInputRowParser getParser()
|
||||
{
|
||||
final List<String> dimensionExclusions;
|
||||
|
||||
if(getDataSpec().hasCustomDimensions()) {
|
||||
dimensionExclusions = null;
|
||||
} else {
|
||||
dimensionExclusions = Lists.newArrayList();
|
||||
dimensionExclusions.add(getTimestampColumnName());
|
||||
dimensionExclusions.addAll(
|
||||
Lists.transform(
|
||||
getRollupSpec().getAggs(), new Function<AggregatorFactory, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(AggregatorFactory aggregatorFactory)
|
||||
{
|
||||
return aggregatorFactory.getName();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public GranularitySpec getGranularitySpec()
|
||||
{
|
||||
|
@ -354,17 +246,6 @@ public class HadoopDruidIndexerConfig
|
|||
this.granularitySpec = granularitySpec;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public PartitionsSpec getPartitionsSpec()
|
||||
{
|
||||
return partitionsSpec;
|
||||
}
|
||||
|
||||
public void setPartitionsSpec(PartitionsSpec partitionsSpec)
|
||||
{
|
||||
this.partitionsSpec = partitionsSpec;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public PathSpec getPathSpec()
|
||||
{
|
||||
|
@ -376,26 +257,26 @@ public class HadoopDruidIndexerConfig
|
|||
this.pathSpec = pathSpec;
|
||||
}
|
||||
|
||||
@JsonProperty("workingPath")
|
||||
public String getJobOutputDir()
|
||||
@JsonProperty
|
||||
public String getWorkingPath()
|
||||
{
|
||||
return jobOutputDir;
|
||||
return workingPath;
|
||||
}
|
||||
|
||||
public void setJobOutputDir(String jobOutputDir)
|
||||
public void setWorkingPath(String workingPath)
|
||||
{
|
||||
this.jobOutputDir = jobOutputDir;
|
||||
this.workingPath = workingPath;
|
||||
}
|
||||
|
||||
@JsonProperty("segmentOutputPath")
|
||||
public String getSegmentOutputDir()
|
||||
@JsonProperty
|
||||
public String getSegmentOutputPath()
|
||||
{
|
||||
return segmentOutputDir;
|
||||
return segmentOutputPath;
|
||||
}
|
||||
|
||||
public void setSegmentOutputDir(String segmentOutputDir)
|
||||
public void setSegmentOutputPath(String segmentOutputPath)
|
||||
{
|
||||
this.segmentOutputDir = segmentOutputDir;
|
||||
this.segmentOutputPath = segmentOutputPath;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -409,29 +290,15 @@ public class HadoopDruidIndexerConfig
|
|||
this.version = version;
|
||||
}
|
||||
|
||||
public String getPartitionDimension()
|
||||
@JsonProperty
|
||||
public PartitionsSpec getPartitionsSpec()
|
||||
{
|
||||
return partitionsSpec.getPartitionDimension();
|
||||
return partitionsSpec;
|
||||
}
|
||||
|
||||
public boolean partitionByDimension()
|
||||
public void setPartitionsSpec(PartitionsSpec partitionsSpec)
|
||||
{
|
||||
return partitionsSpec.isDeterminingPartitions();
|
||||
}
|
||||
|
||||
public Long getTargetPartitionSize()
|
||||
{
|
||||
return partitionsSpec.getTargetPartitionSize();
|
||||
}
|
||||
|
||||
public long getMaxPartitionSize()
|
||||
{
|
||||
return partitionsSpec.getMaxPartitionSize();
|
||||
}
|
||||
|
||||
public boolean isUpdaterJobSpecSet()
|
||||
{
|
||||
return (updaterJobSpec != null);
|
||||
this.partitionsSpec = partitionsSpec;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -511,6 +378,72 @@ public class HadoopDruidIndexerConfig
|
|||
this.ignoreInvalidRows = ignoreInvalidRows;
|
||||
}
|
||||
|
||||
public List<Interval> getIntervals()
|
||||
{
|
||||
return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals());
|
||||
}
|
||||
|
||||
public String getPartitionDimension()
|
||||
{
|
||||
return partitionsSpec.getPartitionDimension();
|
||||
}
|
||||
|
||||
public boolean partitionByDimension()
|
||||
{
|
||||
return partitionsSpec.isDeterminingPartitions();
|
||||
}
|
||||
|
||||
public Long getTargetPartitionSize()
|
||||
{
|
||||
return partitionsSpec.getTargetPartitionSize();
|
||||
}
|
||||
|
||||
public long getMaxPartitionSize()
|
||||
{
|
||||
return partitionsSpec.getMaxPartitionSize();
|
||||
}
|
||||
|
||||
public boolean isUpdaterJobSpecSet()
|
||||
{
|
||||
return (updaterJobSpec != null);
|
||||
}
|
||||
|
||||
public StringInputRowParser getParser()
|
||||
{
|
||||
final List<String> dimensionExclusions;
|
||||
|
||||
if (getDataSpec().hasCustomDimensions()) {
|
||||
dimensionExclusions = null;
|
||||
} else {
|
||||
dimensionExclusions = Lists.newArrayList();
|
||||
dimensionExclusions.add(timestampSpec.getTimestampColumn());
|
||||
dimensionExclusions.addAll(
|
||||
Lists.transform(
|
||||
getRollupSpec().getAggs(), new Function<AggregatorFactory, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(AggregatorFactory aggregatorFactory)
|
||||
{
|
||||
return aggregatorFactory.getName();
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions);
|
||||
}
|
||||
|
||||
public HadoopyShardSpec getShardSpec(Bucket bucket)
|
||||
{
|
||||
return shardSpecs.get(bucket.time).get(bucket.partitionNum);
|
||||
}
|
||||
|
||||
public Job addInputPaths(Job job) throws IOException
|
||||
{
|
||||
return getPathSpec().addInputPaths(this, job);
|
||||
}
|
||||
|
||||
/********************************************
|
||||
Granularity/Bucket Helper Methods
|
||||
********************************************/
|
||||
|
@ -590,11 +523,6 @@ public class HadoopDruidIndexerConfig
|
|||
);
|
||||
}
|
||||
|
||||
public HadoopyShardSpec getShardSpec(Bucket bucket)
|
||||
{
|
||||
return shardSpecs.get(bucket.time).get(bucket.partitionNum);
|
||||
}
|
||||
|
||||
/******************************************
|
||||
Path helper logic
|
||||
******************************************/
|
||||
|
@ -606,7 +534,7 @@ public class HadoopDruidIndexerConfig
|
|||
*/
|
||||
public Path makeIntermediatePath()
|
||||
{
|
||||
return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().replace(":", "")));
|
||||
return new Path(String.format("%s/%s/%s", getWorkingPath(), getDataSource(), getVersion().replace(":", "")));
|
||||
}
|
||||
|
||||
public Path makeSegmentPartitionInfoPath(Bucket bucket)
|
||||
|
@ -638,38 +566,33 @@ public class HadoopDruidIndexerConfig
|
|||
return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
|
||||
}
|
||||
|
||||
public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
|
||||
{
|
||||
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
|
||||
if (fileSystem instanceof DistributedFileSystem)
|
||||
{
|
||||
return new Path(
|
||||
String.format(
|
||||
"%s/%s/%s_%s/%s/%s",
|
||||
getSegmentOutputDir(),
|
||||
dataSource,
|
||||
bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
|
||||
bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
|
||||
getVersion().replace(":", "_"),
|
||||
bucket.partitionNum
|
||||
)
|
||||
);
|
||||
}
|
||||
return new Path(
|
||||
String.format(
|
||||
"%s/%s/%s_%s/%s/%s",
|
||||
getSegmentOutputDir(),
|
||||
dataSource,
|
||||
bucketInterval.getStart().toString(),
|
||||
bucketInterval.getEnd().toString(),
|
||||
getVersion(),
|
||||
bucket.partitionNum
|
||||
));
|
||||
}
|
||||
|
||||
public Job addInputPaths(Job job) throws IOException
|
||||
public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
|
||||
{
|
||||
return pathSpec.addInputPaths(this, job);
|
||||
final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
|
||||
if (fileSystem instanceof DistributedFileSystem) {
|
||||
return new Path(
|
||||
String.format(
|
||||
"%s/%s/%s_%s/%s/%s",
|
||||
getSegmentOutputPath(),
|
||||
getDataSource(),
|
||||
bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
|
||||
bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
|
||||
getVersion().replace(":", "_"),
|
||||
bucket.partitionNum
|
||||
)
|
||||
);
|
||||
}
|
||||
return new Path(
|
||||
String.format(
|
||||
"%s/%s/%s_%s/%s/%s",
|
||||
getSegmentOutputPath(),
|
||||
getDataSource(),
|
||||
bucketInterval.getStart().toString(),
|
||||
bucketInterval.getEnd().toString(),
|
||||
getVersion(),
|
||||
bucket.partitionNum
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
public void intoConfiguration(Job job)
|
||||
|
@ -677,7 +600,7 @@ public class HadoopDruidIndexerConfig
|
|||
Configuration conf = job.getConfiguration();
|
||||
|
||||
try {
|
||||
conf.set(CONFIG_PROPERTY, jsonMapper.writeValueAsString(this));
|
||||
conf.set(HadoopDruidIndexerConfig.CONFIG_PROPERTY, HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(this));
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
@ -695,12 +618,11 @@ public class HadoopDruidIndexerConfig
|
|||
|
||||
Preconditions.checkNotNull(dataSource, "dataSource");
|
||||
Preconditions.checkNotNull(dataSpec, "dataSpec");
|
||||
Preconditions.checkNotNull(timestampColumnName, "timestampColumn");
|
||||
Preconditions.checkNotNull(timestampFormat, "timestampFormat");
|
||||
Preconditions.checkNotNull(timestampSpec, "timestampSpec");
|
||||
Preconditions.checkNotNull(granularitySpec, "granularitySpec");
|
||||
Preconditions.checkNotNull(pathSpec, "pathSpec");
|
||||
Preconditions.checkNotNull(jobOutputDir, "workingPath");
|
||||
Preconditions.checkNotNull(segmentOutputDir, "segmentOutputPath");
|
||||
Preconditions.checkNotNull(workingPath, "workingPath");
|
||||
Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
|
||||
Preconditions.checkNotNull(version, "version");
|
||||
Preconditions.checkNotNull(rollupSpec, "rollupSpec");
|
||||
|
||||
|
|
|
@ -0,0 +1,278 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.indexer;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.data.input.impl.DataSpec;
|
||||
import io.druid.data.input.impl.TimestampSpec;
|
||||
import io.druid.indexer.granularity.GranularitySpec;
|
||||
import io.druid.indexer.partitions.PartitionsSpec;
|
||||
import io.druid.indexer.path.PathSpec;
|
||||
import io.druid.indexer.rollup.DataRollupSpec;
|
||||
import io.druid.indexer.updater.DbUpdaterJobSpec;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class HadoopDruidIndexerConfigBuilder
|
||||
{
|
||||
public static HadoopDruidIndexerConfig fromSchema(HadoopDruidIndexerSchema schema)
|
||||
{
|
||||
return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class);
|
||||
}
|
||||
|
||||
public static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
|
||||
{
|
||||
return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static HadoopDruidIndexerConfig fromFile(File file)
|
||||
{
|
||||
try {
|
||||
return fromMap(
|
||||
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
|
||||
file, new TypeReference<Map<String, Object>>()
|
||||
{
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static HadoopDruidIndexerConfig fromString(String str)
|
||||
{
|
||||
try {
|
||||
return fromMap(
|
||||
(Map<String, Object>) HadoopDruidIndexerConfig.jsonMapper.readValue(
|
||||
str, new TypeReference<Map<String, Object>>()
|
||||
{
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
|
||||
{
|
||||
final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY));
|
||||
retVal.verify();
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private volatile String dataSource;
|
||||
private volatile TimestampSpec timestampSpec;
|
||||
private volatile DataSpec dataSpec;
|
||||
private volatile GranularitySpec granularitySpec;
|
||||
private volatile PathSpec pathSpec;
|
||||
private volatile String workingPath;
|
||||
private volatile String segmentOutputPath;
|
||||
private volatile String version;
|
||||
private volatile PartitionsSpec partitionsSpec;
|
||||
private volatile boolean leaveIntermediate;
|
||||
private volatile boolean cleanupOnFailure;
|
||||
private volatile Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
|
||||
private volatile boolean overwriteFiles;
|
||||
private volatile DataRollupSpec rollupSpec;
|
||||
private volatile DbUpdaterJobSpec updaterJobSpec;
|
||||
private volatile boolean ignoreInvalidRows;
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder()
|
||||
{
|
||||
this.dataSource = null;
|
||||
this.timestampSpec = null;
|
||||
this.dataSpec = null;
|
||||
this.granularitySpec = null;
|
||||
this.pathSpec = null;
|
||||
this.workingPath = null;
|
||||
this.segmentOutputPath = null;
|
||||
this.version = new DateTime().toString();
|
||||
this.partitionsSpec = null;
|
||||
this.leaveIntermediate = false;
|
||||
this.cleanupOnFailure = true;
|
||||
this.shardSpecs = ImmutableMap.of();
|
||||
this.overwriteFiles = false;
|
||||
this.rollupSpec = null;
|
||||
this.updaterJobSpec = null;
|
||||
this.ignoreInvalidRows = false;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withDataSource(String dataSource)
|
||||
{
|
||||
this.dataSource = dataSource;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withTimestampSpec(TimestampSpec timestampSpec)
|
||||
{
|
||||
this.timestampSpec = timestampSpec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withDataSpec(DataSpec dataSpec)
|
||||
{
|
||||
this.dataSpec = dataSpec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withGranularitySpec(GranularitySpec granularitySpec)
|
||||
{
|
||||
this.granularitySpec = granularitySpec;
|
||||
return this;
|
||||
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withPathSpec(PathSpec pathSpec)
|
||||
{
|
||||
this.pathSpec = pathSpec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withWorkingPath(String workingPath)
|
||||
{
|
||||
this.workingPath = workingPath;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withSegmentOutputPath(String segmentOutputPath)
|
||||
{
|
||||
this.segmentOutputPath = segmentOutputPath;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withVersion(String version)
|
||||
{
|
||||
this.version = version;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withPartitionsSpec(PartitionsSpec partitionsSpec)
|
||||
{
|
||||
this.partitionsSpec = partitionsSpec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withLeaveIntermediate(boolean leaveIntermediate)
|
||||
{
|
||||
this.leaveIntermediate = leaveIntermediate;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withCleanupOnFailure(boolean cleanupOnFailure)
|
||||
{
|
||||
this.cleanupOnFailure = cleanupOnFailure;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> shardSpecs)
|
||||
{
|
||||
this.shardSpecs = shardSpecs;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withOverwriteFiles(boolean overwriteFiles)
|
||||
{
|
||||
this.overwriteFiles = overwriteFiles;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withRollupSpec(DataRollupSpec rollupSpec)
|
||||
{
|
||||
this.rollupSpec = rollupSpec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withUpdaterJobSpec(DbUpdaterJobSpec updaterJobSpec)
|
||||
{
|
||||
this.updaterJobSpec = updaterJobSpec;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withIgnoreInvalidRows(boolean ignoreInvalidRows)
|
||||
{
|
||||
this.ignoreInvalidRows = ignoreInvalidRows;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfigBuilder withSchema(HadoopDruidIndexerSchema schema)
|
||||
{
|
||||
this.dataSource = schema.getDataSource();
|
||||
this.timestampSpec = schema.getTimestampSpec();
|
||||
this.dataSpec = schema.getDataSpec();
|
||||
this.granularitySpec = schema.getGranularitySpec();
|
||||
this.pathSpec = HadoopDruidIndexerConfig.jsonMapper.convertValue(schema.getPathSpec(), PathSpec.class);
|
||||
this.workingPath = schema.getWorkingPath();
|
||||
this.segmentOutputPath = schema.getSegmentOutputPath();
|
||||
this.version = schema.getVersion();
|
||||
this.partitionsSpec = schema.getPartitionsSpec();
|
||||
this.leaveIntermediate = schema.isLeaveIntermediate();
|
||||
this.cleanupOnFailure = schema.isCleanupOnFailure();
|
||||
this.shardSpecs = schema.getShardSpecs();
|
||||
this.overwriteFiles = schema.isOverwriteFiles();
|
||||
this.rollupSpec = schema.getRollupSpec();
|
||||
this.updaterJobSpec = schema.getUpdaterJobSpec();
|
||||
this.ignoreInvalidRows = schema.isIgnoreInvalidRows();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
public HadoopDruidIndexerConfig build()
|
||||
{
|
||||
return new HadoopDruidIndexerConfig(
|
||||
dataSource,
|
||||
timestampSpec,
|
||||
dataSpec,
|
||||
granularitySpec,
|
||||
pathSpec,
|
||||
workingPath,
|
||||
segmentOutputPath,
|
||||
version,
|
||||
partitionsSpec,
|
||||
leaveIntermediate,
|
||||
cleanupOnFailure,
|
||||
shardSpecs,
|
||||
overwriteFiles,
|
||||
rollupSpec,
|
||||
updaterJobSpec,
|
||||
ignoreInvalidRows,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
}
|
|
@@ -46,6 +46,7 @@ public class HadoopDruidIndexerJob implements Jobby

  private final HadoopDruidIndexerConfig config;
  private final DbUpdaterJob dbUpdaterJob;

  private IndexGeneratorJob indexJob;
  private volatile List<DataSegment> publishedSegments = null;

@@ -1,123 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012, 2013 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package io.druid.indexer;

import com.google.common.collect.ImmutableList;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;

import java.util.List;

/**
 */
public class HadoopDruidIndexerMain
{
  private static final Logger log = new Logger(HadoopDruidIndexerMain.class);

  public static void main(String[] args) throws Exception
  {
    if (args.length != 1) {
      printHelp();
      System.exit(2);
    }

    HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build();

    node.setArgumentSpec(args[0]);

    Lifecycle lifecycle = new Lifecycle();
    lifecycle.addManagedInstance(node);

    try {
      lifecycle.start();
    }
    catch (Throwable t) {
      log.info(t, "Throwable caught at startup, committing seppuku");
      Thread.sleep(500);
      printHelp();
      System.exit(1);
    }
  }

  private static final List<Pair<String, String>> expectedFields =
      ImmutableList.<Pair<String, String>>builder()
          .add(Pair.of("dataSource", "Name of dataSource"))
          .add(Pair.of("timestampColumn", "Column name of the timestamp column"))
          .add(Pair.of("timestampFormat", "Format name of the timestamp column (posix or iso)"))
          .add(
              Pair.of(
                  "dataSpec",
                  "A JSON object with fields "
                  + "format=(json, csv, tsv), "
                  + "columns=JSON array of column names for the delimited text input file (only for csv and tsv formats),"
                  + "dimensions=JSON array of dimensionn names (must match names in columns),"
                  + "delimiter=delimiter of the data (only for tsv format)"
              )
          )
          .add(Pair.of("granularitySpec", "A JSON object indicating the Granularity that segments should be created at."))
          .add(
              Pair.of(
                  "pathSpec",
                  "A JSON object with fields type=granularity, inputPath, filePattern, dataGranularity"
              )
          )
          .add(
              Pair.of(
                  "rollupSpec",
                  "JSON object with fields rollupGranularity, aggs=JSON Array of Aggregator specs"
              )
          )
          .add(Pair.of("workingPath", "Path to store intermediate output data. Deleted when finished."))
          .add(Pair.of("segmentOutputPath", "Path to store output segments."))
          .add(
              Pair.of(
                  "updaterJobSpec",
                  "JSON object with fields type=db, connectURI of the database, username, password, and segment table name"
              )
          )
          .add(Pair.of("cleanupOnFailure", "Clean up intermediate files on failure? (default: true)"))
          .add(Pair.of("leaveIntermediate", "Leave intermediate files. (default: false)"))
          .add(Pair.of("partitionDimension", "Dimension to partition by (optional)"))
          .add(
              Pair.of(
                  "targetPartitionSize",
                  "Integer representing the target number of rows in a partition (required if partitionDimension != null)"
              )
          )
          .build();

  public static void printHelp()
  {
    System.out.println("Usage: <java invocation> <config_spec>");
    System.out.println("<config_spec> is either a JSON object or the path to a file that contains a JSON object.");
    System.out.println();
    System.out.println("JSON object description:");
    System.out.println("{");
    for (Pair<String, String> expectedField : expectedFields) {
      System.out.printf(" \"%s\": %s%n", expectedField.lhs, expectedField.rhs);
    }
    System.out.println("}");
  }
}
@@ -38,7 +38,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
  protected void setup(Context context)
      throws IOException, InterruptedException
  {
    config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
    config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());
    parser = config.getParser();
  }
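For context, the builder that this hunk (and the matching hunk in `IndexGeneratorJob` further down) switches to is exercised elsewhere in this commit. A minimal sketch of the two ways a config now gets assembled, using only the `HadoopDruidIndexerConfigBuilder` calls visible in this diff (`withSchema`, `withVersion`, `withWorkingPath`, `withSegmentOutputPath`, `build`, `fromConfiguration`); this is an illustration, not an authoritative API reference:

```java
// Sketch only: mirrors the builder calls that appear in HadoopIndexTask.HadoopIndexTaskInnerProcessing
// and in the mapper setup() above; not a definitive description of the builder.
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerSchema;
import org.apache.hadoop.conf.Configuration;

public class ConfigBuilderSketch
{
  public static HadoopDruidIndexerConfig fromSchema(
      HadoopDruidIndexerSchema schema,
      String version,
      String workingPath,
      String segmentOutputPath
  )
  {
    // Combine the serializable schema with the values decided at run time.
    return new HadoopDruidIndexerConfigBuilder()
        .withSchema(schema)
        .withVersion(version)
        .withWorkingPath(workingPath)
        .withSegmentOutputPath(segmentOutputPath)
        .build();
  }

  public static HadoopDruidIndexerConfig inHadoopTask(Configuration conf)
  {
    // Inside a mapper/reducer the config is rebuilt from the job Configuration,
    // as the hunk above now does.
    return HadoopDruidIndexerConfigBuilder.fromConfiguration(conf);
  }
}
```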
@@ -1,138 +0,0 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012, 2013 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package io.druid.indexer;

import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.Interval;

import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;

/**
 */
public class HadoopDruidIndexerNode
{
  public static Builder builder()
  {
    return new Builder();
  }

  private String intervalSpec = null;
  private String argumentSpec = null;

  public String getIntervalSpec()
  {
    return intervalSpec;
  }

  public String getArgumentSpec()
  {
    return argumentSpec;
  }

  public HadoopDruidIndexerNode setIntervalSpec(String intervalSpec)
  {
    this.intervalSpec = intervalSpec;
    return this;
  }

  public HadoopDruidIndexerNode setArgumentSpec(String argumentSpec)
  {
    this.argumentSpec = argumentSpec;
    return this;
  }

  @SuppressWarnings("unchecked")
  public HadoopDruidIndexerNode registerJacksonSubtype(Class<?>... clazzes)
  {
    HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(clazzes);
    return this;
  }

  @SuppressWarnings("unchecked")
  public HadoopDruidIndexerNode registerJacksonSubtype(NamedType... namedTypes)
  {
    HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(namedTypes);
    return this;
  }

  @LifecycleStart
  public void start() throws Exception
  {
    Preconditions.checkNotNull(argumentSpec, "argumentSpec");

    final HadoopDruidIndexerConfig config;
    if (argumentSpec.startsWith("{")) {
      config = HadoopDruidIndexerConfig.fromString(argumentSpec);
    } else if (argumentSpec.startsWith("s3://")) {
      final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length())));
      final FileSystem fs = s3nPath.getFileSystem(new Configuration());

      String configString = CharStreams.toString(new InputSupplier<InputStreamReader>()
      {
        @Override
        public InputStreamReader getInput() throws IOException
        {
          return new InputStreamReader(fs.open(s3nPath));
        }
      });

      config = HadoopDruidIndexerConfig.fromString(configString);
    } else {
      config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
    }

    if (intervalSpec != null) {
      final List<Interval> dataInterval = Lists.transform(
          Arrays.asList(intervalSpec.split(",")),
          new StringIntervalFunction()
      );

      config.setIntervals(dataInterval);
    }

    new HadoopDruidIndexerJob(config).run();
  }

  @LifecycleStop
  public void stop()
  {
  }

  public static class Builder
  {
    public HadoopDruidIndexerNode build()
    {
      return new HadoopDruidIndexerNode();
    }
  }
}
@@ -0,0 +1,194 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012, 2013 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */

package io.druid.indexer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.impl.DataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import org.joda.time.DateTime;

import java.util.List;
import java.util.Map;

/**
 */
public class HadoopDruidIndexerSchema
{
  private final String dataSource;
  private final TimestampSpec timestampSpec;
  private final DataSpec dataSpec;
  private final GranularitySpec granularitySpec;
  private final Map<String, Object> pathSpec; // This cannot just be a PathSpec object
  private final String workingPath;
  private final String segmentOutputPath;
  private final String version;
  private final PartitionsSpec partitionsSpec;
  private final boolean leaveIntermediate;
  private final boolean cleanupOnFailure;
  private final Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
  private final boolean overwriteFiles;
  private final DataRollupSpec rollupSpec;
  private final DbUpdaterJobSpec updaterJobSpec;
  private final boolean ignoreInvalidRows;

  @JsonCreator
  public HadoopDruidIndexerSchema(
      final @JsonProperty("dataSource") String dataSource,
      final @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
      final @JsonProperty("dataSpec") DataSpec dataSpec,
      final @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
      final @JsonProperty("pathSpec") Map<String, Object> pathSpec,
      final @JsonProperty("workingPath") String workingPath,
      final @JsonProperty("segmentOutputPath") String segmentOutputPath,
      final @JsonProperty("version") String version,
      final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
      final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
      final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
      final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
      final @JsonProperty("overwriteFiles") boolean overwriteFiles,
      final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
      final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
      final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
      // These fields are deprecated and will be removed in the future
      final @JsonProperty("timestampColumn") String timestampColumn,
      final @JsonProperty("timestampFormat") String timestampFormat
  )
  {
    this.dataSource = dataSource;
    this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec;
    this.dataSpec = dataSpec;
    this.granularitySpec = granularitySpec;
    this.pathSpec = pathSpec;
    this.workingPath = workingPath;
    this.segmentOutputPath = segmentOutputPath;
    this.version = version == null ? new DateTime().toString() : version;
    this.partitionsSpec = partitionsSpec;
    this.leaveIntermediate = leaveIntermediate;
    this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure);
    this.shardSpecs = (shardSpecs == null ? ImmutableMap.<DateTime, List<HadoopyShardSpec>>of() : shardSpecs);
    this.overwriteFiles = overwriteFiles;
    this.rollupSpec = rollupSpec;
    this.updaterJobSpec = updaterJobSpec;
    this.ignoreInvalidRows = ignoreInvalidRows;
  }

  @JsonProperty
  public String getDataSource()
  {
    return dataSource;
  }

  @JsonProperty
  public TimestampSpec getTimestampSpec()
  {
    return timestampSpec;
  }

  @JsonProperty
  public DataSpec getDataSpec()
  {
    return dataSpec;
  }

  @JsonProperty
  public GranularitySpec getGranularitySpec()
  {
    return granularitySpec;
  }

  @JsonProperty
  public Map<String, Object> getPathSpec()
  {
    return pathSpec;
  }

  @JsonProperty
  public String getWorkingPath()
  {
    return workingPath;
  }

  @JsonProperty
  public String getSegmentOutputPath()
  {
    return segmentOutputPath;
  }

  @JsonProperty
  public String getVersion()
  {
    return version;
  }

  @JsonProperty
  public PartitionsSpec getPartitionsSpec()
  {
    return partitionsSpec;
  }

  @JsonProperty
  public boolean isLeaveIntermediate()
  {
    return leaveIntermediate;
  }

  @JsonProperty
  public boolean isCleanupOnFailure()
  {
    return cleanupOnFailure;
  }

  @JsonProperty
  public Map<DateTime, List<HadoopyShardSpec>> getShardSpecs()
  {
    return shardSpecs;
  }

  @JsonProperty
  public boolean isOverwriteFiles()
  {
    return overwriteFiles;
  }

  @JsonProperty
  public DataRollupSpec getRollupSpec()
  {
    return rollupSpec;
  }

  @JsonProperty
  public DbUpdaterJobSpec getUpdaterJobSpec()
  {
    return updaterJobSpec;
  }

  @JsonProperty
  public boolean isIgnoreInvalidRows()
  {
    return ignoreInvalidRows;
  }
}
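The new schema above is plain Jackson-annotated state, so it can travel between processes as JSON. A rough sketch of the round trip, assuming `HadoopDruidIndexerConfig.jsonMapper` (the mapper this commit uses for exactly this purpose in `HadoopIndexTask`) has the relevant polymorphic subtypes registered:

```java
// Sketch only: the schema is shipped as a JSON string between the indexing service
// and the isolated Hadoop classloader, then turned back into an object.
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerSchema;

public class SchemaSerdeSketch
{
  public static HadoopDruidIndexerSchema roundTrip(HadoopDruidIndexerSchema schema) throws Exception
  {
    // Use the indexer's own ObjectMapper so polymorphic fields
    // (dataSpec, granularitySpec, pathSpec, ...) resolve correctly.
    final String json = HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(schema);
    return HadoopDruidIndexerConfig.jsonMapper.readValue(json, HadoopDruidIndexerSchema.class);
  }
}
```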
@@ -86,6 +86,7 @@ public class IndexGeneratorJob implements Jobby
  private static final Logger log = new Logger(IndexGeneratorJob.class);

  private final HadoopDruidIndexerConfig config;

  private IndexGeneratorStats jobStats;

  public IndexGeneratorJob(
@@ -243,7 +244,7 @@ public class IndexGeneratorJob implements Jobby
    protected void setup(Context context)
        throws IOException, InterruptedException
    {
      config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
      config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration());

      for (AggregatorFactory factory : config.getRollupSpec().getAggs()) {
        metricNames.add(factory.getName().toLowerCase());
@@ -60,7 +60,7 @@ public class JobHelper

    final Configuration conf = groupByJob.getConfiguration();
    final FileSystem fs = FileSystem.get(conf);
    Path distributedClassPath = new Path(config.getJobOutputDir(), "classpath");
    Path distributedClassPath = new Path(config.getWorkingPath(), "classpath");

    if (fs instanceof LocalFileSystem) {
      return;
@@ -63,12 +63,16 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
    // This PathSpec breaks so many abstractions that we might as break some more
    Preconditions.checkState(
        config.getGranularitySpec() instanceof UniformGranularitySpec,
        String.format("Cannot use %s without %s", GranularUnprocessedPathSpec.class.getSimpleName(), UniformGranularitySpec.class.getSimpleName())
        String.format(
            "Cannot use %s without %s",
            GranularUnprocessedPathSpec.class.getSimpleName(),
            UniformGranularitySpec.class.getSimpleName()
        )
    );

    final Path betaInput = new Path(getInputPath());
    final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
    final Granularity segmentGranularity = ((UniformGranularitySpec)config.getGranularitySpec()).getGranularity();
    final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getGranularity();

    Map<DateTime, Long> inputModifiedTimes = new TreeMap<DateTime, Long>(
        Comparators.inverse(Comparators.<Comparable>comparable())
@@ -87,7 +91,11 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
      DateTime timeBucket = entry.getKey();
      long mTime = entry.getValue();

      String bucketOutput = String.format("%s/%s", config.getSegmentOutputDir(), segmentGranularity.toPath(timeBucket));
      String bucketOutput = String.format(
          "%s/%s",
          config.getSegmentOutputPath(),
          segmentGranularity.toPath(timeBucket)
      );
      for (FileStatus fileStatus : FSSpideringIterator.spiderIterable(fs, new Path(bucketOutput))) {
        if (fileStatus.getModificationTime() > mTime) {
          bucketsToRun.add(new Interval(timeBucket, segmentGranularity.increment(timeBucket)));
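The two expressions reformatted in this hunk are the core of the path spec: each time bucket maps to a dated subdirectory under the segment output path and to an interval exactly one segment granularity wide. A small, hypothetical illustration (the granularity, timestamp, and output path below are invented for the example):

```java
// Hypothetical illustration of the bucket-to-path and bucket-to-interval mapping above.
import com.metamx.common.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class BucketPathSketch
{
  public static void main(String[] args)
  {
    final Granularity segmentGranularity = Granularity.HOUR;
    final DateTime timeBucket = new DateTime("2012-07-10T05:00:00.000Z");
    final String segmentOutputPath = "hdfs://server:9100/tmp/druid/datatest";

    // Dated subdirectory for this bucket (for hourly data, something like .../y=2012/m=07/d=10/H=05).
    final String bucketOutput = String.format(
        "%s/%s",
        segmentOutputPath,
        segmentGranularity.toPath(timeBucket)
    );

    // The interval covering exactly this bucket: [05:00, 06:00) on 2012-07-10.
    final Interval bucketInterval = new Interval(timeBucket, segmentGranularity.increment(timeBucket));

    System.out.println(bucketOutput);
    System.out.println(bucketInterval);
  }
}
```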
@@ -40,7 +40,8 @@ public class HadoopDruidIndexerConfigTest
  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();

  @Test
  public void testGranularitySpec() {
  public void testGranularitySpec()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
@@ -54,7 +55,8 @@ public class HadoopDruidIndexerConfigTest
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -74,7 +76,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testGranularitySpecLegacy() {
  public void testGranularitySpecLegacy()
  {
    // Deprecated and replaced by granularitySpec, but still supported
    final HadoopDruidIndexerConfig cfg;

@@ -86,7 +89,8 @@ public class HadoopDruidIndexerConfigTest
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -106,40 +110,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testGranularitySpecPostConstructorIntervals() {
    // Deprecated and replaced by granularitySpec, but still supported
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonMapper.readValue(
          "{"
          + "\"segmentGranularity\":\"day\""
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
      throw Throwables.propagate(e);
    }

    cfg.setIntervals(Lists.newArrayList(new Interval("2012-03-01/P1D")));

    final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) cfg.getGranularitySpec();

    Assert.assertEquals(
        "getIntervals",
        Lists.newArrayList(new Interval("2012-03-01/P1D")),
        granularitySpec.getIntervals()
    );

    Assert.assertEquals(
        "getGranularity",
        "DAY",
        granularitySpec.getGranularity().toString()
    );
  }

  @Test
  public void testInvalidGranularityCombination() {
  public void testInvalidGranularityCombination()
  {
    boolean thrown = false;
    try {
      final HadoopDruidIndexerConfig cfg = jsonReadWriteRead(
@@ -154,7 +126,8 @@ public class HadoopDruidIndexerConfigTest
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      thrown = true;
    }

@@ -162,7 +135,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testPartitionsSpecAutoDimension() {
  public void testPartitionsSpecAutoDimension()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
@@ -174,7 +148,8 @@ public class HadoopDruidIndexerConfigTest
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -200,7 +175,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testPartitionsSpecSpecificDimension() {
  public void testPartitionsSpecSpecificDimension()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
@@ -213,7 +189,8 @@ public class HadoopDruidIndexerConfigTest
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -245,7 +222,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testPartitionsSpecLegacy() {
  public void testPartitionsSpecLegacy()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
@@ -256,7 +234,8 @@ public class HadoopDruidIndexerConfigTest
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -288,7 +267,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testPartitionsSpecMaxPartitionSize() {
  public void testPartitionsSpecMaxPartitionSize()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
@@ -302,7 +282,8 @@ public class HadoopDruidIndexerConfigTest
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -334,7 +315,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testInvalidPartitionsCombination() {
  public void testInvalidPartitionsCombination()
  {
    boolean thrown = false;
    try {
      final HadoopDruidIndexerConfig cfg = jsonReadWriteRead(
@@ -346,7 +328,8 @@ public class HadoopDruidIndexerConfigTest
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      thrown = true;
    }

@@ -382,7 +365,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testDefaultSettings() {
  public void testDefaultSettings()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
@@ -390,7 +374,8 @@ public class HadoopDruidIndexerConfigTest
          "{}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -414,7 +399,8 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void testNoCleanupOnFailure() {
  public void testNoCleanupOnFailure()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
@@ -422,7 +408,8 @@ public class HadoopDruidIndexerConfigTest
          "{\"cleanupOnFailure\":false}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -435,23 +422,25 @@ public class HadoopDruidIndexerConfigTest

  @Test
  public void shouldMakeHDFSCompliantSegmentOutputPath() {
  public void shouldMakeHDFSCompliantSegmentOutputPath()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"dataSource\": \"source\","
          + " \"granularitySpec\":{"
          + " \"type\":\"uniform\","
          + " \"gran\":\"hour\","
          + " \"intervals\":[\"2012-07-10/P1D\"]"
          + " },"
          + "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\""
          + "}",
          HadoopDruidIndexerConfig.class
          "{"
          + "\"dataSource\": \"source\","
          + " \"granularitySpec\":{"
          + " \"type\":\"uniform\","
          + " \"gran\":\"hour\","
          + " \"intervals\":[\"2012-07-10/P1D\"]"
          + " },"
          + "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\""
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -466,23 +455,25 @@ public class HadoopDruidIndexerConfigTest
  }

  @Test
  public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() {
  public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
  {
    final HadoopDruidIndexerConfig cfg;

    try {
      cfg = jsonReadWriteRead(
          "{"
          + "\"dataSource\": \"the:data:source\","
          + " \"granularitySpec\":{"
          + " \"type\":\"uniform\","
          + " \"gran\":\"hour\","
          + " \"intervals\":[\"2012-07-10/P1D\"]"
          + " },"
          + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
          + "}",
          HadoopDruidIndexerConfig.class
          "{"
          + "\"dataSource\": \"the:data:source\","
          + " \"granularitySpec\":{"
          + " \"type\":\"uniform\","
          + " \"gran\":\"hour\","
          + " \"intervals\":[\"2012-07-10/P1D\"]"
          + " },"
          + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
          + "}",
          HadoopDruidIndexerConfig.class
      );
    } catch(Exception e) {
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }

@@ -490,7 +481,10 @@ public class HadoopDruidIndexerConfigTest

    Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
    Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
    Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString());
    Assert.assertEquals(
        "/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712",
        path.toString()
    );

  }
@@ -22,34 +22,50 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopDruidIndexerSchema;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.initialization.Initialization;
import io.druid.timeline.DataSegment;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;

public class HadoopIndexTask extends AbstractTask
{
  @JsonIgnore
  private final HadoopDruidIndexerConfig config;

  private static final Logger log = new Logger(HadoopIndexTask.class);
  private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3";

  @JsonIgnore
  private final HadoopDruidIndexerSchema schema;

  @JsonIgnore
  private final String hadoopCoordinates;

  /**
   * @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters
   * @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters
   *               for creating Druid index segments. It may be modified.
   *               <p/>
   *               Here, we will ensure that the DbConnectorConfig field of the config is set to null, such that the
   *               Here, we will ensure that the DbConnectorConfig field of the schema is set to null, such that the
   *               job does not push a list of published segments the database. Instead, we will use the method
   *               IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
   *               segments, and let the indexing service report these segments to the database.
@@ -58,21 +74,24 @@ public class HadoopIndexTask extends AbstractTask
  @JsonCreator
  public HadoopIndexTask(
      @JsonProperty("id") String id,
      @JsonProperty("config") HadoopDruidIndexerConfig config
      @JsonProperty("config") HadoopDruidIndexerSchema schema,
      @JsonProperty("hadoopCoordinates") String hadoopCoordinates
  )
  {
    super(
        id != null ? id : String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()),
        config.getDataSource(),
        JodaUtils.umbrellaInterval(config.getIntervals())
        id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSource(), new DateTime()),
        schema.getDataSource(),
        JodaUtils.umbrellaInterval(JodaUtils.condenseIntervals(schema.getGranularitySpec().bucketIntervals()))
    );

    // Some HadoopDruidIndexerConfig stuff doesn't make sense in the context of the indexing service
    Preconditions.checkArgument(config.getSegmentOutputDir() == null, "segmentOutputPath must be absent");
    Preconditions.checkArgument(config.getJobOutputDir() == null, "workingPath must be absent");
    Preconditions.checkArgument(!config.isUpdaterJobSpecSet(), "updaterJobSpec must be absent");

    this.config = config;
    // Some HadoopDruidIndexerSchema stuff doesn't make sense in the context of the indexing service
    Preconditions.checkArgument(schema.getSegmentOutputPath() == null, "segmentOutputPath must be absent");
    Preconditions.checkArgument(schema.getWorkingPath() == null, "workingPath must be absent");
    Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent");

    this.schema = schema;
    this.hadoopCoordinates = (hadoopCoordinates == null ? defaultHadoopCoordinates : hadoopCoordinates);
  }

  @Override
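The new constructor no longer takes its lock interval from a full `HadoopDruidIndexerConfig`; it derives it from the schema's granularity spec. A minimal sketch of that computation, reusing the `UniformGranularitySpec` and `JodaUtils` calls visible in this diff (the concrete intervals are invented for the example):

```java
// Sketch of how the task's umbrella (lock) interval falls out of the granularity spec,
// mirroring the JodaUtils calls in the constructor above.
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.granularity.UniformGranularitySpec;
import org.joda.time.Interval;

public class UmbrellaIntervalSketch
{
  public static void main(String[] args)
  {
    final UniformGranularitySpec granularitySpec = new UniformGranularitySpec(
        Granularity.DAY,
        ImmutableList.of(new Interval("2010-01-01/P1D"), new Interval("2010-01-05/P1D"))
    );

    // Condense the daily buckets, then take the single interval spanning them all
    // (here roughly 2010-01-01 through 2010-01-06).
    final Interval umbrella = JodaUtils.umbrellaInterval(
        JodaUtils.condenseIntervals(granularitySpec.bucketIntervals())
    );

    System.out.println(umbrella);
  }
}
```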
@@ -81,34 +100,64 @@ public class HadoopIndexTask extends AbstractTask
    return "index_hadoop";
  }

  @JsonProperty("config")
  public HadoopDruidIndexerSchema getSchema()
  {
    return schema;
  }

  @JsonProperty
  public String getHadoopCoordinates()
  {
    return hadoopCoordinates;
  }

  @SuppressWarnings("unchecked")
  @Override
  public TaskStatus run(TaskToolbox toolbox) throws Exception
  {
    // Copy config so we don't needlessly modify our provided one
    // Also necessary to make constructor validations work upon serde-after-run
    final HadoopDruidIndexerConfig configCopy = toolbox.getObjectMapper()
        .readValue(
            toolbox.getObjectMapper().writeValueAsBytes(config),
            HadoopDruidIndexerConfig.class
        );
    // setup Hadoop
    final DefaultTeslaAether aetherClient = new DefaultTeslaAether();
    final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates(
        aetherClient, hadoopCoordinates
    );
    final URL[] urLs = ((URLClassLoader) hadoopLoader).getURLs();

    final URL[] nonHadoopUrls = ((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs();

    List<URL> theURLS = Lists.newArrayList();
    theURLS.addAll(Arrays.asList(urLs));
    theURLS.addAll(Arrays.asList(nonHadoopUrls));

    final URLClassLoader loader = new URLClassLoader(theURLS.toArray(new URL[theURLS.size()]), null);
    Thread.currentThread().setContextClassLoader(loader);

    System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(nonHadoopUrls));

    final Class<?> mainClass = loader.loadClass(HadoopIndexTaskInnerProcessing.class.getName());
    final Method mainMethod = mainClass.getMethod("runTask", String[].class);

    // We should have a lock from before we started running
    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
    log.info("Setting version to: %s", myLock.getVersion());
    configCopy.setVersion(myLock.getVersion());

    // Set workingPath to some reasonable default
    configCopy.setJobOutputDir(toolbox.getConfig().getHadoopWorkingPath());
    String[] args = new String[]{
        toolbox.getObjectMapper().writeValueAsString(schema),
        myLock.getVersion(),
        toolbox.getConfig().getHadoopWorkingPath(),
        toolbox.getSegmentPusher().getPathForHadoop(getDataSource()),
    };

    configCopy.setSegmentOutputDir(toolbox.getSegmentPusher().getPathForHadoop(getDataSource()));
    String segments = (String) mainMethod.invoke(null, new Object[]{args});

    HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(configCopy);
    configCopy.verify();

    log.info("Starting a hadoop index generator job...");
    if (job.run()) {
      List<DataSegment> publishedSegments = job.getPublishedSegments();

    if (segments != null) {
      List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
          segments, new TypeReference<List<DataSegment>>()
          {
          }
      );
      // Request segment pushes
      toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
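The notable part of the new `run()` is the classloader isolation: the Hadoop jars are resolved, combined with the task's own classpath into a `URLClassLoader` with a null parent, and the indexing step is invoked reflectively inside it, passing and returning only strings. A stripped-down sketch of that pattern using plain JDK APIs; the Druid-specific artifact resolution (`DefaultTeslaAether`/`Initialization`) is assumed to have happened already:

```java
// Sketch only: the classloader-isolation pattern used by HadoopIndexTask.run(), reduced
// to standard JDK calls. Assumes the application classloader is a URLClassLoader (true
// for the Java versions this code targeted) and that hadoopJarUrls were resolved already.
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IsolatedRunSketch
{
  public static String runIsolated(URL[] hadoopJarUrls, String[] taskArgs) throws Exception
  {
    // Combine the resolved Hadoop jars with the URLs already on this process's classpath.
    final URL[] ownUrls = ((URLClassLoader) IsolatedRunSketch.class.getClassLoader()).getURLs();
    final List<URL> allUrls = new ArrayList<>();
    allUrls.addAll(Arrays.asList(hadoopJarUrls));
    allUrls.addAll(Arrays.asList(ownUrls));

    // Parent = null, so nothing leaks in from the hosting process's classloader.
    final URLClassLoader isolated = new URLClassLoader(allUrls.toArray(new URL[0]), null);
    Thread.currentThread().setContextClassLoader(isolated);

    // Load the worker class inside the isolated loader and call its static entry point.
    final Class<?> workerClass = isolated.loadClass(
        "io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexTaskInnerProcessing"
    );
    final Method runTask = workerClass.getMethod("runTask", String[].class);

    // The result comes back as a JSON string, so no Druid classes cross the loader boundary.
    return (String) runTask.invoke(null, new Object[]{taskArgs});
  }
}
```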
@@ -117,12 +166,41 @@ public class HadoopIndexTask extends AbstractTask
    } else {
      return TaskStatus.failure(getId());
    }

  }

  @JsonProperty
  public HadoopDruidIndexerConfig getConfig()
  public static class HadoopIndexTaskInnerProcessing
  {
    return config;
    public static String runTask(String[] args) throws Exception
    {
      final String schema = args[0];
      final String version = args[1];
      final String workingPath = args[2];
      final String segmentOutputPath = args[3];

      final HadoopDruidIndexerSchema theSchema = HadoopDruidIndexerConfig.jsonMapper
          .readValue(
              schema,
              HadoopDruidIndexerSchema.class
          );
      final HadoopDruidIndexerConfig config =
          new HadoopDruidIndexerConfigBuilder().withSchema(theSchema)
                                               .withVersion(version)
                                               .withWorkingPath(
                                                   workingPath
                                               )
                                               .withSegmentOutputPath(
                                                   segmentOutputPath
                                               )
                                               .build();

      HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);

      log.info("Starting a hadoop index generator job...");
      if (job.run()) {
        return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(job.getPublishedSegments());
      }

      return null;
    }
  }
}
@@ -22,12 +22,13 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.JSONDataSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerSchema;
import io.druid.indexer.granularity.UniformGranularitySpec;
import io.druid.indexer.path.StaticPathSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
@@ -338,17 +339,12 @@ public class TaskSerdeTest
  {
    final HadoopIndexTask task = new HadoopIndexTask(
        null,
        new HadoopDruidIndexerConfig(
            null,
        new HadoopDruidIndexerSchema(
            "foo",
            "timestamp",
            "auto",
            new TimestampSpec("timestamp", "auto"),
            new JSONDataSpec(ImmutableList.of("foo"), null),
            null,
            new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
            new StaticPathSpec("bar"),
            null,
            null,
            ImmutableMap.<String, Object>of("paths", "bar"),
            null,
            null,
            null,
@@ -359,8 +355,11 @@ public class TaskSerdeTest
            false,
            new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
            null,
            false
        )
            false,
            null,
            null
        ),
        null
    );

    final ObjectMapper jsonMapper = new DefaultObjectMapper();
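For completeness, the shape of the check a serde test like the one above typically performs is sketched below; the actual assertions sit outside the visible hunk, so treat this as an assumption rather than the test's real body:

```java
// Hypothetical sketch of a task serde round trip with the Druid ObjectMapper used above.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.task.HadoopIndexTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;

public class HadoopIndexTaskSerdeSketch
{
  public static Task roundTrip(HadoopIndexTask task) throws Exception
  {
    final ObjectMapper jsonMapper = new DefaultObjectMapper();
    // Serialize, then read back through the polymorphic Task type to check that the
    // "config" (schema) and "hadoopCoordinates" properties survive the round trip.
    final String json = jsonMapper.writeValueAsString(task);
    return jsonMapper.readValue(json, Task.class);
  }
}
```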
@@ -41,7 +41,7 @@ import java.util.List;
 */
@Command(
    name = "hadoop",
    description = "Runs the batch Hadoop Druid Indexer, see https://github.com/metamx/druid/wiki/Batch-ingestion for a description."
    description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.0/Batch-ingestion.html for a description."
)
public class CliHadoopIndexer implements Runnable
{
@@ -24,8 +24,8 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigBuilder;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.timeline.partition.SingleDimensionShardSpec;

import java.io.File;

@@ -58,9 +58,9 @@ public class CliInternalHadoopIndexer implements Runnable
  {
    try {
      if (argumentSpec.startsWith("{")) {
        return HadoopDruidIndexerConfig.fromString(argumentSpec);
        return HadoopDruidIndexerConfigBuilder.fromString(argumentSpec);
      } else {
        return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec));
        return HadoopDruidIndexerConfigBuilder.fromFile(new File(argumentSpec));
      }
    }
    catch (Exception e) {
@@ -21,12 +21,9 @@ package io.druid.cli;

import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.client.util.Lists;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.multibindings.MapBinder;
@@ -43,7 +40,6 @@ import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.NodeTypeConfig;
import io.druid.guice.PolyBind;
import io.druid.indexer.Utils;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskToolboxFactory;
@@ -66,20 +62,15 @@ import io.druid.indexing.worker.executor.ChatHandlerResource;
import io.druid.indexing.worker.executor.ExecutorLifecycle;
import io.druid.indexing.worker.executor.ExecutorLifecycleConfig;
import io.druid.initialization.DruidModule;
import io.druid.initialization.Initialization;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.OmniDataSegmentKiller;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.server.initialization.ExtensionsConfig;
import io.druid.server.initialization.JettyServerInitializer;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.eclipse.jetty.server.Server;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;