diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md index 35a671c6769..4af56f2f21d 100644 --- a/docs/content/ingestion/batch-ingestion.md +++ b/docs/content/ingestion/batch-ingestion.md @@ -136,7 +136,7 @@ There are multiple types of inputSpecs: ##### `static` -Is a type of data loader where a static path to where the data files are located is passed. +Is a type of inputSpec where a static path to where the data files are located is passed. |Field|Type|Description|Required| |-----|----|-----------|--------| @@ -150,7 +150,7 @@ For example, using the static input paths: ##### `granularity` -Is a type of data loader that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase). +Is a type of inputSpec that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase). |Field|Type|Description|Required| |-----|----|-----------|--------| @@ -166,6 +166,61 @@ s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=01 ... s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=23 ``` +##### `dataSource` + +It is a type of inputSpec that reads data already stored inside druid. It is useful for doing "re-indexing". A usecase would be that you ingested some data in some interval and at a later time you wanted to change granularity of rows or remove some columns from the data stored in druid. + +|Field|Type|Description|Required| +|-----|----|-----------|--------| +|ingestionSpec|Json Object|Specification of druid segments to be loaded. See below.|yes| +|maxSplitSize|Number|Enables combining multiple segments into single Hadoop InputSplit according to size of segments. Default is none. |no| + +Here is what goes inside "ingestionSpec" +|Field|Type|Description|Required| +|dataSource|String|Druid dataSource name from which you are loading the data.|yes| +|interval|String|A string representing ISO-8601 Intervals.|yes| +|granularity|String|Defines the granularity of the query while loading data. Default value is "none".See [Granularities](../querying/granularities.html).|no| +|filter|Json|See [Filters](../querying/filters.html)|no| +|dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have explicit list of dimensions then all the dimension columns present in stored data will be read.|no| +|metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no| + + +For example + +``` +"ingestionSpec" : + { + "dataSource": "wikipedia", + "interval": "2014-10-20T00:00:00Z/P2W" + } +``` + +##### `multi` + +It is a composing inputSpec to combine two other input specs. It is useful for doing "delta ingestion". A usecase would be that you ingested some data in some interval and at a later time you wanted to "append" more data to that interval. You can use this inputSpec to combine `dataSource` and `static` (or others) input specs to add more data to an already indexed interval. + +|Field|Type|Description|Required| +|-----|----|-----------|--------| +|children|Array of Json Objects|List of json objects containing other inputSpecs |yes| + +For example + +``` +"children": [ + { + "type" : "dataSource", + "ingestionSpec" : { + "dataSource": "wikipedia", + "interval": "2014-10-20T00:00:00Z/P2W" + } + }, + { + "type" : "static", + "paths": "/path/to/more/wikipedia/data/" + } +] +``` + #### Metadata Update Job Spec diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index e217b82ee3e..74e9b6295be 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -88,7 +88,6 @@ public class DetermineHashedPartitionsJob implements Jobby ); JobHelper.injectSystemProperties(groupByJob); - JobHelper.setInputFormat(groupByJob, config); groupByJob.setMapperClass(DetermineCardinalityMapper.class); groupByJob.setMapOutputKeyClass(LongWritable.class); groupByJob.setMapOutputValueClass(BytesWritable.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index a21e5f7437b..0060303888f 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -126,7 +126,6 @@ public class DeterminePartitionsJob implements Jobby ); JobHelper.injectSystemProperties(groupByJob); - JobHelper.setInputFormat(groupByJob, config); groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class); groupByJob.setMapOutputKeyClass(BytesWritable.class); groupByJob.setMapOutputValueClass(NullWritable.class); @@ -173,7 +172,6 @@ public class DeterminePartitionsJob implements Jobby } else { // Directly read the source data, since we assume it's already grouped. dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class); - JobHelper.setInputFormat(dimSelectionJob, config); config.addInputPaths(dimSelectionJob); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 59f187fb8a2..3a5409453fe 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -18,6 +18,7 @@ package io.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -55,7 +56,6 @@ import io.druid.timeline.partition.ShardSpecLookup; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -248,6 +248,12 @@ public class HadoopDruidIndexerConfig return schema; } + @JsonIgnore + public PathSpec getPathSpec() + { + return pathSpec; + } + public String getDataSource() { return schema.getDataSchema().getDataSource(); @@ -354,11 +360,6 @@ public class HadoopDruidIndexerConfig return pathSpec.addInputPaths(this, job); } - public Class getInputFormatClass() - { - return pathSpec.getInputFormat(); - } - /******************************************** Granularity/Bucket Helper Methods ********************************************/ diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java index e90a0c59018..803e527b4dd 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java @@ -103,7 +103,9 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< { if(parser instanceof StringInputRowParser && value instanceof Text) { //Note: This is to ensure backward compatibility with 0.7.0 and before - return ((StringInputRowParser)parser).parse(value.toString()); + return ((StringInputRowParser) parser).parse(value.toString()); + } else if(value instanceof InputRow) { + return (InputRow)value; } else { return parser.parse(value); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index 4721b6f5f2c..ea972eea7b0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -19,8 +19,16 @@ package io.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.path.UsedSegmentLister; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IngestionSpec; +import io.druid.timeline.DataSegment; + +import java.io.IOException; +import java.util.List; +import java.util.Map; /** */ @@ -91,4 +99,45 @@ public class HadoopIngestionSpec extends IngestionSpec pathSpec = spec.getIOConfig().getPathSpec(); + Map datasourcePathSpec = null; + if(pathSpec.get(type).equals(dataSource)) { + datasourcePathSpec = pathSpec; + } else if(pathSpec.get(type).equals(multi)) { + List> childPathSpecs = (List>) pathSpec.get(children); + for(Map childPathSpec : childPathSpecs) { + if (childPathSpec.get(type).equals(dataSource)) { + datasourcePathSpec = childPathSpec; + break; + } + } + } + if (datasourcePathSpec != null) { + Map ingestionSpecMap = (Map) datasourcePathSpec.get(ingestionSpec); + DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(ingestionSpecMap, DatasourceIngestionSpec.class); + List segmentsList = segmentLister.getUsedSegmentsForInterval( + ingestionSpecObj.getDataSource(), + ingestionSpecObj.getInterval() + ); + datasourcePathSpec.put(segments, segmentsList); + } + + return spec; + } + } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 56230251828..f8a3b7be9b4 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -35,6 +35,7 @@ import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.data.input.Rows; +import io.druid.indexer.hadoop.SegmentInputRow; import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexIO; @@ -139,8 +140,6 @@ public class IndexGeneratorJob implements Jobby JobHelper.injectSystemProperties(job); - JobHelper.setInputFormat(job, config); - job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(BytesWritable.class); @@ -235,6 +234,7 @@ public class IndexGeneratorJob implements Jobby private static final HashFunction hashFunction = Hashing.murmur3_128(); private AggregatorFactory[] aggregators; + private AggregatorFactory[] combiningAggs; @Override protected void setup(Context context) @@ -242,6 +242,10 @@ public class IndexGeneratorJob implements Jobby { super.setup(context); aggregators = config.getSchema().getDataSchema().getAggregators(); + combiningAggs = new AggregatorFactory[aggregators.length]; + for (int i = 0; i < aggregators.length; ++i) { + combiningAggs[i] = aggregators[i].getCombiningFactory(); + } } @Override @@ -268,6 +272,14 @@ public class IndexGeneratorJob implements Jobby ) ).asBytes(); + // type SegmentInputRow serves as a marker that these InputRow instances have already been combined + // and they contain the columns as they show up in the segment after ingestion, not what you would see in raw + // data + byte[] serializedInputRow = inputRow instanceof SegmentInputRow ? + InputRowSerde.toBytes(inputRow, combiningAggs) + : + InputRowSerde.toBytes(inputRow, aggregators); + context.write( new SortableBytes( bucket.get().toGroupKey(), @@ -277,7 +289,7 @@ public class IndexGeneratorJob implements Jobby .put(hashedDimensions) .array() ).toBytesWritable(), - new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators)) + new BytesWritable(serializedInputRow) ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index d9a3de92ad6..a9c282ddfad 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -17,6 +17,7 @@ package io.druid.indexer; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; @@ -28,6 +29,7 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.RetryUtils; import com.metamx.common.logger.Logger; +import io.druid.indexer.updater.HadoopDruidConverterConfig; import io.druid.segment.ProgressIndicator; import io.druid.segment.SegmentUtils; import io.druid.timeline.DataSegment; @@ -42,8 +44,6 @@ import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Progressable; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -56,8 +56,10 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -203,17 +205,6 @@ public class JobHelper return true; } - public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig) - { - if (indexerConfig.getInputFormatClass() != null) { - job.setInputFormatClass(indexerConfig.getInputFormatClass()); - } else if (indexerConfig.isCombineText()) { - job.setInputFormatClass(CombineTextInputFormat.class); - } else { - job.setInputFormatClass(TextInputFormat.class); - } - } - public static DataSegment serializeOutIndex( final DataSegment segmentTemplate, final Configuration configuration, @@ -579,6 +570,38 @@ public class JobHelper return zipPusher.push(); } + public static URI getURIFromSegment(DataSegment dataSegment) + { + // There is no good way around this... + // TODO: add getURI() to URIDataPuller + final Map loadSpec = dataSegment.getLoadSpec(); + final String type = loadSpec.get("type").toString(); + final URI segmentLocURI; + if ("s3_zip".equals(type)) { + segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + } else if ("hdfs".equals(type)) { + segmentLocURI = URI.create(loadSpec.get("path").toString()); + } else if ("local".equals(type)) { + try { + segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null); + } + catch (URISyntaxException e) { + throw new ISE(e, "Unable to form simple file uri"); + } + } else { + try { + throw new IAE( + "Cannot figure out loadSpec %s", + HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec) + ); + } + catch (JsonProcessingException e) { + throw new ISE("Cannot write Map with json mapper"); + } + } + return segmentLocURI; + } + public static ProgressIndicator progressIndicatorForContext( final TaskAttemptContext context ) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java new file mode 100644 index 00000000000..4bff88c3e79 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java @@ -0,0 +1,160 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.granularity.QueryGranularity; +import io.druid.query.filter.DimFilter; +import org.joda.time.Interval; + +import java.util.List; + +public class DatasourceIngestionSpec +{ + private final String dataSource; + private final Interval interval; + private final DimFilter filter; + private final QueryGranularity granularity; + private final List dimensions; + private final List metrics; + + @JsonCreator + public DatasourceIngestionSpec( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("filter") DimFilter filter, + @JsonProperty("granularity") QueryGranularity granularity, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("metrics") List metrics + ) + { + this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource"); + this.interval = Preconditions.checkNotNull(interval, "null interval"); + this.filter = filter; + this.granularity = granularity == null ? QueryGranularity.NONE : granularity; + + this.dimensions = dimensions; + this.metrics = metrics; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty + public DimFilter getFilter() + { + return filter; + } + + @JsonProperty + public QueryGranularity getGranularity() + { + return granularity; + } + + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public List getMetrics() + { + return metrics; + } + + public DatasourceIngestionSpec withDimensions(List dimensions) + { + return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics); + } + + public DatasourceIngestionSpec withMetrics(List metrics) + { + return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DatasourceIngestionSpec that = (DatasourceIngestionSpec) o; + + if (!dataSource.equals(that.dataSource)) { + return false; + } + if (!interval.equals(that.interval)) { + return false; + } + if (filter != null ? !filter.equals(that.filter) : that.filter != null) { + return false; + } + if (!granularity.equals(that.granularity)) { + return false; + } + if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { + return false; + } + return !(metrics != null ? !metrics.equals(that.metrics) : that.metrics != null); + + } + + @Override + public int hashCode() + { + int result = dataSource.hashCode(); + result = 31 * result + interval.hashCode(); + result = 31 * result + (filter != null ? filter.hashCode() : 0); + result = 31 * result + granularity.hashCode(); + result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); + result = 31 * result + (metrics != null ? metrics.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "DatasourceIngestionSpec{" + + "dataSource='" + dataSource + '\'' + + ", interval=" + interval + + ", filter=" + filter + + ", granularity=" + granularity + + ", dimensions=" + dimensions + + ", metrics=" + metrics + + '}'; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java new file mode 100644 index 00000000000..3677612bc3a --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java @@ -0,0 +1,121 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.data.input.InputRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class DatasourceInputFormat extends InputFormat +{ + private static final Logger logger = new Logger(DatasourceInputFormat.class); + + public static final String CONF_INPUT_SEGMENTS = "druid.segments"; + public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema"; + public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size"; + + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException + { + Configuration conf = context.getConfiguration(); + + String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read"); + List segments = HadoopDruidIndexerConfig.jsonMapper.readValue( + segmentsStr, + new TypeReference>() + { + } + ); + if (segments == null || segments.size() == 0) { + throw new ISE("No segments found to read"); + } + + logger.info("segments to read [%s]", segmentsStr); + + long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0); + + if (maxSize > 0) { + //combining is to happen, let us sort the segments list by size so that they + //are combined appropriately + Collections.sort( + segments, + new Comparator() + { + @Override + public int compare(DataSegment s1, DataSegment s2) + { + return Long.compare(s1.getSize(), s2.getSize()); + } + } + ); + } + + List splits = Lists.newArrayList(); + + List list = new ArrayList<>(); + long size = 0; + + for (DataSegment segment : segments) { + if (size + segment.getSize() > maxSize && size > 0) { + splits.add(new DatasourceInputSplit(list)); + list = Lists.newArrayList(); + size = 0; + } + + list.add(segment); + size += segment.getSize(); + } + + if (list.size() > 0) { + splits.add(new DatasourceInputSplit(list)); + } + + logger.info("Number of splits [%d]", splits.size()); + return splits; + } + + @Override + public RecordReader createRecordReader( + InputSplit split, + TaskAttemptContext context + ) throws IOException, InterruptedException + { + return new DatasourceRecordReader(); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java new file mode 100644 index 00000000000..6b80159ffb3 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java @@ -0,0 +1,88 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +import javax.validation.constraints.NotNull; + +public class DatasourceInputSplit extends InputSplit implements Writable +{ + private List segments = null; + + //required for deserialization + public DatasourceInputSplit() + { + } + + public DatasourceInputSplit(@NotNull List segments) + { + Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments"); + this.segments = segments; + } + + @Override + public long getLength() throws IOException, InterruptedException + { + long size = 0; + for (DataSegment segment : segments) { + size += segment.getSize(); + } + return size; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException + { + return new String[]{}; + } + + public List getSegments() + { + return segments; + } + + @Override + public void write(DataOutput out) throws IOException + { + out.writeUTF(HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segments)); + } + + @Override + public void readFields(DataInput in) throws IOException + { + segments = HadoopDruidIndexerConfig.jsonMapper.readValue( + in.readUTF(), + new TypeReference>() + { + } + ); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java new file mode 100644 index 00000000000..ef20db3ed52 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java @@ -0,0 +1,194 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.MapBasedRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.JobHelper; +import io.druid.segment.IndexIO; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.StorageAdapter; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; +import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +public class DatasourceRecordReader extends RecordReader +{ + private static final Logger logger = new Logger(DatasourceRecordReader.class); + + private DatasourceIngestionSpec spec; + private IngestSegmentFirehose firehose; + + private int rowNum; + private MapBasedRow currRow; + + private List indexes = Lists.newArrayList(); + private List tmpSegmentDirs = Lists.newArrayList(); + private int numRows; + + @Override + public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException + { + spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.jsonMapper); + + List segments = ((DatasourceInputSplit) split).getSegments(); + + List adapters = Lists.transform( + segments, + new Function() + { + @Override + public StorageAdapter apply(DataSegment segment) + { + try { + logger.info("Getting storage path for segment [%s]", segment.getIdentifier()); + Path path = new Path(JobHelper.getURIFromSegment(segment)); + + logger.info("Fetch segment files from [%s]", path); + + File dir = Files.createTempDir(); + tmpSegmentDirs.add(dir); + logger.info("Locally storing fetched segment at [%s]", dir); + + JobHelper.unzipNoGuava(path, context.getConfiguration(), dir, context); + logger.info("finished fetching segment files"); + + QueryableIndex index = IndexIO.loadIndex(dir); + indexes.add(index); + numRows += index.getNumRows(); + + return new QueryableIndexStorageAdapter(index); + } + catch (IOException ex) { + throw Throwables.propagate(ex); + } + } + } + ); + + firehose = new IngestSegmentFirehose( + adapters, + spec.getDimensions(), + spec.getMetrics(), + spec.getFilter(), + spec.getInterval(), + spec.getGranularity() + ); + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException + { + if (firehose.hasMore()) { + currRow = (MapBasedRow) firehose.nextRow(); + rowNum++; + return true; + } else { + return false; + } + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException + { + return NullWritable.get(); + } + + @Override + public InputRow getCurrentValue() throws IOException, InterruptedException + { + return new SegmentInputRow( + new MapBasedInputRow( + currRow.getTimestamp(), + spec.getDimensions(), + currRow.getEvent() + ) + ); + } + + @Override + public float getProgress() throws IOException, InterruptedException + { + if (numRows > 0) { + return (rowNum * 1.0f) / numRows; + } else { + return 0; + } + } + + @Override + public void close() throws IOException + { + Closeables.close(firehose, true); + for (QueryableIndex qi : indexes) { + Closeables.close(qi, true); + } + + for (File dir : tmpSegmentDirs) { + FileUtils.deleteDirectory(dir); + } + } + + private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper) + { + try { + String schema = Preconditions.checkNotNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), "null schema"); + logger.info("load schema [%s]", schema); + + DatasourceIngestionSpec spec = jsonMapper.readValue(schema, DatasourceIngestionSpec.class); + + if (spec.getDimensions() == null || spec.getDimensions().size() == 0) { + throw new ISE("load schema does not have dimensions"); + } + + if (spec.getMetrics() == null || spec.getMetrics().size() == 0) { + throw new ISE("load schema does not have metrics"); + } + + return spec; + } + catch (IOException ex) { + throw new RuntimeException("couldn't load segment load spec", ex); + } + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java new file mode 100644 index 00000000000..b25e95918cf --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java @@ -0,0 +1,97 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import io.druid.data.input.InputRow; +import io.druid.data.input.Row; +import org.joda.time.DateTime; + +import java.util.List; + +/** + * SegmentInputRow serves as a marker that these InputRow instances have already been combined + * and they contain the columns as they show up in the segment after ingestion, not what you would see in raw + * data. + * It must only be used to represent such InputRows. + */ +public class SegmentInputRow implements InputRow +{ + private final InputRow delegate; + + public SegmentInputRow(InputRow delegate){ + this.delegate = delegate; + } + + @Override + public List getDimensions() + { + return delegate.getDimensions(); + } + + @Override + public long getTimestampFromEpoch() + { + return delegate.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return delegate.getTimestamp(); + } + + @Override + public List getDimension(String dimension) + { + return delegate.getDimension(dimension); + } + + @Override + public Object getRaw(String dimension) + { + return delegate.getRaw(dimension); + } + + @Override + public float getFloatMetric(String metric) + { + return delegate.getFloatMetric(metric); + } + + @Override + public long getLongMetric(String metric) + { + return delegate.getLongMetric(metric); + } + + @Override + public int compareTo(Row row) + { + return delegate.compareTo(row); + } + + @Override + public String toString() + { + return "SegmentInputRow{" + + "delegate=" + delegate + + '}'; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java new file mode 100644 index 00000000000..90d09248862 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -0,0 +1,186 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.common.logger.Logger; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.hadoop.DatasourceInputFormat; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class DatasourcePathSpec implements PathSpec +{ + private static final Logger logger = new Logger(DatasourcePathSpec.class); + + private final ObjectMapper mapper; + private final DatasourceIngestionSpec ingestionSpec; + private final long maxSplitSize; + private final List segments; + + public DatasourcePathSpec( + @JacksonInject ObjectMapper mapper, + @JsonProperty("segments") List segments, + @JsonProperty("ingestionSpec") DatasourceIngestionSpec spec, + @JsonProperty("maxSplitSize") Long maxSplitSize + ) + { + this.mapper = Preconditions.checkNotNull(mapper, "null mapper"); + this.segments = segments; + this.ingestionSpec = Preconditions.checkNotNull(spec, "null ingestionSpec"); + + if(maxSplitSize == null) { + this.maxSplitSize = 0; + } else { + this.maxSplitSize = maxSplitSize.longValue(); + } + } + + @JsonProperty + public List getSegments() + { + return segments; + } + + @JsonProperty + public DatasourceIngestionSpec getIngestionSpec() + { + return ingestionSpec; + } + + @JsonProperty + public long getMaxSplitSize() + { + return maxSplitSize; + } + + @Override + public Job addInputPaths( + HadoopDruidIndexerConfig config, Job job + ) throws IOException + { + Preconditions.checkArgument(segments != null && !segments.isEmpty(), "no segments provided"); + + logger.info( + "Found total [%d] segments for [%s] in interval [%s]", + segments.size(), + ingestionSpec.getDataSource(), + ingestionSpec.getInterval() + ); + + DatasourceIngestionSpec updatedIngestionSpec = ingestionSpec; + if (updatedIngestionSpec.getDimensions() == null) { + List dims; + if (config.getParser().getParseSpec().getDimensionsSpec().hasCustomDimensions()) { + dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensions(); + } else { + Set dimSet = Sets.newHashSet( + Iterables.concat( + Iterables.transform( + segments, + new Function>() + { + @Override + public Iterable apply(DataSegment dataSegment) + { + return dataSegment.getDimensions(); + } + } + ) + ) + ); + dims = Lists.newArrayList( + Sets.difference( + dimSet, + config.getParser() + .getParseSpec() + .getDimensionsSpec() + .getDimensionExclusions() + ) + ); + } + updatedIngestionSpec = updatedIngestionSpec.withDimensions(dims); + } + + if (updatedIngestionSpec.getMetrics() == null) { + Set metrics = Sets.newHashSet(); + final AggregatorFactory[] cols = config.getSchema().getDataSchema().getAggregators(); + if (cols != null) { + for (AggregatorFactory col : cols) { + metrics.add(col.getName()); + } + } + updatedIngestionSpec = updatedIngestionSpec.withMetrics(Lists.newArrayList(metrics)); + } + + job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec)); + job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments)); + job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize)); + MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class); + + return job; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DatasourcePathSpec that = (DatasourcePathSpec) o; + + if (maxSplitSize != that.maxSplitSize) { + return false; + } + if (!ingestionSpec.equals(that.ingestionSpec)) { + return false; + } + return !(segments != null ? !segments.equals(that.segments) : that.segments != null); + + } + + @Override + public int hashCode() + { + int result = ingestionSpec.hashCode(); + result = 31 * result + (int) (maxSplitSize ^ (maxSplitSize >>> 32)); + result = 31 * result + (segments != null ? segments.hashCode() : 0); + return result; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java index 14f8db615f9..e793827b65b 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java @@ -30,7 +30,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.format.DateTimeFormat; @@ -152,7 +155,7 @@ public class GranularityPathSpec implements PathSpec for (String path : paths) { log.info("Appending path[%s]", path); - FileInputFormat.addInputPath(job, new Path(path)); + StaticPathSpec.addToMultipleInputs(config, job, path, inputFormat); } return job; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/MetadataStoreBasedUsedSegmentLister.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/MetadataStoreBasedUsedSegmentLister.java new file mode 100644 index 00000000000..f39a49bcc0b --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/MetadataStoreBasedUsedSegmentLister.java @@ -0,0 +1,53 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.io.IOException; +import java.util.List; + +/** + */ +public class MetadataStoreBasedUsedSegmentLister implements UsedSegmentLister +{ + private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; + + @Inject + public MetadataStoreBasedUsedSegmentLister(IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator) + { + this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull( + indexerMetadataStorageCoordinator, + "null indexerMetadataStorageCoordinator" + ); + } + + @Override + public List getUsedSegmentsForInterval( + String dataSource, Interval interval + ) throws IOException + { + return indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(dataSource, interval); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java new file mode 100644 index 00000000000..7c8808f00e7 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java @@ -0,0 +1,80 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import io.druid.indexer.HadoopDruidIndexerConfig; +import org.apache.hadoop.mapreduce.Job; + +import java.io.IOException; +import java.util.List; + +public class MultiplePathSpec implements PathSpec +{ + private List children; + + public MultiplePathSpec( + @JsonProperty("children") List children + ) + { + Preconditions.checkArgument(children != null && children.size() > 0, "Null/Empty list of child PathSpecs"); + this.children = children; + } + + @JsonProperty + public List getChildren() + { + return children; + } + + @Override + public Job addInputPaths( + HadoopDruidIndexerConfig config, Job job + ) throws IOException + { + for(PathSpec spec : children) { + spec.addInputPaths(config, job); + } + return job; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MultiplePathSpec that = (MultiplePathSpec) o; + + return children.equals(that.children); + + } + + @Override + public int hashCode() + { + return children.hashCode(); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java index 90feb83bc95..f7433fc0cc7 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java @@ -20,8 +20,6 @@ package io.druid.indexer.path; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.indexer.HadoopDruidIndexerConfig; - -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; @@ -32,10 +30,11 @@ import java.io.IOException; @JsonSubTypes(value={ @JsonSubTypes.Type(name="granular_unprocessed", value=GranularUnprocessedPathSpec.class), @JsonSubTypes.Type(name="granularity", value=GranularityPathSpec.class), - @JsonSubTypes.Type(name="static", value=StaticPathSpec.class) + @JsonSubTypes.Type(name="static", value=StaticPathSpec.class), + @JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class), + @JsonSubTypes.Type(name="multi", value=MultiplePathSpec.class) }) public interface PathSpec { public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException; - public Class getInputFormat(); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java index c6cc63d794a..b0f5e19e188 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java @@ -20,10 +20,12 @@ package io.druid.indexer.path; import com.fasterxml.jackson.annotation.JsonProperty; import com.metamx.common.logger.Logger; import io.druid.indexer.HadoopDruidIndexerConfig; - +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import java.io.IOException; @@ -60,7 +62,9 @@ public class StaticPathSpec implements PathSpec public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException { log.info("Adding paths[%s]", paths); - FileInputFormat.addInputPaths(job, paths); + + addToMultipleInputs(config, job, paths, inputFormat); + return job; } @@ -68,4 +72,54 @@ public class StaticPathSpec implements PathSpec { return inputFormat; } + + public String getPaths() + { + return paths; + } + + public final static void addToMultipleInputs( + HadoopDruidIndexerConfig config, + Job job, + String path, + Class inputFormatClass + ) + { + if (inputFormatClass == null) { + if (config.isCombineText()) { + MultipleInputs.addInputPath(job, new Path(path), CombineTextInputFormat.class); + } else { + MultipleInputs.addInputPath(job, new Path(path), TextInputFormat.class); + } + } else { + MultipleInputs.addInputPath(job, new Path(path), inputFormatClass); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StaticPathSpec that = (StaticPathSpec) o; + + if (paths != null ? !paths.equals(that.paths) : that.paths != null) { + return false; + } + return !(inputFormat != null ? !inputFormat.equals(that.inputFormat) : that.inputFormat != null); + + } + + @Override + public int hashCode() + { + int result = paths != null ? paths.hashCode() : 0; + result = 31 * result + (inputFormat != null ? inputFormat.hashCode() : 0); + return result; + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/UsedSegmentLister.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/UsedSegmentLister.java new file mode 100644 index 00000000000..186c4563d40 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/UsedSegmentLister.java @@ -0,0 +1,44 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.io.IOException; +import java.util.List; + +/** + */ +public interface UsedSegmentLister +{ + /** + * Get all segments which may include any data in the interval and are flagged as used. + * + * @param dataSource The datasource to query + * @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive + * + * @return The DataSegments which include data in the requested interval. These segments may contain data outside the requested interval. + * + * @throws IOException + */ + public List getUsedSegmentsForInterval(final String dataSource, final Interval interval) + throws IOException; +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index fb3c63eae0f..02125160ffa 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -32,6 +32,7 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.indexer.JobHelper; +import io.druid.indexer.hadoop.DatasourceInputSplit; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.timeline.DataSegment; @@ -42,7 +43,6 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapreduce.InputFormat; @@ -62,15 +62,10 @@ import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.util.Progressable; import javax.annotation.Nullable; -import javax.validation.constraints.NotNull; -import java.io.DataInput; -import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -470,10 +465,10 @@ public class HadoopConverterJob ) throws IOException, InterruptedException { final InputSplit split = context.getInputSplit(); - if (!(split instanceof DataSegmentSplit)) { + if (!(split instanceof DatasourceInputSplit)) { throw new IAE( "Unexpected split type. Expected [%s] was [%s]", - DataSegmentSplit.class.getCanonicalName(), + DatasourceInputSplit.class.getCanonicalName(), split.getClass().getCanonicalName() ); } @@ -481,13 +476,13 @@ public class HadoopConverterJob final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY); final File tmpDir = Paths.get(tmpDirLoc).toFile(); - final DataSegment segment = ((DataSegmentSplit) split).getDataSegment(); + final DataSegment segment = Iterables.getOnlyElement(((DatasourceInputSplit) split).getSegments()); final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration()); context.setStatus("DOWNLOADING"); context.progress(); - final Path inPath = new Path(getURIFromSegment(segment)); + final Path inPath = new Path(JobHelper.getURIFromSegment(segment)); final File inDir = new File(tmpDir, "in"); if (inDir.exists() && !inDir.delete()) { @@ -559,38 +554,6 @@ public class HadoopConverterJob context.getConfiguration().set(TMP_FILE_LOC_KEY, tmpFile.getAbsolutePath()); } - private static URI getURIFromSegment(DataSegment dataSegment) - { - // There is no good way around this... - // TODO: add getURI() to URIDataPuller - final Map loadSpec = dataSegment.getLoadSpec(); - final String type = loadSpec.get("type").toString(); - final URI segmentLocURI; - if ("s3_zip".equals(type)) { - segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); - } else if ("hdfs".equals(type)) { - segmentLocURI = URI.create(loadSpec.get("path").toString()); - } else if ("local".equals(type)) { - try { - segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null); - } - catch (URISyntaxException e) { - throw new ISE(e, "Unable to form simple file uri"); - } - } else { - try { - throw new IAE( - "Cannot figure out loadSpec %s", - HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec) - ); - } - catch (JsonProcessingException e) { - throw new ISE("Cannot write Map with json mapper"); - } - } - return segmentLocURI; - } - @Override protected void cleanup( Context context @@ -604,50 +567,6 @@ public class HadoopConverterJob } } - public static class DataSegmentSplit extends InputSplit implements Writable - { - private DataSegment dataSegment = null; - - public DataSegmentSplit() - { - // For serialization purposes - } - - public DataSegmentSplit(@NotNull DataSegment dataSegment) - { - this.dataSegment = Preconditions.checkNotNull(dataSegment, "dataSegment"); - } - - @Override - public long getLength() throws IOException, InterruptedException - { - return dataSegment.getSize(); - } - - @Override - public String[] getLocations() throws IOException, InterruptedException - { - return new String[]{}; - } - - protected DataSegment getDataSegment() - { - return dataSegment; - } - - @Override - public void write(DataOutput out) throws IOException - { - out.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsString(dataSegment).getBytes()); - } - - @Override - public void readFields(DataInput in) throws IOException - { - dataSegment = HadoopDruidConverterConfig.jsonMapper.readValue(in.readLine(), DataSegment.class); - } - } - public static class ConfigInputFormat extends InputFormat { @Override @@ -665,7 +584,7 @@ public class HadoopConverterJob @Override public InputSplit apply(DataSegment input) { - return new DataSegmentSplit(input); + return new DatasourceInputSplit(ImmutableList.of(input)); } } ); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java index ee8ae08f0d4..4595c1affbd 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java @@ -20,6 +20,7 @@ package io.druid.indexer.updater; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Supplier; import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.PasswordProvider; import javax.validation.constraints.NotNull; @@ -78,4 +79,20 @@ public class MetadataStorageUpdaterJobSpec implements Supplier segments; + + public BatchDeltaIngestionTest() throws IOException + { + mapper = new DefaultObjectMapper(); + mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed")); + InjectableValues inject = new InjectableValues.Std().addValue(ObjectMapper.class, mapper); + mapper.setInjectableValues(inject); + + this.interval = new Interval("2014-10-22T00:00:00Z/P1D"); + segments = ImmutableList.of( + new DefaultObjectMapper() + .readValue( + this.getClass().getClassLoader().getResource("test-segment/descriptor.json"), + DataSegment.class + ) + .withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath() + ) + ) + ); + } + + @Test + public void testReindexing() throws Exception + { + HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig( + ImmutableMap.of( + "type", + "dataSource", + "ingestionSpec", + ImmutableMap.of( + "dataSource", + "xyz", + "interval", + interval + ), + "segments", + segments + ), + temporaryFolder.newFolder() + ); + + List> expectedRows = ImmutableList.of( + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T00:00:00.000Z"), + "host", ImmutableList.of("a.example.com"), + "visited_sum", 100L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T01:00:00.000Z"), + "host", ImmutableList.of("b.example.com"), + "visited_sum", 150L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T02:00:00.000Z"), + "host", ImmutableList.of("c.example.com"), + "visited_sum", 200L, + "unique_hosts", 1.0d + ) + ); + + testIngestion(config, expectedRows); + } + + @Test + public void testDeltaIngestion() throws Exception + { + File dataFile = temporaryFolder.newFile(); + FileUtils.writeLines( + dataFile, + ImmutableList.of( + "2014102200,a.example.com,a.example.com,90", + "2014102201,b.example.com,b.example.com,25", + "2014102202,c.example.com,c.example.com,70" + ) + ); + + HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig( + ImmutableMap.of( + "type", + "multi", + "children", + ImmutableList.of( + ImmutableMap.of( + "type", + "dataSource", + "ingestionSpec", + ImmutableMap.of( + "dataSource", + "xyz", + "interval", + interval + ), + "segments", + segments + ), + ImmutableMap.of( + "type", + "static", + "paths", + dataFile.getCanonicalPath() + ) + ) + ), + temporaryFolder.newFolder() + ); + + List> expectedRows = ImmutableList.of( + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T00:00:00.000Z"), + "host", ImmutableList.of("a.example.com"), + "visited_sum", 190L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T01:00:00.000Z"), + "host", ImmutableList.of("b.example.com"), + "visited_sum", 175L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T02:00:00.000Z"), + "host", ImmutableList.of("c.example.com"), + "visited_sum", 270L, + "unique_hosts", 1.0d + ) + ); + + testIngestion(config, expectedRows); + } + + private void testIngestion(HadoopDruidIndexerConfig config, List> expectedRowsGenerated) + throws Exception + { + IndexGeneratorJob job = new LegacyIndexGeneratorJob(config); + JobHelper.runJobs(ImmutableList.of(job), config); + + File segmentFolder = new File( + String.format( + "%s/%s/%s_%s/%s/0", + config.getSchema().getIOConfig().getSegmentOutputPath(), + config.getSchema().getDataSchema().getDataSource(), + interval.getStart().toString(), + interval.getEnd().toString(), + config.getSchema().getTuningConfig().getVersion() + ) + ); + + Assert.assertTrue(segmentFolder.exists()); + + File descriptor = new File(segmentFolder, "descriptor.json"); + File indexZip = new File(segmentFolder, "index.zip"); + Assert.assertTrue(descriptor.exists()); + Assert.assertTrue(indexZip.exists()); + + DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class); + Assert.assertEquals("website", dataSegment.getDataSource()); + Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion()); + Assert.assertEquals(interval, dataSegment.getInterval()); + Assert.assertEquals("local", dataSegment.getLoadSpec().get("type")); + Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); + Assert.assertEquals("host", dataSegment.getDimensions().get(0)); + Assert.assertEquals("visited_sum", dataSegment.getMetrics().get(0)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); + Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); + + HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec(); + Assert.assertEquals(0, spec.getPartitionNum()); + Assert.assertEquals(1, spec.getPartitions()); + + File tmpUnzippedSegmentDir = temporaryFolder.newFolder(); + new LocalDataSegmentPuller().getSegmentFiles(dataSegment, tmpUnzippedSegmentDir); + + QueryableIndex index = IndexIO.loadIndex(tmpUnzippedSegmentDir); + StorageAdapter adapter = new QueryableIndexStorageAdapter(index); + + Firehose firehose = new IngestSegmentFirehose( + ImmutableList.of(adapter), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + null, + interval, + QueryGranularity.NONE + ); + + List rows = Lists.newArrayList(); + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + + verifyRows(expectedRowsGenerated, rows); + } + + private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map inputSpec, File tmpDir) + throws Exception + { + HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + "website", + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "host2", "visited_num") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_sum", "visited_num"), + new HyperUniquesAggregatorFactory("unique_hosts", "host2") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval) + ) + ), + new HadoopIOConfig( + inputSpec, + null, + tmpDir.getCanonicalPath() + ), + new HadoopTuningConfig( + tmpDir.getCanonicalPath(), + null, + null, + null, + null, + null, + false, + false, + false, + false, + null, + false, + false, + false, + null, + null, + false + ) + ) + ); + + config.setShardSpecs( + ImmutableMap.>of( + interval.getStart(), + ImmutableList.of( + new HadoopyShardSpec( + new HashBasedNumberedShardSpec(0, 1, HadoopDruidIndexerConfig.jsonMapper), + 0 + ) + ) + ) + ); + config = HadoopDruidIndexerConfig.fromSpec(config.getSchema()); + return config; + } + + private void verifyRows(List> expectedRows, List actualRows) + { + Assert.assertEquals(expectedRows.size(), actualRows.size()); + + for (int i = 0; i < expectedRows.size(); i++) { + Map expected = expectedRows.get(i); + InputRow actual = actualRows.get(i); + + Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions()); + + Assert.assertEquals(expected.get("time"), actual.getTimestamp()); + Assert.assertEquals(expected.get("host"), actual.getDimension("host")); + Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); + Assert.assertEquals( + (Double) expected.get("unique_hosts"), + (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")), + 0.001 + ); + } + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java new file mode 100644 index 00000000000..646cd3d9f61 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java @@ -0,0 +1,154 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.Granularity; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.path.DatasourcePathSpec; +import io.druid.indexer.path.MultiplePathSpec; +import io.druid.indexer.path.PathSpec; +import io.druid.indexer.path.StaticPathSpec; +import io.druid.indexer.path.UsedSegmentLister; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +/** + */ +public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest +{ + private final String testDatasource = "test"; + private final Interval testDatasourceInterval = new Interval("1970/2000"); + + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + private final List segments = ImmutableList.of( + new DataSegment( + "test1", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index1.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 2 + ) + ); + + @Test + public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithNoDatasourcePathSpec() throws Exception + { + PathSpec pathSpec = new StaticPathSpec("/xyz", null); + HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec); + Assert.assertTrue(config.getPathSpec() instanceof StaticPathSpec); + } + + @Test + public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpec() throws Exception + { + PathSpec pathSpec = new DatasourcePathSpec( + jsonMapper, + null, + new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null), + null + ); + HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec); + Assert.assertEquals(segments, ((DatasourcePathSpec) config.getPathSpec()).getSegments()); + } + + @Test + public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec() throws Exception + { + PathSpec pathSpec = new MultiplePathSpec( + ImmutableList.of( + new StaticPathSpec("/xyz", null), + new DatasourcePathSpec( + jsonMapper, + null, + new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null), + null + ) + ) + ); + HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec); + Assert.assertEquals( + segments, + ((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(1)).getSegments() + ); + } + + private HadoopDruidIndexerConfig testRunUpdateSegmentListIfDatasourcePathSpecIsUsed( + PathSpec datasourcePathSpec + ) + throws Exception + { + HadoopIngestionSpec spec = new HadoopIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[0], + new UniformGranularitySpec( + Granularity.DAY, + null, + ImmutableList.of( + new Interval("2010-01-01/P1D") + ) + ) + ), + new HadoopIOConfig( + jsonMapper.convertValue(datasourcePathSpec, Map.class), + null, + null + ), + null + ); + + spec = jsonMapper.readValue( + jsonMapper.writeValueAsString(spec), + HadoopIngestionSpec.class + ); + + UsedSegmentLister segmentLister = EasyMock.createMock(UsedSegmentLister.class); + EasyMock.expect( + segmentLister.getUsedSegmentsForInterval(testDatasource, testDatasourceInterval) + ).andReturn(segments); + EasyMock.replay(segmentLister); + + spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, jsonMapper, segmentLister); + return HadoopDruidIndexerConfig.fromString(jsonMapper.writeValueAsString(spec)); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java new file mode 100644 index 00000000000..702047ee12a --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java @@ -0,0 +1,52 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.filter.SelectorDimFilter; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class DatasourceIngestionSpecTest +{ + @Test + public void testSerde() throws Exception + { + DatasourceIngestionSpec expected = new DatasourceIngestionSpec( + "test", + Interval.parse("2014/2015"), + new SelectorDimFilter("dim", "value"), + QueryGranularity.DAY, + Lists.newArrayList("d1", "d2"), + Lists.newArrayList("m1", "m2", "m3") + ); + + ObjectMapper jsonMapper = new DefaultObjectMapper(); + + DatasourceIngestionSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), DatasourceIngestionSpec.class); + Assert.assertEquals(expected, actual); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java new file mode 100644 index 00000000000..fe3802eccb0 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java @@ -0,0 +1,154 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class DatasourceInputFormatTest +{ + private List segments; + private Configuration config; + private JobContext context; + + @Before + public void setUp() throws Exception + { + segments = ImmutableList.of( + new DataSegment( + "test1", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index1.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 2 + ), + new DataSegment( + "test2", + Interval.parse("2050/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index2.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 11 + ), + new DataSegment( + "test3", + Interval.parse("2030/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index3.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 4 + ) + ); + + config = new Configuration(); + config.set( + DatasourceInputFormat.CONF_INPUT_SEGMENTS, + new DefaultObjectMapper().writeValueAsString(segments) + ); + + context = EasyMock.createMock(JobContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(config); + EasyMock.replay(context); + } + + @Test + public void testGetSplitsNoCombining() throws Exception + { + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(segments.size(), splits.size()); + for (int i = 0; i < segments.size(); i++) { + Assert.assertEquals(segments.get(i), ((DatasourceInputSplit) splits.get(i)).getSegments().get(0)); + } + } + + @Test + public void testGetSplitsAllCombined() throws Exception + { + config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999"); + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(1, splits.size()); + Assert.assertEquals( + Sets.newHashSet(segments), + Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) + ); + } + + @Test + public void testGetSplitsCombineInTwo() throws Exception + { + config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6"); + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(2, splits.size()); + + Assert.assertEquals( + Sets.newHashSet(segments.get(0), segments.get(2)), + Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) + ); + + Assert.assertEquals( + Sets.newHashSet(segments.get(1)), + Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments())) + ); + } + + @Test + public void testGetRecordReader() throws Exception + { + Assert.assertTrue(new DatasourceInputFormat().createRecordReader(null, null) instanceof DatasourceRecordReader); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java new file mode 100644 index 00000000000..87872339cfe --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java @@ -0,0 +1,71 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInput; + +/** + */ +public class DatasourceInputSplitTest +{ + @Test + public void testSerde() throws Exception + { + DatasourceInputSplit expected = new DatasourceInputSplit( + Lists.newArrayList( + new DataSegment( + "test", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12334 + ) + ) + ); + + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + expected.write(out); + + DataInput in = ByteStreams.newDataInput(out.toByteArray()); + DatasourceInputSplit actual = new DatasourceInputSplit(); + actual.readFields(in); + + Assert.assertEquals(expected.getSegments(), actual.getSegments()); + Assert.assertEquals(12334, actual.getLength()); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java new file mode 100644 index 00000000000..c4c25f15d4f --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java @@ -0,0 +1,136 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.druid.data.input.InputRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +/** + */ +public class DatasourceRecordReaderTest +{ + @Test + public void testSanity() throws Exception + { + DataSegment segment = new DefaultObjectMapper() + .readValue(this.getClass().getClassLoader().getResource("test-segment/descriptor.json"), DataSegment.class) + .withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath() + ) + ); + InputSplit split = new DatasourceInputSplit(Lists.newArrayList(segment)); + + Configuration config = new Configuration(); + config.set( + DatasourceInputFormat.CONF_DRUID_SCHEMA, + HadoopDruidIndexerConfig.jsonMapper.writeValueAsString( + new DatasourceIngestionSpec( + segment.getDataSource(), + segment.getInterval(), + null, + null, + segment.getDimensions(), + segment.getMetrics() + ) + ) + ); + + TaskAttemptContext context = EasyMock.createNiceMock(TaskAttemptContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(context); + + DatasourceRecordReader rr = new DatasourceRecordReader(); + rr.initialize(split, context); + + Assert.assertEquals(0, rr.getProgress(), 0.0001); + + List rows = Lists.newArrayList(); + while(rr.nextKeyValue()) { + rows.add(rr.getCurrentValue()); + } + verifyRows(rows); + + Assert.assertEquals(1, rr.getProgress(), 0.0001); + + rr.close(); + } + + private void verifyRows(List actualRows) + { + List> expectedRows = ImmutableList.of( + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T00:00:00.000Z"), + "host", ImmutableList.of("a.example.com"), + "visited_sum", 100L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T01:00:00.000Z"), + "host", ImmutableList.of("b.example.com"), + "visited_sum", 150L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T02:00:00.000Z"), + "host", ImmutableList.of("c.example.com"), + "visited_sum", 200L, + "unique_hosts", 1.0d + ) + ); + + Assert.assertEquals(expectedRows.size(), actualRows.size()); + + for (int i = 0; i < expectedRows.size(); i++) { + Map expected = expectedRows.get(i); + InputRow actual = actualRows.get(i); + + Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions()); + + Assert.assertEquals(expected.get("time"), actual.getTimestamp()); + Assert.assertEquals(expected.get("host"), actual.getDimension("host")); + Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); + Assert.assertEquals( + (Double) expected.get("unique_hosts"), + (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")), + 0.001 + ); + } + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java new file mode 100644 index 00000000000..cf62f47277e --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -0,0 +1,235 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.metamx.common.Granularity; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.annotations.Self; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopIOConfig; +import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexer.HadoopTuningConfig; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.hadoop.DatasourceInputFormat; +import io.druid.initialization.Initialization; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.server.DruidNode; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class DatasourcePathSpecTest +{ + private DatasourceIngestionSpec ingestionSpec; + private List segments; + + public DatasourcePathSpecTest() + { + this.ingestionSpec = new DatasourceIngestionSpec( + "test", + Interval.parse("2000/3000"), + null, + null, + null, + null + ); + + segments = ImmutableList.of( + new DataSegment( + ingestionSpec.getDataSource(), + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index.zip" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12334 + ), + new DataSegment( + ingestionSpec.getDataSource(), + Interval.parse("2050/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index.zip" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12335 + ) + ); + } + + @Test + public void testSerde() throws Exception + { + final UsedSegmentLister segmentList = EasyMock.createMock( + UsedSegmentLister.class + ); + + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(UsedSegmentLister.class).toInstance(segmentList); + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null) + ); + } + } + ) + ); + + ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class); + + DatasourcePathSpec expected = new DatasourcePathSpec( + jsonMapper, + null, + ingestionSpec, + Long.valueOf(10) + ); + PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + + expected = new DatasourcePathSpec( + jsonMapper, + null, + ingestionSpec, + null + ); + actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + + expected = new DatasourcePathSpec( + jsonMapper, + segments, + ingestionSpec, + null + ); + actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + } + + @Test + public void testAddInputPaths() throws Exception + { + HadoopDruidIndexerConfig hadoopIndexerConfig = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + ingestionSpec.getDataSource(), + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(null, null, null), + null, + ImmutableList.of("timestamp", "host", "visited") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_sum", "visited") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000")) + ) + ), + new HadoopIOConfig( + ImmutableMap.of( + "paths", + "/tmp/dummy", + "type", + "static" + ), + null, + "/tmp/dummy" + ), + HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver") + ) + ); + + + + ObjectMapper mapper = new DefaultObjectMapper(); + + DatasourcePathSpec pathSpec = new DatasourcePathSpec( + mapper, + segments, + ingestionSpec, + null + ); + + Configuration config = new Configuration(); + Job job = EasyMock.createNiceMock(Job.class); + EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(job); + + pathSpec.addInputPaths(hadoopIndexerConfig, job); + List actualSegments = mapper.readValue( + config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS), + new TypeReference>() + { + } + ); + + Assert.assertEquals(segments, actualSegments); + + DatasourceIngestionSpec actualIngestionSpec = mapper.readValue(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), DatasourceIngestionSpec.class); + Assert.assertEquals(ingestionSpec + .withDimensions(ImmutableList.of("product")) + .withMetrics(ImmutableList.of("visited_sum")), + actualIngestionSpec); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index a7c5fc2033a..fcad18866ff 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -76,18 +76,18 @@ public class GranularityPathSpecTest } @Test - public void testDeserialization() throws Exception + public void testSerdeCustomInputFormat() throws Exception { - testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class); + testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class); } @Test - public void testDeserializationNoInputFormat() throws Exception + public void testSerdeNoInputFormat() throws Exception { - testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, null); + testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, null); } - private void testDeserialization( + private void testSerde( String inputPath, String filePattern, String pathFormat, @@ -114,7 +114,7 @@ public class GranularityPathSpecTest } sb.append("\"type\" : \"granularity\"}"); - GranularityPathSpec pathSpec = (GranularityPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class); + GranularityPathSpec pathSpec = (GranularityPathSpec) StaticPathSpecTest.readWriteRead(sb.toString(), jsonMapper); Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); Assert.assertEquals(inputPath, pathSpec.getInputPath()); Assert.assertEquals(filePattern, pathSpec.getFilePattern()); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java new file mode 100644 index 00000000000..168d41e1953 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java @@ -0,0 +1,65 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class MultiplePathSpecTest +{ + @Test + public void testSerde() throws Exception + { + PathSpec expected = new MultiplePathSpec( + Lists.newArrayList( + new StaticPathSpec("/tmp/path1", null), + new StaticPathSpec("/tmp/path2", TextInputFormat.class) + ) + ); + + ObjectMapper jsonMapper = new DefaultObjectMapper(); + + PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + } + + @Test + public void testAddInputPaths() throws Exception + { + PathSpec ps1 = EasyMock.createMock(PathSpec.class); + EasyMock.expect(ps1.addInputPaths(null, null)).andReturn(null); + + PathSpec ps2 = EasyMock.createMock(PathSpec.class); + EasyMock.expect(ps2.addInputPaths(null, null)).andReturn(null); + + EasyMock.replay(ps1, ps2); + + new MultiplePathSpec(Lists.newArrayList(ps1, ps2)).addInputPaths(null, null); + + EasyMock.verify(ps1, ps2); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java index c8bf9a8575a..b654d7849f0 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java @@ -21,8 +21,6 @@ package io.druid.indexer.path; import io.druid.jackson.DefaultObjectMapper; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.junit.Assert; import org.junit.Test; @@ -34,18 +32,18 @@ public class StaticPathSpecTest private final ObjectMapper jsonMapper = new DefaultObjectMapper(); @Test - public void testDeserialization() throws Exception + public void testSerdeCustomInputFormat() throws Exception { - testDeserialization("/sample/path", TextInputFormat.class); + testSerde("/sample/path", TextInputFormat.class); } @Test public void testDeserializationNoInputFormat() throws Exception { - testDeserialization("/sample/path", null); + testSerde("/sample/path", null); } - private void testDeserialization(String path, Class inputFormat) throws Exception + private void testSerde(String path, Class inputFormat) throws Exception { StringBuilder sb = new StringBuilder(); sb.append("{\"paths\" : \""); @@ -57,13 +55,22 @@ public class StaticPathSpecTest sb.append("\","); } sb.append("\"type\" : \"static\"}"); - StaticPathSpec pathSpec = (StaticPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class); + + StaticPathSpec pathSpec = (StaticPathSpec) readWriteRead(sb.toString(), jsonMapper); Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); - - Job job = Job.getInstance(); - pathSpec.addInputPaths(null, job); - Assert.assertEquals( - "file:" + path, - job.getConfiguration().get(FileInputFormat.INPUT_DIR)); + Assert.assertEquals(path, pathSpec.getPaths()); + } + + public static final PathSpec readWriteRead(String jsonStr, ObjectMapper jsonMapper) throws Exception + { + return jsonMapper.readValue( + jsonMapper.writeValueAsString( + jsonMapper.readValue( + jsonStr, + PathSpec.class + ) + ), + PathSpec.class + ); } } diff --git a/indexing-hadoop/src/test/resources/test-segment/descriptor.json b/indexing-hadoop/src/test/resources/test-segment/descriptor.json new file mode 100644 index 00000000000..f892b765f55 --- /dev/null +++ b/indexing-hadoop/src/test/resources/test-segment/descriptor.json @@ -0,0 +1,17 @@ +{ + "binaryVersion": 9, + "dataSource": "testds", + "dimensions": "host", + "identifier": "testds_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-07-15T22:02:40.171Z", + "interval": "2014-10-22T00:00:00.000Z/2014-10-23T00:00:00.000Z", + "loadSpec": { + "path": "test-segment/index.zip", + "type": "local" + }, + "metrics": "visited_sum,unique_hosts", + "shardSpec": { + "type": "none" + }, + "size": 4096, + "version": "2015-07-15T22:02:40.171Z" +} diff --git a/indexing-hadoop/src/test/resources/test-segment/index.zip b/indexing-hadoop/src/test/resources/test-segment/index.zip new file mode 100644 index 00000000000..f9e15b415ff Binary files /dev/null and b/indexing-hadoop/src/test/resources/test-segment/index.zip differ diff --git a/indexing-hadoop/src/test/resources/test-segment/note b/indexing-hadoop/src/test/resources/test-segment/note new file mode 100644 index 00000000000..70d8a95d315 --- /dev/null +++ b/indexing-hadoop/src/test/resources/test-segment/note @@ -0,0 +1,5 @@ +This is a test segment containing following columns. +__time, host, visited_sum, unique_hosts +2014-10-22T00:00:00.000Z, a.example.com, 100, HLL sketch of cardinality 1 +2014-10-22T01:00:00.000Z, b.example.com, 150, HLL sketch of cardinality 1 +2014-10-22T02:00:00.000Z, c.example.com, 200, HLL sketch of cardinality 1 diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 08d9c48fa02..e865842ce99 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -17,10 +17,12 @@ package io.druid.indexing.common.task; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -33,17 +35,22 @@ import io.druid.indexer.HadoopDruidIndexerJob; import io.druid.indexer.HadoopIngestionSpec; import io.druid.indexer.Jobby; import io.druid.indexer.MetadataStorageUpdaterJobHandler; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.SortedSet; public class HadoopIndexTask extends HadoopTask @@ -56,10 +63,14 @@ public class HadoopIndexTask extends HadoopTask } @JsonIgnore - private final HadoopIngestionSpec spec; + private HadoopIngestionSpec spec; + @JsonIgnore private final String classpathPrefix; + @JsonIgnore + private final ObjectMapper jsonMapper; + /** * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters * for creating Druid index segments. It may be modified. @@ -76,7 +87,8 @@ public class HadoopIndexTask extends HadoopTask @JsonProperty("spec") HadoopIngestionSpec spec, @JsonProperty("hadoopCoordinates") String hadoopCoordinates, @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates, - @JsonProperty("classpathPrefix") String classpathPrefix + @JsonProperty("classpathPrefix") String classpathPrefix, + @JacksonInject ObjectMapper jsonMapper ) { super( @@ -102,6 +114,7 @@ public class HadoopIndexTask extends HadoopTask ); this.classpathPrefix = classpathPrefix; + this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMappper"); } @Override @@ -152,6 +165,11 @@ public class HadoopIndexTask extends HadoopTask final ClassLoader loader = buildClassLoader(toolbox); boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); + spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( + spec, + jsonMapper, + new OverlordActionBasedUsedSegmentLister(toolbox)); + final String config = invokeForeignLoader( "io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing", new String[]{ diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 9d510c14300..d620d90dc85 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -25,20 +25,13 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Injector; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; -import com.metamx.common.guava.Yielder; -import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.TaskToolbox; @@ -46,30 +39,19 @@ import io.druid.indexing.common.TaskToolboxFactory; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.task.NoopTask; import io.druid.query.filter.DimFilter; -import io.druid.query.select.EventHolder; -import io.druid.segment.Cursor; -import io.druid.segment.DimensionSelector; import io.druid.segment.IndexIO; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; import io.druid.segment.QueryableIndexStorageAdapter; import io.druid.segment.StorageAdapter; -import io.druid.segment.column.Column; -import io.druid.segment.data.IndexedInts; -import io.druid.segment.filter.Filters; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; -import io.druid.utils.Runnables; -import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -286,7 +268,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory rowYielder; - - public IngestSegmentFirehose(List adapters, final List dims, final List metrics) - { - Sequence rows = Sequences.concat( - Iterables.transform( - adapters, new Function>() - { - @Nullable - @Override - public Sequence apply(StorageAdapter adapter) - { - return Sequences.concat( - Sequences.map( - adapter.makeCursors( - Filters.convertDimensionFilters(dimFilter), - interval, - QueryGranularity.ALL - ), new Function>() - { - @Nullable - @Override - public Sequence apply(final Cursor cursor) - { - final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); - - final Map dimSelectors = Maps.newHashMap(); - for (String dim : dims) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); - // dimSelector is null if the dimension is not present - if (dimSelector != null) { - dimSelectors.put(dim, dimSelector); - } - } - - final Map metSelectors = Maps.newHashMap(); - for (String metric : metrics) { - final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); - if (metricSelector != null) { - metSelectors.put(metric, metricSelector); - } - } - - return Sequences.simple( - new Iterable() - { - @Override - public Iterator iterator() - { - return new Iterator() - { - @Override - public boolean hasNext() - { - return !cursor.isDone(); - } - - @Override - public InputRow next() - { - final Map theEvent = Maps.newLinkedHashMap(); - final long timestamp = timestampColumnSelector.get(); - theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); - - for (Map.Entry dimSelector : dimSelectors.entrySet()) { - final String dim = dimSelector.getKey(); - final DimensionSelector selector = dimSelector.getValue(); - final IndexedInts vals = selector.getRow(); - - if (vals.size() == 1) { - final String dimVal = selector.lookupName(vals.get(0)); - theEvent.put(dim, dimVal); - } else { - List dimVals = Lists.newArrayList(); - for (int i = 0; i < vals.size(); ++i) { - dimVals.add(selector.lookupName(vals.get(i))); - } - theEvent.put(dim, dimVals); - } - } - - for (Map.Entry metSelector : metSelectors.entrySet()) { - final String metric = metSelector.getKey(); - final ObjectColumnSelector selector = metSelector.getValue(); - theEvent.put(metric, selector.get()); - } - cursor.advance(); - return new MapBasedInputRow(timestamp, dims, theEvent); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException("Remove Not Supported"); - } - }; - } - } - ); - } - } - ) - ); - } - } - ) - ); - rowYielder = rows.toYielder( - null, - new YieldingAccumulator() - { - @Override - public InputRow accumulate(InputRow accumulated, InputRow in) - { - yield(); - return in; - } - } - ); - } - - @Override - public boolean hasMore() - { - return !rowYielder.isDone(); - } - - @Override - public InputRow nextRow() - { - final InputRow inputRow = rowYielder.get(); - rowYielder = rowYielder.next(null); - return inputRow; - } - - @Override - public Runnable commit() - { - return Runnables.getNoopRunnable(); - } - - @Override - public void close() throws IOException - { - rowYielder.close(); - } - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/hadoop/OverlordActionBasedUsedSegmentLister.java b/indexing-service/src/main/java/io/druid/indexing/hadoop/OverlordActionBasedUsedSegmentLister.java new file mode 100644 index 00000000000..723d927aeac --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/hadoop/OverlordActionBasedUsedSegmentLister.java @@ -0,0 +1,54 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.hadoop; + +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.inject.Inject; +import io.druid.indexer.path.UsedSegmentLister; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentListUsedAction; +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.io.IOException; +import java.util.List; + +/** + */ +public class OverlordActionBasedUsedSegmentLister implements UsedSegmentLister +{ + private final TaskToolbox toolbox; + + @Inject + public OverlordActionBasedUsedSegmentLister(TaskToolbox toolbox) + { + this.toolbox = Preconditions.checkNotNull(toolbox, "null task toolbox"); + } + + @Override + public List getUsedSegmentsForInterval( + String dataSource, Interval interval + ) throws IOException + { + return toolbox + .getTaskActionClient() + .submit(new SegmentListUsedAction(dataSource, interval)); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 52b41685573..a406ef39a7e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -471,11 +471,15 @@ public class TaskSerdeTest ), null, null, - "blah" + "blah", + jsonMapper ); final String json = jsonMapper.writeValueAsString(task); - final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class); + + InjectableValues inject = new InjectableValues.Std() + .addValue(ObjectMapper.class, jsonMapper); + final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.reader(Task.class).with(inject).readValue(json); Assert.assertEquals("foo", task.getDataSource()); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 7bb22af7fc3..c632c84ad6d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -74,6 +74,7 @@ import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Interval; @@ -461,8 +462,8 @@ public class IngestSegmentFirehoseFactoryTest { Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); Integer rowcount = 0; - try (final IngestSegmentFirehoseFactory.IngestSegmentFirehose firehose = - (IngestSegmentFirehoseFactory.IngestSegmentFirehose) + try (final IngestSegmentFirehose firehose = + (IngestSegmentFirehose) factory.connect(rowParser)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java new file mode 100644 index 00000000000..bc7da0bfcf2 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -0,0 +1,202 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.firehose; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import io.druid.data.input.Firehose; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.granularity.QueryGranularity; +import io.druid.query.filter.DimFilter; +import io.druid.query.select.EventHolder; +import io.druid.segment.Cursor; +import io.druid.segment.DimensionSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.StorageAdapter; +import io.druid.segment.column.Column; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.filter.Filters; +import io.druid.utils.Runnables; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class IngestSegmentFirehose implements Firehose +{ + private volatile Yielder rowYielder; + + public IngestSegmentFirehose(List adapters, final List dims, final List metrics, final DimFilter dimFilter, final Interval interval, final QueryGranularity granularity) + { + Sequence rows = Sequences.concat( + Iterables.transform( + adapters, new Function>() + { + @Nullable + @Override + public Sequence apply(StorageAdapter adapter) + { + return Sequences.concat( + Sequences.map( + adapter.makeCursors( + Filters.convertDimensionFilters(dimFilter), + interval, + granularity + ), new Function>() + { + @Nullable + @Override + public Sequence apply(final Cursor cursor) + { + final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + + final Map dimSelectors = Maps.newHashMap(); + for (String dim : dims) { + final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); + // dimSelector is null if the dimension is not present + if (dimSelector != null) { + dimSelectors.put(dim, dimSelector); + } + } + + final Map metSelectors = Maps.newHashMap(); + for (String metric : metrics) { + final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + if (metricSelector != null) { + metSelectors.put(metric, metricSelector); + } + } + + return Sequences.simple( + new Iterable() + { + @Override + public Iterator iterator() + { + return new Iterator() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } + + @Override + public InputRow next() + { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.get(); + theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); + + for (Map.Entry dimSelector : dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + if (vals.size() == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else { + List dimVals = Lists.newArrayList(); + for (int i = 0; i < vals.size(); ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); + } + } + + for (Map.Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final ObjectColumnSelector selector = metSelector.getValue(); + theEvent.put(metric, selector.get()); + } + cursor.advance(); + return new MapBasedInputRow(timestamp, dims, theEvent); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("Remove Not Supported"); + } + }; + } + } + ); + } + } + ) + ); + } + } + ) + ); + rowYielder = rows.toYielder( + null, + new YieldingAccumulator() + { + @Override + public InputRow accumulate(InputRow accumulated, InputRow in) + { + yield(); + return in; + } + } + ); + } + + @Override + public boolean hasMore() + { + return !rowYielder.isDone(); + } + + @Override + public InputRow nextRow() + { + final InputRow inputRow = rowYielder.get(); + rowYielder = rowYielder.next(null); + return inputRow; + } + + @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + @Override + public void close() throws IOException + { + rowYielder.close(); + } +} diff --git a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java index 03a28f02d13..d7d35a33aac 100644 --- a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java @@ -30,14 +30,24 @@ import com.google.inject.name.Names; import com.metamx.common.logger.Logger; import io.airlift.command.Arguments; import io.airlift.command.Command; +import io.druid.guice.LazySingleton; import io.druid.indexer.HadoopDruidDetermineConfigurationJob; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.HadoopDruidIndexerJob; +import io.druid.indexer.HadoopIngestionSpec; import io.druid.indexer.JobHelper; import io.druid.indexer.Jobby; import io.druid.indexer.MetadataStorageUpdaterJobHandler; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.path.DatasourcePathSpec; +import io.druid.indexer.path.MetadataStoreBasedUsedSegmentLister; +import io.druid.indexer.path.MultiplePathSpec; +import io.druid.indexer.path.PathSpec; import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.metadata.MetadataStorageTablesConfig; import java.io.File; import java.net.URI; @@ -84,6 +94,10 @@ public class CliInternalHadoopIndexer extends GuiceRunnable binder.bind(new TypeLiteral>() {}) .toInstance(metadataSpec); + binder.bind(MetadataStorageTablesConfig.class).toInstance(metadataSpec.getMetadataStorageTablesConfig()); + binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in( + LazySingleton.class + ); } } ); @@ -95,11 +109,23 @@ public class CliInternalHadoopIndexer extends GuiceRunnable try { Injector injector = makeInjector(); - MetadataStorageUpdaterJobSpec metadataSpec = getHadoopDruidIndexerConfig().getSchema().getIOConfig().getMetadataUpdateSpec(); + config = getHadoopDruidIndexerConfig(); + + MetadataStorageUpdaterJobSpec metadataSpec = config.getSchema().getIOConfig().getMetadataUpdateSpec(); // override metadata storage type based on HadoopIOConfig Preconditions.checkNotNull(metadataSpec.getType(), "type in metadataUpdateSpec must not be null"); injector.getInstance(Properties.class).setProperty("druid.metadata.storage.type", metadataSpec.getType()); + config = HadoopDruidIndexerConfig.fromSpec( + HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( + config.getSchema(), + HadoopDruidIndexerConfig.jsonMapper, + new MetadataStoreBasedUsedSegmentLister( + injector.getInstance(IndexerMetadataStorageCoordinator.class) + ) + ) + ); + List jobs = Lists.newArrayList(); jobs.add(new HadoopDruidDetermineConfigurationJob(config)); jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));