diff --git a/docs/content/Batch-ingestion.md b/docs/content/Batch-ingestion.md index 001064be960..e3efaab8c65 100644 --- a/docs/content/Batch-ingestion.md +++ b/docs/content/Batch-ingestion.md @@ -24,8 +24,10 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim ```json { "dataSource": "the_data_source", - "timestampColumn": "ts", - "timestampFormat": "", + "timestampSpec" : { + "timestampColumn": "ts", + "timestampFormat": "" + }, "dataSpec": { "format": "", "columns": [ @@ -95,18 +97,17 @@ The interval is the [ISO8601 interval](http://en.wikipedia.org/wiki/ISO_8601#Tim |property|description|required?| |--------|-----------|---------| |dataSource|name of the dataSource the data will belong to|yes| -|timestampColumn|the column that is to be used as the timestamp column|yes| -|timestampFormat|the format of timestamps; auto = either iso or millis, Joda time formats:http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html|yes| -|dataSpec|a specification of the data format and an array that names all of the columns in the input data|yes| -|dimensions|the columns that are to be used as dimensions|yes| -|granularitySpec|the time granularity and interval to chunk segments up into|yes| +|timestampSpec|includes the column that is to be used as the timestamp column and the format of the timestamps; auto = either iso or millis, Joda time formats can be found [here](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html).|yes| +|dataSpec|a specification of the data format and an array that names all of the columns in the input data.|yes| +|dimensions|the columns that are to be used as dimensions.|yes| +|granularitySpec|the time granularity and interval to chunk segments up into.|yes| |pathSpec|a specification of where to pull the data in from|yes| |rollupSpec|a specification of the rollup to perform while processing the data|yes| -|workingPath|the working path to use for intermediate results (results between Hadoop jobs)|yes| -|segmentOutputPath|the path to dump segments into|yes| -|leaveIntermediate|leave behind files in the workingPath when job completes or fails (debugging tool)|no| -|partitionsSpec|a specification of how to partition each time bucket into segments, absence of this property means no partitioning will occur|no| -|updaterJobSpec|a specification of how to update the metadata for the druid cluster these segments belong to|yes| +|workingPath|the working path to use for intermediate results (results between Hadoop jobs).|yes| +|segmentOutputPath|the path to dump segments into.|yes| +|leaveIntermediate|leave behind files in the workingPath when job completes or fails (debugging tool).|no| +|partitionsSpec|a specification of how to partition each time bucket into segments, absence of this property means no partitioning will occur.|no| +|updaterJobSpec|a specification of how to update the metadata for the druid cluster these segments belong to.|yes| ### Path specification @@ -118,9 +119,9 @@ Is a type of data loader that expects data to be laid out in a specific path for |property|description|required?| |--------|-----------|---------| -|dataGranularity|specifies the granularity to expect the data at, e.g. hour means to expect directories `y=XXXX/m=XX/d=XX/H=XX`|yes| -|inputPath|Base path to append the expected time path to|yes| -|filePattern|Pattern that files should match to be included|yes| +|dataGranularity|specifies the granularity to expect the data at, e.g. 
hour means to expect directories `y=XXXX/m=XX/d=XX/H=XX`.|yes| +|inputPath|Base path to append the expected time path to.|yes| +|filePattern|Pattern that files should match to be included.|yes| For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths @@ -138,7 +139,7 @@ The indexing process has the ability to roll data up as it processes the incomin |property|description|required?| |--------|-----------|---------| |aggs|specifies a list of aggregators to aggregate for each bucket (a bucket is defined by the tuple of the truncated timestamp and the dimensions). Aggregators available here are the same as available when querying.|yes| -|rollupGranularity|The granularity to use when truncating incoming timestamps for bucketization|yes| +|rollupGranularity|The granularity to use when truncating incoming timestamps for bucketization.|yes| ### Partitioning specification @@ -158,11 +159,11 @@ This is a specification of the properties that tell the job how to update metada |property|description|required?| |--------|-----------|---------| -|type|"db" is the only value available|yes| -|connectURI|a valid JDBC url to MySQL|yes| -|user|username for db|yes| -|password|password for db|yes| -|segmentTable|table to use in DB|yes| +|type|"db" is the only value available.|yes| +|connectURI|A valid JDBC url to MySQL.|yes| +|user|Username for db.|yes| +|password|password for db.|yes| +|segmentTable|Table to use in DB.|yes| These properties should parrot what you have configured for your [Coordinator](Coordinator.html). @@ -177,15 +178,17 @@ java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:config/ov This will start up a very simple local indexing service. For more complex deployments of the indexing service, see [here](Indexing-Service.html). -The schema of the Hadoop Index Task is very similar to the schema for the Hadoop Index Config. A sample Hadoop index task is shown below: +The schema of the Hadoop Index Task contains a task "type" and a Hadoop Index Config. A sample Hadoop index task is shown below: ```json { "type" : "index_hadoop", "config": { "dataSource" : "example", - "timestampColumn" : "timestamp", - "timestampFormat" : "auto", + "timestampSpec" : { + "timestampColumn" : "timestamp", + "timestampFormat" : "auto" + }, "dataSpec" : { "format" : "json", "dimensions" : ["dim1","dim2","dim3"] @@ -224,8 +227,23 @@ The schema of the Hadoop Index Task is very similar to the schema for the Hadoop |property|description|required?| |--------|-----------|---------| -|type|"index_hadoop"|yes| -|config|a Hadoop Index Config|yes| +|type|This should be "index_hadoop".|yes| +|config|A Hadoop Index Config.|yes| The Hadoop Index Config submitted as part of an Hadoop Index Task is identical to the Hadoop Index Config used by the `HadoopBatchIndexer` except that three fields must be omitted: `segmentOutputPath`, `workingPath`, `updaterJobSpec`. The Indexing Service takes care of setting these fields internally. +To run the task: + +``` +curl -X 'POST' -H 'Content-Type:application/json' -d @example_index_hadoop_task.json localhost:8087/druid/indexer/v1/task +``` + +If the task succeeds, you should see in the logs of the indexing service: + +``` +2013-10-16 16:38:31,945 INFO [pool-6-thread-1] io.druid.indexing.overlord.exec.TaskConsumer - Task SUCCESS: HadoopIndexTask... +``` + +Having Problems? +---------------- +Getting data into Druid can definitely be difficult for first time users. 
Please don't hesitate to ask questions in our IRC channel or on our [google groups page](https://groups.google.com/forum/#!forum/druid-development). diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index cd1d3bb753c..a832d4511cd 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -81,13 +81,13 @@ import java.util.Set; /** * Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so, * choosing the best dimension that satisfies the criteria: - * + *

 * <ul>
- * <li>Must have exactly one value per row.</li>
- * <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
- * put all those rows in the same partition, and that partition may be much larger than the target size.</li>
+ * <li>Must have exactly one value per row.</li>
+ * <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
+ * put all those rows in the same partition, and that partition may be much larger than the target size.</li>
 * </ul>
- *
+ * <p/>
* "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes variation of * segment size relative to the target. */ @@ -125,7 +125,7 @@ public class DeterminePartitionsJob implements Jobby * in the final segment. */ - if(!config.getPartitionsSpec().isAssumeGrouped()) { + if (!config.getPartitionsSpec().isAssumeGrouped()) { final Job groupByJob = new Job( new Configuration(), String.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals()) @@ -150,7 +150,7 @@ public class DeterminePartitionsJob implements Jobby groupByJob.submit(); log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL()); - if(!groupByJob.waitForCompletion(true)) { + if (!groupByJob.waitForCompletion(true)) { log.error("Job failed: %s", groupByJob.getJobID()); return false; } @@ -170,7 +170,7 @@ public class DeterminePartitionsJob implements Jobby injectSystemProperties(dimSelectionJob); - if(!config.getPartitionsSpec().isAssumeGrouped()) { + if (!config.getPartitionsSpec().isAssumeGrouped()) { // Read grouped data from the groupByJob. dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionPostGroupByMapper.class); dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class); @@ -203,7 +203,7 @@ public class DeterminePartitionsJob implements Jobby dimSelectionJob.getTrackingURL() ); - if(!dimSelectionJob.waitForCompletion(true)) { + if (!dimSelectionJob.waitForCompletion(true)) { log.error("Job failed: %s", dimSelectionJob.getJobID().toString()); return false; } @@ -237,15 +237,15 @@ public class DeterminePartitionsJob implements Jobby } shardSpecs.put(bucket, actualSpecs); - } - else { + } else { log.info("Path[%s] didn't exist!?", partitionInfoPath); } } config.setShardSpecs(shardSpecs); return true; - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } } @@ -271,9 +271,9 @@ public class DeterminePartitionsJob implements Jobby { // Create group key, there are probably more efficient ways of doing this final Map> dims = Maps.newTreeMap(); - for(final String dim : inputRow.getDimensions()) { + for (final String dim : inputRow.getDimensions()) { final Set dimValues = ImmutableSortedSet.copyOf(inputRow.getDimension(dim)); - if(dimValues.size() > 0) { + if (dimValues.size() > 0) { dims.put(dim, dimValues); } } @@ -314,7 +314,7 @@ public class DeterminePartitionsJob implements Jobby protected void setup(Context context) throws IOException, InterruptedException { - final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration()); final String partitionDimension = config.getPartitionDimension(); helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension); } @@ -346,7 +346,7 @@ public class DeterminePartitionsJob implements Jobby throws IOException, InterruptedException { super.setup(context); - final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration()); final String partitionDimension = config.getPartitionDimension(); helper = new DeterminePartitionsDimSelectionMapperHelper(config, partitionDimension); } @@ -359,7 +359,7 @@ public class DeterminePartitionsJob implements Jobby ) throws IOException, 
InterruptedException { final Map> dims = Maps.newHashMap(); - for(final String dim : inputRow.getDimensions()) { + for (final String dim : inputRow.getDimensions()) { dims.put(dim, inputRow.getDimension(dim)); } helper.emitDimValueCounts(context, new DateTime(inputRow.getTimestampFromEpoch()), dims); @@ -383,9 +383,9 @@ public class DeterminePartitionsJob implements Jobby final ImmutableMap.Builder timeIndexBuilder = ImmutableMap.builder(); int idx = 0; - for(final Interval bucketInterval: config.getGranularitySpec().bucketIntervals()) { + for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals()) { timeIndexBuilder.put(bucketInterval.getStart(), idx); - idx ++; + idx++; } this.intervalIndexes = timeIndexBuilder.build(); @@ -399,7 +399,7 @@ public class DeterminePartitionsJob implements Jobby { final Optional maybeInterval = config.getGranularitySpec().bucketInterval(timestamp); - if(!maybeInterval.isPresent()) { + if (!maybeInterval.isPresent()) { throw new ISE("WTF?! No bucket found for timestamp: %s", timestamp); } @@ -414,13 +414,13 @@ public class DeterminePartitionsJob implements Jobby // Emit row-counter value. write(context, groupKey, new DimValueCount("", "", 1)); - for(final Map.Entry> dimAndValues : dims.entrySet()) { + for (final Map.Entry> dimAndValues : dims.entrySet()) { final String dim = dimAndValues.getKey(); - if(partitionDimension == null || partitionDimension.equals(dim)) { + if (partitionDimension == null || partitionDimension.equals(dim)) { final Iterable dimValues = dimAndValues.getValue(); - if(Iterables.size(dimValues) == 1) { + if (Iterables.size(dimValues) == 1) { // Emit this value. write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1)); } else { @@ -433,7 +433,7 @@ public class DeterminePartitionsJob implements Jobby } public static class DeterminePartitionsDimSelectionPartitioner - extends Partitioner + extends Partitioner { @Override public int getPartition(BytesWritable bytesWritable, Text text, int numPartitions) @@ -463,7 +463,7 @@ public class DeterminePartitionsJob implements Jobby if (config == null) { synchronized (DeterminePartitionsDimSelectionBaseReducer.class) { if (config == null) { - config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration()); } } } @@ -561,7 +561,7 @@ public class DeterminePartitionsJob implements Jobby final DimValueCount firstDvc = iterator.next(); final int totalRows = firstDvc.numRows; - if(!firstDvc.dim.equals("") || !firstDvc.value.equals("")) { + if (!firstDvc.dim.equals("") || !firstDvc.value.equals("")) { throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!"); } @@ -574,10 +574,10 @@ public class DeterminePartitionsJob implements Jobby // We'll store possible partitions in here final Map dimPartitionss = Maps.newHashMap(); - while(iterator.hasNext()) { + while (iterator.hasNext()) { final DimValueCount dvc = iterator.next(); - if(currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) { + if (currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) { // Starting a new dimension! Exciting! 
currentDimPartitions = new DimPartitions(dvc.dim); currentDimPartition = new DimPartition(); @@ -586,17 +586,17 @@ public class DeterminePartitionsJob implements Jobby } // Respect poisoning - if(!currentDimSkip && dvc.numRows < 0) { + if (!currentDimSkip && dvc.numRows < 0) { log.info("Cannot partition on multi-valued dimension: %s", dvc.dim); currentDimSkip = true; } - if(currentDimSkip) { + if (currentDimSkip) { continue; } // See if we need to cut a new partition ending immediately before this dimension value - if(currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) { + if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) { final ShardSpec shardSpec = new SingleDimensionShardSpec( currentDimPartitions.dim, currentDimPartitionStart, @@ -618,20 +618,20 @@ public class DeterminePartitionsJob implements Jobby } // Update counters - currentDimPartition.cardinality ++; + currentDimPartition.cardinality++; currentDimPartition.rows += dvc.numRows; - if(!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) { + if (!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) { // Finalize the current dimension - if(currentDimPartition.rows > 0) { + if (currentDimPartition.rows > 0) { // One more shard to go final ShardSpec shardSpec; if (currentDimPartitions.partitions.isEmpty()) { shardSpec = new NoneShardSpec(); } else { - if(currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) { + if (currentDimPartition.rows < config.getTargetPartitionSize() * SHARD_COMBINE_THRESHOLD) { // Combine with previous shard final DimPartition previousDimPartition = currentDimPartitions.partitions.remove( currentDimPartitions.partitions.size() - 1 @@ -685,7 +685,7 @@ public class DeterminePartitionsJob implements Jobby } // Choose best dimension - if(dimPartitionss.isEmpty()) { + if (dimPartitionss.isEmpty()) { throw new ISE("No suitable partitioning dimension found!"); } @@ -694,8 +694,8 @@ public class DeterminePartitionsJob implements Jobby DimPartitions minDistancePartitions = null; DimPartitions maxCardinalityPartitions = null; - for(final DimPartitions dimPartitions : dimPartitionss.values()) { - if(dimPartitions.getRows() != totalRows) { + for (final DimPartitions dimPartitions : dimPartitionss.values()) { + if (dimPartitions.getRows() != totalRows) { log.info( "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)", dimPartitions.dim, @@ -708,32 +708,32 @@ public class DeterminePartitionsJob implements Jobby // Make sure none of these shards are oversized boolean oversized = false; - for(final DimPartition partition : dimPartitions.partitions) { - if(partition.rows > config.getMaxPartitionSize()) { + for (final DimPartition partition : dimPartitions.partitions) { + if (partition.rows > config.getMaxPartitionSize()) { log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec); oversized = true; } } - if(oversized) { + if (oversized) { continue; } final int cardinality = dimPartitions.getCardinality(); final long distance = dimPartitions.getDistanceSquaredFromTarget(config.getTargetPartitionSize()); - if(cardinality > maxCardinality) { + if (cardinality > maxCardinality) { maxCardinality = cardinality; maxCardinalityPartitions = dimPartitions; } - if(distance < minDistance) { + if (distance < minDistance) { minDistance = distance; minDistancePartitions = 
dimPartitions; } } - if(maxCardinalityPartitions == null) { + if (maxCardinalityPartitions == null) { throw new ISE("No suitable partitioning dimension found!"); } @@ -821,7 +821,7 @@ public class DeterminePartitionsJob implements Jobby public int getCardinality() { int sum = 0; - for(final DimPartition dimPartition : partitions) { + for (final DimPartition dimPartition : partitions) { sum += dimPartition.cardinality; } return sum; @@ -830,7 +830,7 @@ public class DeterminePartitionsJob implements Jobby public long getDistanceSquaredFromTarget(long target) { long distance = 0; - for(final DimPartition dimPartition : partitions) { + for (final DimPartition dimPartition : partitions) { distance += (dimPartition.rows - target) * (dimPartition.rows - target); } @@ -841,7 +841,7 @@ public class DeterminePartitionsJob implements Jobby public int getRows() { int sum = 0; - for(final DimPartition dimPartition : partitions) { + for (final DimPartition dimPartition : partitions) { sum += dimPartition.rows; } return sum; @@ -892,7 +892,11 @@ public class DeterminePartitionsJob implements Jobby throws IOException, InterruptedException { context.write( - new SortableBytes(groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes(HadoopDruidIndexerConfig.javaNativeCharset)).toBytesWritable(), + new SortableBytes( + groupKey, tabJoiner.join(dimValueCount.dim, dimValueCount.value).getBytes( + HadoopDruidIndexerConfig.javaNativeCharset + ) + ).toBytesWritable(), dimValueCount.toText() ); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index ec51d872f33..19586c14113 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -21,7 +21,6 @@ package io.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -68,7 +67,6 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.format.ISODateTimeFormat; -import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.Collections; @@ -80,13 +78,13 @@ import java.util.Set; */ public class HadoopDruidIndexerConfig { - public static final Charset javaNativeCharset = Charset.forName("Unicode"); - - public static final Splitter tabSplitter = Splitter.on("\t"); - public static final Joiner tabJoiner = Joiner.on("\t"); - + private static final Logger log = new Logger(HadoopDruidIndexerConfig.class); private static final Injector injector; + public static final String CONFIG_PROPERTY = "druid.indexer.config"; + public static final Charset javaNativeCharset = Charset.forName("Unicode"); + public static final Splitter tabSplitter = Splitter.on("\t"); + public static final Joiner tabJoiner = Joiner.on("\t"); public static final ObjectMapper jsonMapper; static { @@ -113,85 +111,33 @@ public class HadoopDruidIndexerConfig INVALID_ROW_COUNTER } - public static HadoopDruidIndexerConfig fromMap(Map argSpec) - { - return jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class); - } - - @SuppressWarnings("unchecked") - public static HadoopDruidIndexerConfig fromFile(File file) - { - try { - return fromMap((Map) 
jsonMapper.readValue(file, new TypeReference>(){})); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - @SuppressWarnings("unchecked") - public static HadoopDruidIndexerConfig fromString(String str) - { - try { - return fromMap( - (Map) jsonMapper.readValue( - str, new TypeReference>() - { - } - ) - ); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf) - { - final HadoopDruidIndexerConfig retVal = fromString(conf.get(CONFIG_PROPERTY)); - retVal.verify(); - return retVal; - } - - private static final Logger log = new Logger(HadoopDruidIndexerConfig.class); - - private static final String CONFIG_PROPERTY = "druid.indexer.config"; - private volatile String dataSource; - private volatile String timestampColumnName; - private volatile String timestampFormat; + private volatile TimestampSpec timestampSpec; private volatile DataSpec dataSpec; - @Deprecated - private volatile Granularity segmentGranularity; private volatile GranularitySpec granularitySpec; private volatile PathSpec pathSpec; - private volatile String jobOutputDir; - private volatile String segmentOutputDir; - private volatile String version = new DateTime().toString(); + private volatile String workingPath; + private volatile String segmentOutputPath; + private volatile String version; private volatile PartitionsSpec partitionsSpec; - private volatile boolean leaveIntermediate = false; - private volatile boolean cleanupOnFailure = true; - private volatile Map> shardSpecs = ImmutableMap.of(); - private volatile boolean overwriteFiles = false; + private volatile boolean leaveIntermediate; + private volatile boolean cleanupOnFailure; + private volatile Map> shardSpecs; + private volatile boolean overwriteFiles; private volatile DataRollupSpec rollupSpec; private volatile DbUpdaterJobSpec updaterJobSpec; - private volatile boolean ignoreInvalidRows = false; + private volatile boolean ignoreInvalidRows; @JsonCreator public HadoopDruidIndexerConfig( - final @JsonProperty("intervals") List intervals, final @JsonProperty("dataSource") String dataSource, - final @JsonProperty("timestampColumn") String timestampColumnName, - final @JsonProperty("timestampFormat") String timestampFormat, + final @JsonProperty("timestampSpec") TimestampSpec timestampSpec, final @JsonProperty("dataSpec") DataSpec dataSpec, - final @JsonProperty("segmentGranularity") Granularity segmentGranularity, final @JsonProperty("granularitySpec") GranularitySpec granularitySpec, final @JsonProperty("pathSpec") PathSpec pathSpec, - final @JsonProperty("workingPath") String jobOutputDir, - final @JsonProperty("segmentOutputPath") String segmentOutputDir, + final @JsonProperty("workingPath") String workingPath, + final @JsonProperty("segmentOutputPath") String segmentOutputPath, final @JsonProperty("version") String version, - final @JsonProperty("partitionDimension") String partitionDimension, - final @JsonProperty("targetPartitionSize") Long targetPartitionSize, final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, @@ -199,48 +145,51 @@ public class HadoopDruidIndexerConfig final @JsonProperty("overwriteFiles") boolean overwriteFiles, final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec, final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec, - final 
@JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows + final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, + // These fields are deprecated and will be removed in the future + final @JsonProperty("timestampColumn") String timestampColumn, + final @JsonProperty("timestampFormat") String timestampFormat, + final @JsonProperty("intervals") List intervals, + final @JsonProperty("segmentGranularity") Granularity segmentGranularity, + final @JsonProperty("partitionDimension") String partitionDimension, + final @JsonProperty("targetPartitionSize") Long targetPartitionSize ) { this.dataSource = dataSource; - this.timestampColumnName = (timestampColumnName == null) ? null : timestampColumnName.toLowerCase(); - this.timestampFormat = timestampFormat; + this.timestampSpec = (timestampSpec == null) ? new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec; this.dataSpec = dataSpec; - this.granularitySpec = granularitySpec; this.pathSpec = pathSpec; - this.jobOutputDir = jobOutputDir; - this.segmentOutputDir = segmentOutputDir; + this.workingPath = workingPath; + this.segmentOutputPath = segmentOutputPath; this.version = version == null ? new DateTime().toString() : version; - this.partitionsSpec = partitionsSpec; this.leaveIntermediate = leaveIntermediate; this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure); - this.shardSpecs = shardSpecs; + this.shardSpecs = (shardSpecs == null ? ImmutableMap.>of() : shardSpecs); this.overwriteFiles = overwriteFiles; this.rollupSpec = rollupSpec; this.updaterJobSpec = updaterJobSpec; this.ignoreInvalidRows = ignoreInvalidRows; - if(partitionsSpec != null) { + if (partitionsSpec != null) { Preconditions.checkArgument( partitionDimension == null && targetPartitionSize == null, "Cannot mix partitionsSpec with partitionDimension/targetPartitionSize" ); - this.partitionsSpec = partitionsSpec; } else { // Backwards compatibility this.partitionsSpec = new PartitionsSpec(partitionDimension, targetPartitionSize, null, false); } - if(granularitySpec != null) { + if (granularitySpec != null) { Preconditions.checkArgument( segmentGranularity == null && intervals == null, "Cannot mix granularitySpec with segmentGranularity/intervals" ); + this.granularitySpec = granularitySpec; } else { // Backwards compatibility - this.segmentGranularity = segmentGranularity; - if(segmentGranularity != null && intervals != null) { + if (segmentGranularity != null && intervals != null) { this.granularitySpec = new UniformGranularitySpec(segmentGranularity, intervals); } } @@ -253,21 +202,6 @@ public class HadoopDruidIndexerConfig { } - public List getIntervals() - { - return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals()); - } - - @Deprecated - public void setIntervals(List intervals) - { - Preconditions.checkState(this.granularitySpec == null, "Cannot mix setIntervals with granularitySpec"); - Preconditions.checkState(this.segmentGranularity != null, "Cannot use setIntervals without segmentGranularity"); - - // For backwards compatibility - this.granularitySpec = new UniformGranularitySpec(this.segmentGranularity, intervals); - } - @JsonProperty public String getDataSource() { @@ -279,31 +213,15 @@ public class HadoopDruidIndexerConfig this.dataSource = dataSource.toLowerCase(); } - @JsonProperty("timestampColumn") - public String getTimestampColumnName() - { - return timestampColumnName; - } - - public void setTimestampColumnName(String timestampColumnName) - { - this.timestampColumnName = 
timestampColumnName.toLowerCase(); - } - - @JsonProperty() - public String getTimestampFormat() - { - return timestampFormat; - } - - public void setTimestampFormat(String timestampFormat) - { - this.timestampFormat = timestampFormat; - } - + @JsonProperty public TimestampSpec getTimestampSpec() { - return new TimestampSpec(timestampColumnName, timestampFormat); + return timestampSpec; + } + + public void setTimestampSpec(TimestampSpec timestampSpec) + { + this.timestampSpec = timestampSpec; } @JsonProperty @@ -317,32 +235,6 @@ public class HadoopDruidIndexerConfig this.dataSpec = new ToLowercaseDataSpec(dataSpec); } - public StringInputRowParser getParser() - { - final List dimensionExclusions; - - if(getDataSpec().hasCustomDimensions()) { - dimensionExclusions = null; - } else { - dimensionExclusions = Lists.newArrayList(); - dimensionExclusions.add(getTimestampColumnName()); - dimensionExclusions.addAll( - Lists.transform( - getRollupSpec().getAggs(), new Function() - { - @Override - public String apply(AggregatorFactory aggregatorFactory) - { - return aggregatorFactory.getName(); - } - } - ) - ); - } - - return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions); - } - @JsonProperty public GranularitySpec getGranularitySpec() { @@ -354,17 +246,6 @@ public class HadoopDruidIndexerConfig this.granularitySpec = granularitySpec; } - @JsonProperty - public PartitionsSpec getPartitionsSpec() - { - return partitionsSpec; - } - - public void setPartitionsSpec(PartitionsSpec partitionsSpec) - { - this.partitionsSpec = partitionsSpec; - } - @JsonProperty public PathSpec getPathSpec() { @@ -376,26 +257,26 @@ public class HadoopDruidIndexerConfig this.pathSpec = pathSpec; } - @JsonProperty("workingPath") - public String getJobOutputDir() + @JsonProperty + public String getWorkingPath() { - return jobOutputDir; + return workingPath; } - public void setJobOutputDir(String jobOutputDir) + public void setWorkingPath(String workingPath) { - this.jobOutputDir = jobOutputDir; + this.workingPath = workingPath; } - @JsonProperty("segmentOutputPath") - public String getSegmentOutputDir() + @JsonProperty + public String getSegmentOutputPath() { - return segmentOutputDir; + return segmentOutputPath; } - public void setSegmentOutputDir(String segmentOutputDir) + public void setSegmentOutputPath(String segmentOutputPath) { - this.segmentOutputDir = segmentOutputDir; + this.segmentOutputPath = segmentOutputPath; } @JsonProperty @@ -409,29 +290,15 @@ public class HadoopDruidIndexerConfig this.version = version; } - public String getPartitionDimension() + @JsonProperty + public PartitionsSpec getPartitionsSpec() { - return partitionsSpec.getPartitionDimension(); + return partitionsSpec; } - public boolean partitionByDimension() + public void setPartitionsSpec(PartitionsSpec partitionsSpec) { - return partitionsSpec.isDeterminingPartitions(); - } - - public Long getTargetPartitionSize() - { - return partitionsSpec.getTargetPartitionSize(); - } - - public long getMaxPartitionSize() - { - return partitionsSpec.getMaxPartitionSize(); - } - - public boolean isUpdaterJobSpecSet() - { - return (updaterJobSpec != null); + this.partitionsSpec = partitionsSpec; } @JsonProperty @@ -511,6 +378,72 @@ public class HadoopDruidIndexerConfig this.ignoreInvalidRows = ignoreInvalidRows; } + public List getIntervals() + { + return JodaUtils.condenseIntervals(getGranularitySpec().bucketIntervals()); + } + + public String getPartitionDimension() + { + return partitionsSpec.getPartitionDimension(); + } + + 
public boolean partitionByDimension() + { + return partitionsSpec.isDeterminingPartitions(); + } + + public Long getTargetPartitionSize() + { + return partitionsSpec.getTargetPartitionSize(); + } + + public long getMaxPartitionSize() + { + return partitionsSpec.getMaxPartitionSize(); + } + + public boolean isUpdaterJobSpecSet() + { + return (updaterJobSpec != null); + } + + public StringInputRowParser getParser() + { + final List dimensionExclusions; + + if (getDataSpec().hasCustomDimensions()) { + dimensionExclusions = null; + } else { + dimensionExclusions = Lists.newArrayList(); + dimensionExclusions.add(timestampSpec.getTimestampColumn()); + dimensionExclusions.addAll( + Lists.transform( + getRollupSpec().getAggs(), new Function() + { + @Override + public String apply(AggregatorFactory aggregatorFactory) + { + return aggregatorFactory.getName(); + } + } + ) + ); + } + + return new StringInputRowParser(getTimestampSpec(), getDataSpec(), dimensionExclusions); + } + + public HadoopyShardSpec getShardSpec(Bucket bucket) + { + return shardSpecs.get(bucket.time).get(bucket.partitionNum); + } + + public Job addInputPaths(Job job) throws IOException + { + return getPathSpec().addInputPaths(this, job); + } + /******************************************** Granularity/Bucket Helper Methods ********************************************/ @@ -590,11 +523,6 @@ public class HadoopDruidIndexerConfig ); } - public HadoopyShardSpec getShardSpec(Bucket bucket) - { - return shardSpecs.get(bucket.time).get(bucket.partitionNum); - } - /****************************************** Path helper logic ******************************************/ @@ -606,7 +534,7 @@ public class HadoopDruidIndexerConfig */ public Path makeIntermediatePath() { - return new Path(String.format("%s/%s/%s", getJobOutputDir(), dataSource, getVersion().replace(":", ""))); + return new Path(String.format("%s/%s/%s", getWorkingPath(), getDataSource(), getVersion().replace(":", ""))); } public Path makeSegmentPartitionInfoPath(Bucket bucket) @@ -638,38 +566,33 @@ public class HadoopDruidIndexerConfig return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", ""))); } - public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket) - { - final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get(); - if (fileSystem instanceof DistributedFileSystem) - { - return new Path( - String.format( - "%s/%s/%s_%s/%s/%s", - getSegmentOutputDir(), - dataSource, - bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()), - bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()), - getVersion().replace(":", "_"), - bucket.partitionNum - ) - ); - } - return new Path( - String.format( - "%s/%s/%s_%s/%s/%s", - getSegmentOutputDir(), - dataSource, - bucketInterval.getStart().toString(), - bucketInterval.getEnd().toString(), - getVersion(), - bucket.partitionNum - )); - } - - public Job addInputPaths(Job job) throws IOException + public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket) { - return pathSpec.addInputPaths(this, job); + final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get(); + if (fileSystem instanceof DistributedFileSystem) { + return new Path( + String.format( + "%s/%s/%s_%s/%s/%s", + getSegmentOutputPath(), + getDataSource(), + bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()), + bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()), + getVersion().replace(":", "_"), 
+ bucket.partitionNum + ) + ); + } + return new Path( + String.format( + "%s/%s/%s_%s/%s/%s", + getSegmentOutputPath(), + getDataSource(), + bucketInterval.getStart().toString(), + bucketInterval.getEnd().toString(), + getVersion(), + bucket.partitionNum + ) + ); } public void intoConfiguration(Job job) @@ -677,7 +600,7 @@ public class HadoopDruidIndexerConfig Configuration conf = job.getConfiguration(); try { - conf.set(CONFIG_PROPERTY, jsonMapper.writeValueAsString(this)); + conf.set(HadoopDruidIndexerConfig.CONFIG_PROPERTY, HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(this)); } catch (IOException e) { throw Throwables.propagate(e); @@ -695,12 +618,11 @@ public class HadoopDruidIndexerConfig Preconditions.checkNotNull(dataSource, "dataSource"); Preconditions.checkNotNull(dataSpec, "dataSpec"); - Preconditions.checkNotNull(timestampColumnName, "timestampColumn"); - Preconditions.checkNotNull(timestampFormat, "timestampFormat"); + Preconditions.checkNotNull(timestampSpec, "timestampSpec"); Preconditions.checkNotNull(granularitySpec, "granularitySpec"); Preconditions.checkNotNull(pathSpec, "pathSpec"); - Preconditions.checkNotNull(jobOutputDir, "workingPath"); - Preconditions.checkNotNull(segmentOutputDir, "segmentOutputPath"); + Preconditions.checkNotNull(workingPath, "workingPath"); + Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath"); Preconditions.checkNotNull(version, "version"); Preconditions.checkNotNull(rollupSpec, "rollupSpec"); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java new file mode 100644 index 00000000000..b077cf2fc22 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfigBuilder.java @@ -0,0 +1,278 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.indexer; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.impl.DataSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.indexer.granularity.GranularitySpec; +import io.druid.indexer.partitions.PartitionsSpec; +import io.druid.indexer.path.PathSpec; +import io.druid.indexer.rollup.DataRollupSpec; +import io.druid.indexer.updater.DbUpdaterJobSpec; +import org.apache.hadoop.conf.Configuration; +import org.joda.time.DateTime; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + */ +public class HadoopDruidIndexerConfigBuilder +{ + public static HadoopDruidIndexerConfig fromSchema(HadoopDruidIndexerSchema schema) + { + return HadoopDruidIndexerConfig.jsonMapper.convertValue(schema, HadoopDruidIndexerConfig.class); + } + + public static HadoopDruidIndexerConfig fromMap(Map argSpec) + { + return HadoopDruidIndexerConfig.jsonMapper.convertValue(argSpec, HadoopDruidIndexerConfig.class); + } + + @SuppressWarnings("unchecked") + public static HadoopDruidIndexerConfig fromFile(File file) + { + try { + return fromMap( + (Map) HadoopDruidIndexerConfig.jsonMapper.readValue( + file, new TypeReference>() + { + } + ) + ); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @SuppressWarnings("unchecked") + public static HadoopDruidIndexerConfig fromString(String str) + { + try { + return fromMap( + (Map) HadoopDruidIndexerConfig.jsonMapper.readValue( + str, new TypeReference>() + { + } + ) + ); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf) + { + final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY)); + retVal.verify(); + return retVal; + } + + private volatile String dataSource; + private volatile TimestampSpec timestampSpec; + private volatile DataSpec dataSpec; + private volatile GranularitySpec granularitySpec; + private volatile PathSpec pathSpec; + private volatile String workingPath; + private volatile String segmentOutputPath; + private volatile String version; + private volatile PartitionsSpec partitionsSpec; + private volatile boolean leaveIntermediate; + private volatile boolean cleanupOnFailure; + private volatile Map> shardSpecs; + private volatile boolean overwriteFiles; + private volatile DataRollupSpec rollupSpec; + private volatile DbUpdaterJobSpec updaterJobSpec; + private volatile boolean ignoreInvalidRows; + + public HadoopDruidIndexerConfigBuilder() + { + this.dataSource = null; + this.timestampSpec = null; + this.dataSpec = null; + this.granularitySpec = null; + this.pathSpec = null; + this.workingPath = null; + this.segmentOutputPath = null; + this.version = new DateTime().toString(); + this.partitionsSpec = null; + this.leaveIntermediate = false; + this.cleanupOnFailure = true; + this.shardSpecs = ImmutableMap.of(); + this.overwriteFiles = false; + this.rollupSpec = null; + this.updaterJobSpec = null; + this.ignoreInvalidRows = false; + } + + public HadoopDruidIndexerConfigBuilder withDataSource(String dataSource) + { + this.dataSource = dataSource; + return this; + } + + public HadoopDruidIndexerConfigBuilder withTimestampSpec(TimestampSpec timestampSpec) + { + this.timestampSpec = timestampSpec; + return this; + } + + public HadoopDruidIndexerConfigBuilder withDataSpec(DataSpec dataSpec) 
+ { + this.dataSpec = dataSpec; + return this; + } + + public HadoopDruidIndexerConfigBuilder withGranularitySpec(GranularitySpec granularitySpec) + { + this.granularitySpec = granularitySpec; + return this; + + } + + public HadoopDruidIndexerConfigBuilder withPathSpec(PathSpec pathSpec) + { + this.pathSpec = pathSpec; + return this; + } + + public HadoopDruidIndexerConfigBuilder withWorkingPath(String workingPath) + { + this.workingPath = workingPath; + return this; + } + + public HadoopDruidIndexerConfigBuilder withSegmentOutputPath(String segmentOutputPath) + { + this.segmentOutputPath = segmentOutputPath; + return this; + } + + public HadoopDruidIndexerConfigBuilder withVersion(String version) + { + this.version = version; + return this; + } + + public HadoopDruidIndexerConfigBuilder withPartitionsSpec(PartitionsSpec partitionsSpec) + { + this.partitionsSpec = partitionsSpec; + return this; + } + + public HadoopDruidIndexerConfigBuilder withLeaveIntermediate(boolean leaveIntermediate) + { + this.leaveIntermediate = leaveIntermediate; + return this; + } + + public HadoopDruidIndexerConfigBuilder withCleanupOnFailure(boolean cleanupOnFailure) + { + this.cleanupOnFailure = cleanupOnFailure; + return this; + } + + public HadoopDruidIndexerConfigBuilder withShardSpecs(Map> shardSpecs) + { + this.shardSpecs = shardSpecs; + return this; + } + + public HadoopDruidIndexerConfigBuilder withOverwriteFiles(boolean overwriteFiles) + { + this.overwriteFiles = overwriteFiles; + return this; + } + + public HadoopDruidIndexerConfigBuilder withRollupSpec(DataRollupSpec rollupSpec) + { + this.rollupSpec = rollupSpec; + return this; + } + + public HadoopDruidIndexerConfigBuilder withUpdaterJobSpec(DbUpdaterJobSpec updaterJobSpec) + { + this.updaterJobSpec = updaterJobSpec; + return this; + } + + public HadoopDruidIndexerConfigBuilder withIgnoreInvalidRows(boolean ignoreInvalidRows) + { + this.ignoreInvalidRows = ignoreInvalidRows; + return this; + } + + public HadoopDruidIndexerConfigBuilder withSchema(HadoopDruidIndexerSchema schema) + { + this.dataSource = schema.getDataSource(); + this.timestampSpec = schema.getTimestampSpec(); + this.dataSpec = schema.getDataSpec(); + this.granularitySpec = schema.getGranularitySpec(); + this.pathSpec = HadoopDruidIndexerConfig.jsonMapper.convertValue(schema.getPathSpec(), PathSpec.class); + this.workingPath = schema.getWorkingPath(); + this.segmentOutputPath = schema.getSegmentOutputPath(); + this.version = schema.getVersion(); + this.partitionsSpec = schema.getPartitionsSpec(); + this.leaveIntermediate = schema.isLeaveIntermediate(); + this.cleanupOnFailure = schema.isCleanupOnFailure(); + this.shardSpecs = schema.getShardSpecs(); + this.overwriteFiles = schema.isOverwriteFiles(); + this.rollupSpec = schema.getRollupSpec(); + this.updaterJobSpec = schema.getUpdaterJobSpec(); + this.ignoreInvalidRows = schema.isIgnoreInvalidRows(); + + return this; + } + + public HadoopDruidIndexerConfig build() + { + return new HadoopDruidIndexerConfig( + dataSource, + timestampSpec, + dataSpec, + granularitySpec, + pathSpec, + workingPath, + segmentOutputPath, + version, + partitionsSpec, + leaveIntermediate, + cleanupOnFailure, + shardSpecs, + overwriteFiles, + rollupSpec, + updaterJobSpec, + ignoreInvalidRows, + null, + null, + null, + null, + null, + null + ); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java index ba024125cb8..2c593af68a3 100644 --- 
a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java @@ -46,6 +46,7 @@ public class HadoopDruidIndexerJob implements Jobby private final HadoopDruidIndexerConfig config; private final DbUpdaterJob dbUpdaterJob; + private IndexGeneratorJob indexJob; private volatile List publishedSegments = null; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMain.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMain.java deleted file mode 100644 index 163a17923cf..00000000000 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMain.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.indexer; - -import com.google.common.collect.ImmutableList; -import com.metamx.common.Pair; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.common.logger.Logger; - -import java.util.List; - -/** - */ -public class HadoopDruidIndexerMain -{ - private static final Logger log = new Logger(HadoopDruidIndexerMain.class); - - public static void main(String[] args) throws Exception - { - if (args.length != 1) { - printHelp(); - System.exit(2); - } - - HadoopDruidIndexerNode node = HadoopDruidIndexerNode.builder().build(); - - node.setArgumentSpec(args[0]); - - Lifecycle lifecycle = new Lifecycle(); - lifecycle.addManagedInstance(node); - - try { - lifecycle.start(); - } - catch (Throwable t) { - log.info(t, "Throwable caught at startup, committing seppuku"); - Thread.sleep(500); - printHelp(); - System.exit(1); - } - } - - private static final List> expectedFields = - ImmutableList.>builder() - .add(Pair.of("dataSource", "Name of dataSource")) - .add(Pair.of("timestampColumn", "Column name of the timestamp column")) - .add(Pair.of("timestampFormat", "Format name of the timestamp column (posix or iso)")) - .add( - Pair.of( - "dataSpec", - "A JSON object with fields " - + - "format=(json, csv, tsv), " - + - "columns=JSON array of column names for the delimited text input file (only for csv and tsv formats)," - + - "dimensions=JSON array of dimensionn names (must match names in columns)," - + - "delimiter=delimiter of the data (only for tsv format)" - ) - ) - .add(Pair.of("granularitySpec", "A JSON object indicating the Granularity that segments should be created at.")) - .add( - Pair.of( - "pathSpec", - "A JSON object with fields type=granularity, inputPath, filePattern, dataGranularity" - ) - ) - .add( - Pair.of( - "rollupSpec", - "JSON object with fields rollupGranularity, aggs=JSON Array of Aggregator specs" - ) - ) - .add(Pair.of("workingPath", "Path to store intermediate output data. 
Deleted when finished.")) - .add(Pair.of("segmentOutputPath", "Path to store output segments.")) - .add( - Pair.of( - "updaterJobSpec", - "JSON object with fields type=db, connectURI of the database, username, password, and segment table name" - ) - ) - .add(Pair.of("cleanupOnFailure", "Clean up intermediate files on failure? (default: true)")) - .add(Pair.of("leaveIntermediate", "Leave intermediate files. (default: false)")) - .add(Pair.of("partitionDimension", "Dimension to partition by (optional)")) - .add( - Pair.of( - "targetPartitionSize", - "Integer representing the target number of rows in a partition (required if partitionDimension != null)" - ) - ) - .build(); - - public static void printHelp() - { - System.out.println("Usage: "); - System.out.println(" is either a JSON object or the path to a file that contains a JSON object."); - System.out.println(); - System.out.println("JSON object description:"); - System.out.println("{"); - for (Pair expectedField : expectedFields) { - System.out.printf(" \"%s\": %s%n", expectedField.lhs, expectedField.rhs); - } - System.out.println("}"); - } -} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java index 11d37df65ad..9f1bd030284 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java @@ -38,7 +38,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< protected void setup(Context context) throws IOException, InterruptedException { - config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration()); parser = config.getParser(); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerNode.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerNode.java deleted file mode 100644 index 2fb441c6518..00000000000 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerNode.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
- */ - -package io.druid.indexer; - -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.io.CharStreams; -import com.google.common.io.InputSupplier; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.joda.time.Interval; - -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Arrays; -import java.util.List; - -/** - */ -public class HadoopDruidIndexerNode -{ - public static Builder builder() - { - return new Builder(); - } - - private String intervalSpec = null; - private String argumentSpec = null; - - public String getIntervalSpec() - { - return intervalSpec; - } - - public String getArgumentSpec() - { - return argumentSpec; - } - - public HadoopDruidIndexerNode setIntervalSpec(String intervalSpec) - { - this.intervalSpec = intervalSpec; - return this; - } - - public HadoopDruidIndexerNode setArgumentSpec(String argumentSpec) - { - this.argumentSpec = argumentSpec; - return this; - } - - @SuppressWarnings("unchecked") - public HadoopDruidIndexerNode registerJacksonSubtype(Class... clazzes) - { - HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(clazzes); - return this; - } - - @SuppressWarnings("unchecked") - public HadoopDruidIndexerNode registerJacksonSubtype(NamedType... namedTypes) - { - HadoopDruidIndexerConfig.jsonMapper.registerSubtypes(namedTypes); - return this; - } - - @LifecycleStart - public void start() throws Exception - { - Preconditions.checkNotNull(argumentSpec, "argumentSpec"); - - final HadoopDruidIndexerConfig config; - if (argumentSpec.startsWith("{")) { - config = HadoopDruidIndexerConfig.fromString(argumentSpec); - } else if (argumentSpec.startsWith("s3://")) { - final Path s3nPath = new Path(String.format("s3n://%s", argumentSpec.substring("s3://".length()))); - final FileSystem fs = s3nPath.getFileSystem(new Configuration()); - - String configString = CharStreams.toString(new InputSupplier() - { - @Override - public InputStreamReader getInput() throws IOException - { - return new InputStreamReader(fs.open(s3nPath)); - } - }); - - config = HadoopDruidIndexerConfig.fromString(configString); - } else { - config = HadoopDruidIndexerConfig.fromFile(new File(argumentSpec)); - } - - if (intervalSpec != null) { - final List dataInterval = Lists.transform( - Arrays.asList(intervalSpec.split(",")), - new StringIntervalFunction() - ); - - config.setIntervals(dataInterval); - } - - new HadoopDruidIndexerJob(config).run(); - } - - @LifecycleStop - public void stop() - { - } - - public static class Builder - { - public HadoopDruidIndexerNode build() - { - return new HadoopDruidIndexerNode(); - } - } -} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerSchema.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerSchema.java new file mode 100644 index 00000000000..34d6b2c65b1 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerSchema.java @@ -0,0 +1,194 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. 
+ * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.indexer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.impl.DataSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.indexer.granularity.GranularitySpec; +import io.druid.indexer.partitions.PartitionsSpec; +import io.druid.indexer.rollup.DataRollupSpec; +import io.druid.indexer.updater.DbUpdaterJobSpec; +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; + +/** + */ +public class HadoopDruidIndexerSchema +{ + private final String dataSource; + private final TimestampSpec timestampSpec; + private final DataSpec dataSpec; + private final GranularitySpec granularitySpec; + private final Map pathSpec; // This cannot just be a PathSpec object + private final String workingPath; + private final String segmentOutputPath; + private final String version; + private final PartitionsSpec partitionsSpec; + private final boolean leaveIntermediate; + private final boolean cleanupOnFailure; + private final Map> shardSpecs; + private final boolean overwriteFiles; + private final DataRollupSpec rollupSpec; + private final DbUpdaterJobSpec updaterJobSpec; + private final boolean ignoreInvalidRows; + + @JsonCreator + public HadoopDruidIndexerSchema( + final @JsonProperty("dataSource") String dataSource, + final @JsonProperty("timestampSpec") TimestampSpec timestampSpec, + final @JsonProperty("dataSpec") DataSpec dataSpec, + final @JsonProperty("granularitySpec") GranularitySpec granularitySpec, + final @JsonProperty("pathSpec") Map pathSpec, + final @JsonProperty("workingPath") String workingPath, + final @JsonProperty("segmentOutputPath") String segmentOutputPath, + final @JsonProperty("version") String version, + final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, + final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, + final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, + final @JsonProperty("shardSpecs") Map> shardSpecs, + final @JsonProperty("overwriteFiles") boolean overwriteFiles, + final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec, + final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec, + final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, + // These fields are deprecated and will be removed in the future + final @JsonProperty("timestampColumn") String timestampColumn, + final @JsonProperty("timestampFormat") String timestampFormat + + ) + { + this.dataSource = dataSource; + this.timestampSpec = (timestampSpec == null) ? 
new TimestampSpec(timestampColumn, timestampFormat) : timestampSpec; + this.dataSpec = dataSpec; + this.granularitySpec = granularitySpec; + this.pathSpec = pathSpec; + this.workingPath = workingPath; + this.segmentOutputPath = segmentOutputPath; + this.version = version == null ? new DateTime().toString() : version; + this.partitionsSpec = partitionsSpec; + this.leaveIntermediate = leaveIntermediate; + this.cleanupOnFailure = (cleanupOnFailure == null ? true : cleanupOnFailure); + this.shardSpecs = (shardSpecs == null ? ImmutableMap.>of() : shardSpecs); + this.overwriteFiles = overwriteFiles; + this.rollupSpec = rollupSpec; + this.updaterJobSpec = updaterJobSpec; + this.ignoreInvalidRows = ignoreInvalidRows; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public TimestampSpec getTimestampSpec() + { + return timestampSpec; + } + + @JsonProperty + public DataSpec getDataSpec() + { + return dataSpec; + } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { + return granularitySpec; + } + + @JsonProperty + public Map getPathSpec() + { + return pathSpec; + } + + @JsonProperty + public String getWorkingPath() + { + return workingPath; + } + + @JsonProperty + public String getSegmentOutputPath() + { + return segmentOutputPath; + } + + @JsonProperty + public String getVersion() + { + return version; + } + + @JsonProperty + public PartitionsSpec getPartitionsSpec() + { + return partitionsSpec; + } + + @JsonProperty + public boolean isLeaveIntermediate() + { + return leaveIntermediate; + } + + @JsonProperty + public boolean isCleanupOnFailure() + { + return cleanupOnFailure; + } + + @JsonProperty + public Map> getShardSpecs() + { + return shardSpecs; + } + + @JsonProperty + public boolean isOverwriteFiles() + { + return overwriteFiles; + } + + @JsonProperty + public DataRollupSpec getRollupSpec() + { + return rollupSpec; + } + + @JsonProperty + public DbUpdaterJobSpec getUpdaterJobSpec() + { + return updaterJobSpec; + } + + @JsonProperty + public boolean isIgnoreInvalidRows() + { + return ignoreInvalidRows; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index c4b9738e191..fde7161d8a4 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -86,6 +86,7 @@ public class IndexGeneratorJob implements Jobby private static final Logger log = new Logger(IndexGeneratorJob.class); private final HadoopDruidIndexerConfig config; + private IndexGeneratorStats jobStats; public IndexGeneratorJob( @@ -243,7 +244,7 @@ public class IndexGeneratorJob implements Jobby protected void setup(Context context) throws IOException, InterruptedException { - config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + config = HadoopDruidIndexerConfigBuilder.fromConfiguration(context.getConfiguration()); for (AggregatorFactory factory : config.getRollupSpec().getAggs()) { metricNames.add(factory.getName().toLowerCase()); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index ded0801f41a..654f70b5b4d 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -60,7 +60,7 @@ public class JobHelper final Configuration conf = 
groupByJob.getConfiguration(); final FileSystem fs = FileSystem.get(conf); - Path distributedClassPath = new Path(config.getJobOutputDir(), "classpath"); + Path distributedClassPath = new Path(config.getWorkingPath(), "classpath"); if (fs instanceof LocalFileSystem) { return; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java index b9a3340b0dc..c879b1a8e69 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java @@ -63,12 +63,16 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec // This PathSpec breaks so many abstractions that we might as break some more Preconditions.checkState( config.getGranularitySpec() instanceof UniformGranularitySpec, - String.format("Cannot use %s without %s", GranularUnprocessedPathSpec.class.getSimpleName(), UniformGranularitySpec.class.getSimpleName()) + String.format( + "Cannot use %s without %s", + GranularUnprocessedPathSpec.class.getSimpleName(), + UniformGranularitySpec.class.getSimpleName() + ) ); final Path betaInput = new Path(getInputPath()); final FileSystem fs = betaInput.getFileSystem(job.getConfiguration()); - final Granularity segmentGranularity = ((UniformGranularitySpec)config.getGranularitySpec()).getGranularity(); + final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getGranularity(); Map inputModifiedTimes = new TreeMap( Comparators.inverse(Comparators.comparable()) @@ -87,7 +91,11 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec DateTime timeBucket = entry.getKey(); long mTime = entry.getValue(); - String bucketOutput = String.format("%s/%s", config.getSegmentOutputDir(), segmentGranularity.toPath(timeBucket)); + String bucketOutput = String.format( + "%s/%s", + config.getSegmentOutputPath(), + segmentGranularity.toPath(timeBucket) + ); for (FileStatus fileStatus : FSSpideringIterator.spiderIterable(fs, new Path(bucketOutput))) { if (fileStatus.getModificationTime() > mTime) { bucketsToRun.add(new Interval(timeBucket, segmentGranularity.increment(timeBucket))); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index 70885356f0e..b36f2822d26 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -40,7 +40,8 @@ public class HadoopDruidIndexerConfigTest private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); @Test - public void testGranularitySpec() { + public void testGranularitySpec() + { final HadoopDruidIndexerConfig cfg; try { @@ -54,7 +55,8 @@ public class HadoopDruidIndexerConfigTest + "}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -74,7 +76,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testGranularitySpecLegacy() { + public void testGranularitySpecLegacy() + { // Deprecated and replaced by granularitySpec, but still supported final HadoopDruidIndexerConfig cfg; @@ -86,7 +89,8 @@ public class HadoopDruidIndexerConfigTest + "}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw 
Throwables.propagate(e); } @@ -106,40 +110,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testGranularitySpecPostConstructorIntervals() { - // Deprecated and replaced by granularitySpec, but still supported - final HadoopDruidIndexerConfig cfg; - - try { - cfg = jsonMapper.readValue( - "{" - + "\"segmentGranularity\":\"day\"" - + "}", - HadoopDruidIndexerConfig.class - ); - } catch(Exception e) { - throw Throwables.propagate(e); - } - - cfg.setIntervals(Lists.newArrayList(new Interval("2012-03-01/P1D"))); - - final UniformGranularitySpec granularitySpec = (UniformGranularitySpec) cfg.getGranularitySpec(); - - Assert.assertEquals( - "getIntervals", - Lists.newArrayList(new Interval("2012-03-01/P1D")), - granularitySpec.getIntervals() - ); - - Assert.assertEquals( - "getGranularity", - "DAY", - granularitySpec.getGranularity().toString() - ); - } - - @Test - public void testInvalidGranularityCombination() { + public void testInvalidGranularityCombination() + { boolean thrown = false; try { final HadoopDruidIndexerConfig cfg = jsonReadWriteRead( @@ -154,7 +126,8 @@ public class HadoopDruidIndexerConfigTest + "}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { thrown = true; } @@ -162,7 +135,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testPartitionsSpecAutoDimension() { + public void testPartitionsSpecAutoDimension() + { final HadoopDruidIndexerConfig cfg; try { @@ -174,7 +148,8 @@ public class HadoopDruidIndexerConfigTest + "}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -200,7 +175,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testPartitionsSpecSpecificDimension() { + public void testPartitionsSpecSpecificDimension() + { final HadoopDruidIndexerConfig cfg; try { @@ -213,7 +189,8 @@ public class HadoopDruidIndexerConfigTest + "}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -245,7 +222,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testPartitionsSpecLegacy() { + public void testPartitionsSpecLegacy() + { final HadoopDruidIndexerConfig cfg; try { @@ -256,7 +234,8 @@ public class HadoopDruidIndexerConfigTest + "}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -288,7 +267,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testPartitionsSpecMaxPartitionSize() { + public void testPartitionsSpecMaxPartitionSize() + { final HadoopDruidIndexerConfig cfg; try { @@ -302,7 +282,8 @@ public class HadoopDruidIndexerConfigTest + "}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -334,7 +315,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testInvalidPartitionsCombination() { + public void testInvalidPartitionsCombination() + { boolean thrown = false; try { final HadoopDruidIndexerConfig cfg = jsonReadWriteRead( @@ -346,7 +328,8 @@ public class HadoopDruidIndexerConfigTest + "}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { thrown = true; } @@ -382,7 +365,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testDefaultSettings() { + public void testDefaultSettings() + { final HadoopDruidIndexerConfig cfg; try { @@ -390,7 +374,8 @@ public class HadoopDruidIndexerConfigTest 
"{}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -414,7 +399,8 @@ public class HadoopDruidIndexerConfigTest } @Test - public void testNoCleanupOnFailure() { + public void testNoCleanupOnFailure() + { final HadoopDruidIndexerConfig cfg; try { @@ -422,7 +408,8 @@ public class HadoopDruidIndexerConfigTest "{\"cleanupOnFailure\":false}", HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -435,23 +422,25 @@ public class HadoopDruidIndexerConfigTest @Test - public void shouldMakeHDFSCompliantSegmentOutputPath() { + public void shouldMakeHDFSCompliantSegmentOutputPath() + { final HadoopDruidIndexerConfig cfg; try { cfg = jsonReadWriteRead( - "{" - + "\"dataSource\": \"source\"," - + " \"granularitySpec\":{" - + " \"type\":\"uniform\"," - + " \"gran\":\"hour\"," - + " \"intervals\":[\"2012-07-10/P1D\"]" - + " }," - + "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"" - + "}", - HadoopDruidIndexerConfig.class + "{" + + "\"dataSource\": \"source\"," + + " \"granularitySpec\":{" + + " \"type\":\"uniform\"," + + " \"gran\":\"hour\"," + + " \"intervals\":[\"2012-07-10/P1D\"]" + + " }," + + "\"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"" + + "}", + HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -466,23 +455,25 @@ public class HadoopDruidIndexerConfigTest } @Test - public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() { + public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() + { final HadoopDruidIndexerConfig cfg; try { cfg = jsonReadWriteRead( - "{" - + "\"dataSource\": \"the:data:source\"," - + " \"granularitySpec\":{" - + " \"type\":\"uniform\"," - + " \"gran\":\"hour\"," - + " \"intervals\":[\"2012-07-10/P1D\"]" - + " }," - + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\"" - + "}", - HadoopDruidIndexerConfig.class + "{" + + "\"dataSource\": \"the:data:source\"," + + " \"granularitySpec\":{" + + " \"type\":\"uniform\"," + + " \"gran\":\"hour\"," + + " \"intervals\":[\"2012-07-10/P1D\"]" + + " }," + + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\"" + + "}", + HadoopDruidIndexerConfig.class ); - } catch(Exception e) { + } + catch (Exception e) { throw Throwables.propagate(e); } @@ -490,7 +481,10 @@ public class HadoopDruidIndexerConfigTest Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket); - Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString()); + Assert.assertEquals( + "/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", + path.toString() + ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 09f42e56017..4a753c4e749 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -22,34 +22,50 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import 
com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.metamx.common.logger.Logger; import io.druid.common.utils.JodaUtils; import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopDruidIndexerConfigBuilder; import io.druid.indexer.HadoopDruidIndexerJob; +import io.druid.indexer.HadoopDruidIndexerSchema; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentInsertAction; +import io.druid.initialization.Initialization; import io.druid.timeline.DataSegment; +import io.tesla.aether.internal.DefaultTeslaAether; import org.joda.time.DateTime; +import java.io.File; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; import java.util.List; public class HadoopIndexTask extends AbstractTask { - @JsonIgnore - private final HadoopDruidIndexerConfig config; - private static final Logger log = new Logger(HadoopIndexTask.class); + private static String defaultHadoopCoordinates = "org.apache.hadoop:hadoop-core:1.0.3"; + + @JsonIgnore + private final HadoopDruidIndexerSchema schema; + + @JsonIgnore + private final String hadoopCoordinates; /** - * @param config is used by the HadoopDruidIndexerJob to set up the appropriate parameters + * @param schema is used by the HadoopDruidIndexerJob to set up the appropriate parameters * for creating Druid index segments. It may be modified. *

- * Here, we will ensure that the DbConnectorConfig field of the config is set to null, such that the + * Here, we will ensure that the DbConnectorConfig field of the schema is set to null, such that the * job does not push a list of published segments to the database. Instead, we will use the method * IndexGeneratorJob.getPublishedSegments() to simply return a list of the published * segments, and let the indexing service report these segments to the database. @@ -58,21 +74,24 @@ public class HadoopIndexTask extends AbstractTask @JsonCreator public HadoopIndexTask( - @JsonProperty("id") String id, - @JsonProperty("config") HadoopDruidIndexerConfig config + @JsonProperty("config") HadoopDruidIndexerSchema schema, + @JsonProperty("hadoopCoordinates") String hadoopCoordinates ) { super( - id != null ? id : String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()), - config.getDataSource(), - JodaUtils.umbrellaInterval(config.getIntervals()) + id != null ? id : String.format("index_hadoop_%s_%s", schema.getDataSource(), new DateTime()), + schema.getDataSource(), + JodaUtils.umbrellaInterval(JodaUtils.condenseIntervals(schema.getGranularitySpec().bucketIntervals())) ); - // Some HadoopDruidIndexerConfig stuff doesn't make sense in the context of the indexing service - Preconditions.checkArgument(config.getSegmentOutputDir() == null, "segmentOutputPath must be absent"); - Preconditions.checkArgument(config.getJobOutputDir() == null, "workingPath must be absent"); - Preconditions.checkArgument(!config.isUpdaterJobSpecSet(), "updaterJobSpec must be absent"); - this.config = config; + // Some HadoopDruidIndexerSchema stuff doesn't make sense in the context of the indexing service + Preconditions.checkArgument(schema.getSegmentOutputPath() == null, "segmentOutputPath must be absent"); + Preconditions.checkArgument(schema.getWorkingPath() == null, "workingPath must be absent"); + Preconditions.checkArgument(schema.getUpdaterJobSpec() == null, "updaterJobSpec must be absent"); + + this.schema = schema; + this.hadoopCoordinates = (hadoopCoordinates == null ?
defaultHadoopCoordinates : hadoopCoordinates); } @Override @@ -81,34 +100,64 @@ public class HadoopIndexTask extends AbstractTask return "index_hadoop"; } + + @JsonProperty("config") + public HadoopDruidIndexerSchema getSchema() + { + return schema; + } + + @JsonProperty + public String getHadoopCoordinates() + { + return hadoopCoordinates; + } + + @SuppressWarnings("unchecked") @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { - // Copy config so we don't needlessly modify our provided one - // Also necessary to make constructor validations work upon serde-after-run - final HadoopDruidIndexerConfig configCopy = toolbox.getObjectMapper() - .readValue( - toolbox.getObjectMapper().writeValueAsBytes(config), - HadoopDruidIndexerConfig.class - ); + // setup Hadoop + final DefaultTeslaAether aetherClient = new DefaultTeslaAether(); + final ClassLoader hadoopLoader = Initialization.getClassLoaderForCoordinates( + aetherClient, hadoopCoordinates + ); + final URL[] urLs = ((URLClassLoader) hadoopLoader).getURLs(); + + final URL[] nonHadoopUrls = ((URLClassLoader) HadoopIndexTask.class.getClassLoader()).getURLs(); + + List theURLS = Lists.newArrayList(); + theURLS.addAll(Arrays.asList(urLs)); + theURLS.addAll(Arrays.asList(nonHadoopUrls)); + + final URLClassLoader loader = new URLClassLoader(theURLS.toArray(new URL[theURLS.size()]), null); + Thread.currentThread().setContextClassLoader(loader); + + System.setProperty("druid.hadoop.internal.classpath", Joiner.on(File.pathSeparator).join(nonHadoopUrls)); + + final Class mainClass = loader.loadClass(HadoopIndexTaskInnerProcessing.class.getName()); + final Method mainMethod = mainClass.getMethod("runTask", String[].class); // We should have a lock from before we started running - final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); + final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); log.info("Setting version to: %s", myLock.getVersion()); - configCopy.setVersion(myLock.getVersion()); - // Set workingPath to some reasonable default - configCopy.setJobOutputDir(toolbox.getConfig().getHadoopWorkingPath()); + String[] args = new String[]{ + toolbox.getObjectMapper().writeValueAsString(schema), + myLock.getVersion(), + toolbox.getConfig().getHadoopWorkingPath(), + toolbox.getSegmentPusher().getPathForHadoop(getDataSource()), + }; - configCopy.setSegmentOutputDir(toolbox.getSegmentPusher().getPathForHadoop(getDataSource())); + String segments = (String) mainMethod.invoke(null, new Object[]{args}); - HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(configCopy); - configCopy.verify(); - - log.info("Starting a hadoop index generator job..."); - if (job.run()) { - List publishedSegments = job.getPublishedSegments(); + if (segments != null) { + List publishedSegments = toolbox.getObjectMapper().readValue( + segments, new TypeReference>() + { + } + ); // Request segment pushes toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments))); @@ -117,12 +166,41 @@ public class HadoopIndexTask extends AbstractTask } else { return TaskStatus.failure(getId()); } - } - @JsonProperty - public HadoopDruidIndexerConfig getConfig() + public static class HadoopIndexTaskInnerProcessing { - return config; + public static String runTask(String[] args) throws Exception + { + final String schema = args[0]; + final String version = args[1]; + final String workingPath = args[2]; + final String segmentOutputPath = args[3]; + + final HadoopDruidIndexerSchema theSchema = 
HadoopDruidIndexerConfig.jsonMapper + .readValue( + schema, + HadoopDruidIndexerSchema.class + ); + final HadoopDruidIndexerConfig config = + new HadoopDruidIndexerConfigBuilder().withSchema(theSchema) + .withVersion(version) + .withWorkingPath( + workingPath + ) + .withSegmentOutputPath( + segmentOutputPath + ) + .build(); + + HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config); + + log.info("Starting a hadoop index generator job..."); + if (job.run()) { + return HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(job.getPublishedSegments()); + } + + return null; + } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 6594b860a55..36946e196d7 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -22,12 +22,13 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.metamx.common.Granularity; import io.druid.data.input.impl.JSONDataSpec; +import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; -import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopDruidIndexerSchema; import io.druid.indexer.granularity.UniformGranularitySpec; -import io.druid.indexer.path.StaticPathSpec; import io.druid.indexer.rollup.DataRollupSpec; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; @@ -338,17 +339,12 @@ public class TaskSerdeTest { final HadoopIndexTask task = new HadoopIndexTask( null, - new HadoopDruidIndexerConfig( - null, + new HadoopDruidIndexerSchema( "foo", - "timestamp", - "auto", + new TimestampSpec("timestamp", "auto"), new JSONDataSpec(ImmutableList.of("foo"), null), - null, new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))), - new StaticPathSpec("bar"), - null, - null, + ImmutableMap.of("paths", "bar"), null, null, null, @@ -359,8 +355,11 @@ public class TaskSerdeTest false, new DataRollupSpec(ImmutableList.of(), QueryGranularity.NONE), null, - false - ) + false, + null, + null + ), + null ); final ObjectMapper jsonMapper = new DefaultObjectMapper(); diff --git a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java index 98e88d46ade..39b67651bce 100644 --- a/services/src/main/java/io/druid/cli/CliHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliHadoopIndexer.java @@ -41,7 +41,7 @@ import java.util.List; */ @Command( name = "hadoop", - description = "Runs the batch Hadoop Druid Indexer, see https://github.com/metamx/druid/wiki/Batch-ingestion for a description." + description = "Runs the batch Hadoop Druid Indexer, see http://druid.io/docs/0.6.0/Batch-ingestion.html for a description." 
) public class CliHadoopIndexer implements Runnable { diff --git a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java index f3e51d1e31d..5c9270edda7 100644 --- a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java @@ -24,8 +24,8 @@ import com.metamx.common.logger.Logger; import io.airlift.command.Arguments; import io.airlift.command.Command; import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopDruidIndexerConfigBuilder; import io.druid.indexer.HadoopDruidIndexerJob; -import io.druid.timeline.partition.SingleDimensionShardSpec; import java.io.File; @@ -58,9 +58,9 @@ public class CliInternalHadoopIndexer implements Runnable { try { if (argumentSpec.startsWith("{")) { - return HadoopDruidIndexerConfig.fromString(argumentSpec); + return HadoopDruidIndexerConfigBuilder.fromString(argumentSpec); } else { - return HadoopDruidIndexerConfig.fromFile(new File(argumentSpec)); + return HadoopDruidIndexerConfigBuilder.fromFile(new File(argumentSpec)); } } catch (Exception e) { diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index d2ccffe859e..bcc0d35b6fc 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -21,12 +21,9 @@ package io.druid.cli; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.api.client.util.Lists; -import com.google.common.base.Joiner; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.multibindings.MapBinder; @@ -43,7 +40,6 @@ import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; import io.druid.guice.NodeTypeConfig; import io.druid.guice.PolyBind; -import io.druid.indexer.Utils; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskToolboxFactory; @@ -66,20 +62,15 @@ import io.druid.indexing.worker.executor.ChatHandlerResource; import io.druid.indexing.worker.executor.ExecutorLifecycle; import io.druid.indexing.worker.executor.ExecutorLifecycleConfig; import io.druid.initialization.DruidModule; -import io.druid.initialization.Initialization; import io.druid.query.QuerySegmentWalker; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.OmniDataSegmentKiller; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.StorageLocationConfig; -import io.druid.server.initialization.ExtensionsConfig; import io.druid.server.initialization.JettyServerInitializer; -import io.tesla.aether.internal.DefaultTeslaAether; import org.eclipse.jetty.server.Server; import java.io.File; -import java.net.URL; -import java.net.URLClassLoader; import java.util.Arrays; import java.util.List;
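A note on backwards compatibility in the new `HadoopDruidIndexerSchema` above: its constructor folds the deprecated top-level `timestampColumn`/`timestampFormat` properties into a `TimestampSpec`, and fills in defaults for `version`, `cleanupOnFailure`, and `shardSpecs` when they are omitted. The following is a minimal, hypothetical sketch (not part of this patch) illustrating that behaviour; it assumes the `indexing-hadoop` module and its shared `HadoopDruidIndexerConfig.jsonMapper` are on the classpath.

```java
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerSchema;

public class LegacyTimestampSchemaSketch
{
  public static void main(String[] args) throws Exception
  {
    // An old-style spec that still uses the deprecated top-level timestamp properties.
    final String legacyJson = "{"
                              + "\"dataSource\": \"example\","
                              + "\"timestampColumn\": \"ts\","
                              + "\"timestampFormat\": \"auto\""
                              + "}";

    final HadoopDruidIndexerSchema schema = HadoopDruidIndexerConfig.jsonMapper
        .readValue(legacyJson, HadoopDruidIndexerSchema.class);

    // The constructor translates the deprecated fields into a TimestampSpec...
    System.out.println(schema.getTimestampSpec() != null);   // true
    // ...and fills in defaults for omitted fields.
    System.out.println(schema.isCleanupOnFailure());         // true
    System.out.println(schema.getShardSpecs().isEmpty());    // true
    System.out.println(schema.getVersion() != null);         // true (defaults to the current time)
  }
}
```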
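Similarly, `HadoopIndexTask` now accepts an optional `hadoopCoordinates` property (falling back to `defaultHadoopCoordinates`, i.e. `org.apache.hadoop:hadoop-core:1.0.3`) and resolves those Maven coordinates onto an isolated classloader before running the indexer job. The sketch below is hypothetical and not part of this patch; it assumes the `indexing-service` Jackson bindings resolve the `index_hadoop` task type the same way `TaskSerdeTest` does, and that the embedded config omits `segmentOutputPath`, `workingPath`, and `updaterJobSpec`, as the constructor's `Preconditions` require.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexing.common.task.HadoopIndexTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;

public class HadoopCoordinatesSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper jsonMapper = new DefaultObjectMapper();

    // A task payload that pins a specific Hadoop artifact; when "hadoopCoordinates" is
    // omitted, the task falls back to "org.apache.hadoop:hadoop-core:1.0.3".
    final String taskJson = "{"
                            + "\"type\": \"index_hadoop\","
                            + "\"hadoopCoordinates\": \"org.apache.hadoop:hadoop-core:1.0.3\","
                            + "\"config\": {"
                            + "  \"dataSource\": \"example\","
                            + "  \"granularitySpec\": {"
                            + "    \"type\": \"uniform\","
                            + "    \"gran\": \"day\","
                            + "    \"intervals\": [\"2010-01-01/P1D\"]"
                            + "  }"
                            + "}"
                            + "}";

    // Deserialize through the Task interface, as the serde test does.
    final HadoopIndexTask task = (HadoopIndexTask) jsonMapper.readValue(taskJson, Task.class);
    System.out.println(task.getHadoopCoordinates()); // org.apache.hadoop:hadoop-core:1.0.3
  }
}
```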