diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 56230251828..08d852046ff 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -35,6 +35,7 @@ import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.data.input.Rows; +import io.druid.indexer.hadoop.SegmentInputRow; import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexIO; @@ -235,6 +236,7 @@ public class IndexGeneratorJob implements Jobby private static final HashFunction hashFunction = Hashing.murmur3_128(); private AggregatorFactory[] aggregators; + private AggregatorFactory[] combiningAggs; @Override protected void setup(Context context) @@ -242,6 +244,10 @@ public class IndexGeneratorJob implements Jobby { super.setup(context); aggregators = config.getSchema().getDataSchema().getAggregators(); + combiningAggs = new AggregatorFactory[aggregators.length]; + for (int i = 0; i < aggregators.length; ++i) { + combiningAggs[i] = aggregators[i].getCombiningFactory(); + } } @Override @@ -268,6 +274,14 @@ public class IndexGeneratorJob implements Jobby ) ).asBytes(); + // type SegmentInputRow serves as a marker that these InputRow instances have already been combined + // and they contain the columns as they show up in the segment after ingestion, not what you would see in raw + // data + byte[] serializedInputRow = inputRow instanceof SegmentInputRow ? + InputRowSerde.toBytes(inputRow, combiningAggs) + : + InputRowSerde.toBytes(inputRow, aggregators); + context.write( new SortableBytes( bucket.get().toGroupKey(), @@ -277,7 +291,7 @@ public class IndexGeneratorJob implements Jobby .put(hashedDimensions) .array() ).toBytesWritable(), - new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators)) + new BytesWritable(serializedInputRow) ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index d9a3de92ad6..541f561fc48 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -17,6 +17,7 @@ package io.druid.indexer; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; @@ -28,6 +29,7 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.RetryUtils; import com.metamx.common.logger.Logger; +import io.druid.indexer.updater.HadoopDruidConverterConfig; import io.druid.segment.ProgressIndicator; import io.druid.segment.SegmentUtils; import io.druid.timeline.DataSegment; @@ -56,8 +58,10 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -579,6 +583,38 @@ public class JobHelper return zipPusher.push(); } + public static URI getURIFromSegment(DataSegment dataSegment) + { + // There is no good way around this... 
+ // TODO: add getURI() to URIDataPuller + final Map loadSpec = dataSegment.getLoadSpec(); + final String type = loadSpec.get("type").toString(); + final URI segmentLocURI; + if ("s3_zip".equals(type)) { + segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + } else if ("hdfs".equals(type)) { + segmentLocURI = URI.create(loadSpec.get("path").toString()); + } else if ("local".equals(type)) { + try { + segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null); + } + catch (URISyntaxException e) { + throw new ISE(e, "Unable to form simple file uri"); + } + } else { + try { + throw new IAE( + "Cannot figure out loadSpec %s", + HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec) + ); + } + catch (JsonProcessingException e) { + throw new ISE("Cannot write Map with json mapper"); + } + } + return segmentLocURI; + } + public static ProgressIndicator progressIndicatorForContext( final TaskAttemptContext context ) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java new file mode 100644 index 00000000000..4bff88c3e79 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java @@ -0,0 +1,160 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.granularity.QueryGranularity; +import io.druid.query.filter.DimFilter; +import org.joda.time.Interval; + +import java.util.List; + +public class DatasourceIngestionSpec +{ + private final String dataSource; + private final Interval interval; + private final DimFilter filter; + private final QueryGranularity granularity; + private final List dimensions; + private final List metrics; + + @JsonCreator + public DatasourceIngestionSpec( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("filter") DimFilter filter, + @JsonProperty("granularity") QueryGranularity granularity, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("metrics") List metrics + ) + { + this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource"); + this.interval = Preconditions.checkNotNull(interval, "null interval"); + this.filter = filter; + this.granularity = granularity == null ? 
QueryGranularity.NONE : granularity; + + this.dimensions = dimensions; + this.metrics = metrics; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty + public DimFilter getFilter() + { + return filter; + } + + @JsonProperty + public QueryGranularity getGranularity() + { + return granularity; + } + + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public List getMetrics() + { + return metrics; + } + + public DatasourceIngestionSpec withDimensions(List dimensions) + { + return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics); + } + + public DatasourceIngestionSpec withMetrics(List metrics) + { + return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DatasourceIngestionSpec that = (DatasourceIngestionSpec) o; + + if (!dataSource.equals(that.dataSource)) { + return false; + } + if (!interval.equals(that.interval)) { + return false; + } + if (filter != null ? !filter.equals(that.filter) : that.filter != null) { + return false; + } + if (!granularity.equals(that.granularity)) { + return false; + } + if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { + return false; + } + return !(metrics != null ? !metrics.equals(that.metrics) : that.metrics != null); + + } + + @Override + public int hashCode() + { + int result = dataSource.hashCode(); + result = 31 * result + interval.hashCode(); + result = 31 * result + (filter != null ? filter.hashCode() : 0); + result = 31 * result + granularity.hashCode(); + result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); + result = 31 * result + (metrics != null ? metrics.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "DatasourceIngestionSpec{" + + "dataSource='" + dataSource + '\'' + + ", interval=" + interval + + ", filter=" + filter + + ", granularity=" + granularity + + ", dimensions=" + dimensions + + ", metrics=" + metrics + + '}'; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java new file mode 100644 index 00000000000..3677612bc3a --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java @@ -0,0 +1,121 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.data.input.InputRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class DatasourceInputFormat extends InputFormat +{ + private static final Logger logger = new Logger(DatasourceInputFormat.class); + + public static final String CONF_INPUT_SEGMENTS = "druid.segments"; + public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema"; + public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size"; + + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException + { + Configuration conf = context.getConfiguration(); + + String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read"); + List segments = HadoopDruidIndexerConfig.jsonMapper.readValue( + segmentsStr, + new TypeReference>() + { + } + ); + if (segments == null || segments.size() == 0) { + throw new ISE("No segments found to read"); + } + + logger.info("segments to read [%s]", segmentsStr); + + long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0); + + if (maxSize > 0) { + //combining is to happen, let us sort the segments list by size so that they + //are combined appropriately + Collections.sort( + segments, + new Comparator() + { + @Override + public int compare(DataSegment s1, DataSegment s2) + { + return Long.compare(s1.getSize(), s2.getSize()); + } + } + ); + } + + List splits = Lists.newArrayList(); + + List list = new ArrayList<>(); + long size = 0; + + for (DataSegment segment : segments) { + if (size + segment.getSize() > maxSize && size > 0) { + splits.add(new DatasourceInputSplit(list)); + list = Lists.newArrayList(); + size = 0; + } + + list.add(segment); + size += segment.getSize(); + } + + if (list.size() > 0) { + splits.add(new DatasourceInputSplit(list)); + } + + logger.info("Number of splits [%d]", splits.size()); + return splits; + } + + @Override + public RecordReader createRecordReader( + InputSplit split, + TaskAttemptContext context + ) throws IOException, InterruptedException + { + return new DatasourceRecordReader(); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java new file mode 100644 index 00000000000..6b80159ffb3 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java @@ -0,0 +1,88 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +import javax.validation.constraints.NotNull; + +public class DatasourceInputSplit extends InputSplit implements Writable +{ + private List segments = null; + + //required for deserialization + public DatasourceInputSplit() + { + } + + public DatasourceInputSplit(@NotNull List segments) + { + Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments"); + this.segments = segments; + } + + @Override + public long getLength() throws IOException, InterruptedException + { + long size = 0; + for (DataSegment segment : segments) { + size += segment.getSize(); + } + return size; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException + { + return new String[]{}; + } + + public List getSegments() + { + return segments; + } + + @Override + public void write(DataOutput out) throws IOException + { + out.writeUTF(HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segments)); + } + + @Override + public void readFields(DataInput in) throws IOException + { + segments = HadoopDruidIndexerConfig.jsonMapper.readValue( + in.readUTF(), + new TypeReference>() + { + } + ); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java new file mode 100644 index 00000000000..ef20db3ed52 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java @@ -0,0 +1,194 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.MapBasedRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.JobHelper; +import io.druid.segment.IndexIO; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.StorageAdapter; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; +import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +public class DatasourceRecordReader extends RecordReader +{ + private static final Logger logger = new Logger(DatasourceRecordReader.class); + + private DatasourceIngestionSpec spec; + private IngestSegmentFirehose firehose; + + private int rowNum; + private MapBasedRow currRow; + + private List indexes = Lists.newArrayList(); + private List tmpSegmentDirs = Lists.newArrayList(); + private int numRows; + + @Override + public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException + { + spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.jsonMapper); + + List segments = ((DatasourceInputSplit) split).getSegments(); + + List adapters = Lists.transform( + segments, + new Function() + { + @Override + public StorageAdapter apply(DataSegment segment) + { + try { + logger.info("Getting storage path for segment [%s]", segment.getIdentifier()); + Path path = new Path(JobHelper.getURIFromSegment(segment)); + + logger.info("Fetch segment files from [%s]", path); + + File dir = Files.createTempDir(); + tmpSegmentDirs.add(dir); + logger.info("Locally storing fetched segment at [%s]", dir); + + JobHelper.unzipNoGuava(path, context.getConfiguration(), dir, context); + logger.info("finished fetching segment files"); + + QueryableIndex index = IndexIO.loadIndex(dir); + indexes.add(index); + numRows += index.getNumRows(); + + return new QueryableIndexStorageAdapter(index); + } + catch (IOException ex) { + throw Throwables.propagate(ex); + } + } + } + ); + + firehose = new IngestSegmentFirehose( + adapters, + spec.getDimensions(), + spec.getMetrics(), + spec.getFilter(), + spec.getInterval(), + spec.getGranularity() + ); + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException + { + if (firehose.hasMore()) { + currRow = (MapBasedRow) firehose.nextRow(); + rowNum++; + return true; + } else { + return false; + } + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException + { + return NullWritable.get(); + } + + @Override + public InputRow getCurrentValue() throws IOException, InterruptedException + { + return new SegmentInputRow( + new MapBasedInputRow( + 
currRow.getTimestamp(), + spec.getDimensions(), + currRow.getEvent() + ) + ); + } + + @Override + public float getProgress() throws IOException, InterruptedException + { + if (numRows > 0) { + return (rowNum * 1.0f) / numRows; + } else { + return 0; + } + } + + @Override + public void close() throws IOException + { + Closeables.close(firehose, true); + for (QueryableIndex qi : indexes) { + Closeables.close(qi, true); + } + + for (File dir : tmpSegmentDirs) { + FileUtils.deleteDirectory(dir); + } + } + + private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper) + { + try { + String schema = Preconditions.checkNotNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), "null schema"); + logger.info("load schema [%s]", schema); + + DatasourceIngestionSpec spec = jsonMapper.readValue(schema, DatasourceIngestionSpec.class); + + if (spec.getDimensions() == null || spec.getDimensions().size() == 0) { + throw new ISE("load schema does not have dimensions"); + } + + if (spec.getMetrics() == null || spec.getMetrics().size() == 0) { + throw new ISE("load schema does not have metrics"); + } + + return spec; + } + catch (IOException ex) { + throw new RuntimeException("couldn't load segment load spec", ex); + } + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java new file mode 100644 index 00000000000..b25e95918cf --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java @@ -0,0 +1,97 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import io.druid.data.input.InputRow; +import io.druid.data.input.Row; +import org.joda.time.DateTime; + +import java.util.List; + +/** + * SegmentInputRow serves as a marker that these InputRow instances have already been combined + * and they contain the columns as they show up in the segment after ingestion, not what you would see in raw + * data. + * It must only be used to represent such InputRows. 
+ */ +public class SegmentInputRow implements InputRow +{ + private final InputRow delegate; + + public SegmentInputRow(InputRow delegate){ + this.delegate = delegate; + } + + @Override + public List getDimensions() + { + return delegate.getDimensions(); + } + + @Override + public long getTimestampFromEpoch() + { + return delegate.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return delegate.getTimestamp(); + } + + @Override + public List getDimension(String dimension) + { + return delegate.getDimension(dimension); + } + + @Override + public Object getRaw(String dimension) + { + return delegate.getRaw(dimension); + } + + @Override + public float getFloatMetric(String metric) + { + return delegate.getFloatMetric(metric); + } + + @Override + public long getLongMetric(String metric) + { + return delegate.getLongMetric(metric); + } + + @Override + public int compareTo(Row row) + { + return delegate.compareTo(row); + } + + @Override + public String toString() + { + return "SegmentInputRow{" + + "delegate=" + delegate + + '}'; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java new file mode 100644 index 00000000000..6d912eeb65c --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -0,0 +1,184 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.common.logger.Logger; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.hadoop.DatasourceInputFormat; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class DatasourcePathSpec implements PathSpec +{ + private static final Logger logger = new Logger(DatasourcePathSpec.class); + + private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; + private ObjectMapper mapper; + + private DatasourceIngestionSpec ingestionSpec; + private long maxSplitSize; + + public DatasourcePathSpec( + @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + @JacksonInject ObjectMapper mapper, + @JsonProperty("ingestionSpec") DatasourceIngestionSpec spec, + @JsonProperty("maxSplitSize") Long maxSplitSize + ) + { + this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull(indexerMetadataStorageCoordinator, "null indexerMetadataStorageCoordinator"); + this.mapper = Preconditions.checkNotNull(mapper, "null mapper"); + this.ingestionSpec = Preconditions.checkNotNull(spec, "null ingestionSpec"); + + if(maxSplitSize == null) { + this.maxSplitSize = 0; + } else { + this.maxSplitSize = maxSplitSize.longValue(); + } + } + + @JsonProperty + public DatasourceIngestionSpec getIngestionSpec() + { + return ingestionSpec; + } + + @JsonProperty + public long getMaxSplitSize() + { + return maxSplitSize; + } + + @Override + public Job addInputPaths( + HadoopDruidIndexerConfig config, Job job + ) throws IOException + { + final List segments = indexerMetadataStorageCoordinator.getUsedSegmentsForInterval( + ingestionSpec.getDataSource(), + ingestionSpec.getInterval() + ); + logger.info( + "Found total [%d] segments for [%s] in interval [%s]", + segments.size(), + ingestionSpec.getDataSource(), + ingestionSpec.getInterval() + ); + + if (ingestionSpec.getDimensions() == null) { + List dims; + if (config.getParser().getParseSpec().getDimensionsSpec().hasCustomDimensions()) { + dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensions(); + } else { + Set dimSet = Sets.newHashSet( + Iterables.concat( + Iterables.transform( + segments, + new Function>() + { + @Override + public Iterable apply(DataSegment dataSegment) + { + return dataSegment.getDimensions(); + } + } + ) + ) + ); + dims = Lists.newArrayList( + Sets.difference( + dimSet, + config.getParser() + .getParseSpec() + .getDimensionsSpec() + .getDimensionExclusions() + ) + ); + } + ingestionSpec = ingestionSpec.withDimensions(dims); + } + + if (ingestionSpec.getMetrics() == null) { + Set metrics = Sets.newHashSet(); + final AggregatorFactory[] cols = 
config.getSchema().getDataSchema().getAggregators(); + if (cols != null) { + for (AggregatorFactory col : cols) { + metrics.add(col.getName()); + } + } + ingestionSpec = ingestionSpec.withMetrics(Lists.newArrayList(metrics)); + } + + job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(ingestionSpec)); + job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments)); + job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize)); + + return job; + } + + public Class getInputFormat() + { + return DatasourceInputFormat.class; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DatasourcePathSpec that = (DatasourcePathSpec) o; + + if (maxSplitSize != that.maxSplitSize) { + return false; + } + return ingestionSpec.equals(that.ingestionSpec); + + } + + @Override + public int hashCode() + { + int result = ingestionSpec.hashCode(); + result = 31 * result + (int) (maxSplitSize ^ (maxSplitSize >>> 32)); + return result; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java index 90feb83bc95..f1de5131689 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java @@ -32,7 +32,8 @@ import java.io.IOException; @JsonSubTypes(value={ @JsonSubTypes.Type(name="granular_unprocessed", value=GranularUnprocessedPathSpec.class), @JsonSubTypes.Type(name="granularity", value=GranularityPathSpec.class), - @JsonSubTypes.Type(name="static", value=StaticPathSpec.class) + @JsonSubTypes.Type(name="static", value=StaticPathSpec.class), + @JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class) }) public interface PathSpec { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index fb3c63eae0f..02125160ffa 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -32,6 +32,7 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.indexer.JobHelper; +import io.druid.indexer.hadoop.DatasourceInputSplit; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.timeline.DataSegment; @@ -42,7 +43,6 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapreduce.InputFormat; @@ -62,15 +62,10 @@ import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.util.Progressable; import javax.annotation.Nullable; -import javax.validation.constraints.NotNull; -import java.io.DataInput; -import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -470,10 +465,10 @@ public class HadoopConverterJob ) 
throws IOException, InterruptedException { final InputSplit split = context.getInputSplit(); - if (!(split instanceof DataSegmentSplit)) { + if (!(split instanceof DatasourceInputSplit)) { throw new IAE( "Unexpected split type. Expected [%s] was [%s]", - DataSegmentSplit.class.getCanonicalName(), + DatasourceInputSplit.class.getCanonicalName(), split.getClass().getCanonicalName() ); } @@ -481,13 +476,13 @@ public class HadoopConverterJob final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY); final File tmpDir = Paths.get(tmpDirLoc).toFile(); - final DataSegment segment = ((DataSegmentSplit) split).getDataSegment(); + final DataSegment segment = Iterables.getOnlyElement(((DatasourceInputSplit) split).getSegments()); final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration()); context.setStatus("DOWNLOADING"); context.progress(); - final Path inPath = new Path(getURIFromSegment(segment)); + final Path inPath = new Path(JobHelper.getURIFromSegment(segment)); final File inDir = new File(tmpDir, "in"); if (inDir.exists() && !inDir.delete()) { @@ -559,38 +554,6 @@ public class HadoopConverterJob context.getConfiguration().set(TMP_FILE_LOC_KEY, tmpFile.getAbsolutePath()); } - private static URI getURIFromSegment(DataSegment dataSegment) - { - // There is no good way around this... - // TODO: add getURI() to URIDataPuller - final Map loadSpec = dataSegment.getLoadSpec(); - final String type = loadSpec.get("type").toString(); - final URI segmentLocURI; - if ("s3_zip".equals(type)) { - segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); - } else if ("hdfs".equals(type)) { - segmentLocURI = URI.create(loadSpec.get("path").toString()); - } else if ("local".equals(type)) { - try { - segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null); - } - catch (URISyntaxException e) { - throw new ISE(e, "Unable to form simple file uri"); - } - } else { - try { - throw new IAE( - "Cannot figure out loadSpec %s", - HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec) - ); - } - catch (JsonProcessingException e) { - throw new ISE("Cannot write Map with json mapper"); - } - } - return segmentLocURI; - } - @Override protected void cleanup( Context context @@ -604,50 +567,6 @@ public class HadoopConverterJob } } - public static class DataSegmentSplit extends InputSplit implements Writable - { - private DataSegment dataSegment = null; - - public DataSegmentSplit() - { - // For serialization purposes - } - - public DataSegmentSplit(@NotNull DataSegment dataSegment) - { - this.dataSegment = Preconditions.checkNotNull(dataSegment, "dataSegment"); - } - - @Override - public long getLength() throws IOException, InterruptedException - { - return dataSegment.getSize(); - } - - @Override - public String[] getLocations() throws IOException, InterruptedException - { - return new String[]{}; - } - - protected DataSegment getDataSegment() - { - return dataSegment; - } - - @Override - public void write(DataOutput out) throws IOException - { - out.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsString(dataSegment).getBytes()); - } - - @Override - public void readFields(DataInput in) throws IOException - { - dataSegment = HadoopDruidConverterConfig.jsonMapper.readValue(in.readLine(), DataSegment.class); - } - } - public static class ConfigInputFormat extends InputFormat { @Override @@ -665,7 +584,7 @@ public class HadoopConverterJob @Override public InputSplit 
apply(DataSegment input) { - return new DataSegmentSplit(input); + return new DatasourceInputSplit(ImmutableList.of(input)); } } ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java new file mode 100644 index 00000000000..702047ee12a --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java @@ -0,0 +1,52 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.filter.SelectorDimFilter; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class DatasourceIngestionSpecTest +{ + @Test + public void testSerde() throws Exception + { + DatasourceIngestionSpec expected = new DatasourceIngestionSpec( + "test", + Interval.parse("2014/2015"), + new SelectorDimFilter("dim", "value"), + QueryGranularity.DAY, + Lists.newArrayList("d1", "d2"), + Lists.newArrayList("m1", "m2", "m3") + ); + + ObjectMapper jsonMapper = new DefaultObjectMapper(); + + DatasourceIngestionSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), DatasourceIngestionSpec.class); + Assert.assertEquals(expected, actual); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java new file mode 100644 index 00000000000..fe3802eccb0 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java @@ -0,0 +1,154 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class DatasourceInputFormatTest +{ + private List segments; + private Configuration config; + private JobContext context; + + @Before + public void setUp() throws Exception + { + segments = ImmutableList.of( + new DataSegment( + "test1", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index1.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 2 + ), + new DataSegment( + "test2", + Interval.parse("2050/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index2.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 11 + ), + new DataSegment( + "test3", + Interval.parse("2030/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index3.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 4 + ) + ); + + config = new Configuration(); + config.set( + DatasourceInputFormat.CONF_INPUT_SEGMENTS, + new DefaultObjectMapper().writeValueAsString(segments) + ); + + context = EasyMock.createMock(JobContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(config); + EasyMock.replay(context); + } + + @Test + public void testGetSplitsNoCombining() throws Exception + { + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(segments.size(), splits.size()); + for (int i = 0; i < segments.size(); i++) { + Assert.assertEquals(segments.get(i), ((DatasourceInputSplit) splits.get(i)).getSegments().get(0)); + } + } + + @Test + public void testGetSplitsAllCombined() throws Exception + { + config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999"); + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(1, splits.size()); + Assert.assertEquals( + Sets.newHashSet(segments), + Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) + ); + } + + @Test + public void testGetSplitsCombineInTwo() throws Exception + { + config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6"); + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(2, splits.size()); + + Assert.assertEquals( + Sets.newHashSet(segments.get(0), segments.get(2)), + Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) + ); + + Assert.assertEquals( + Sets.newHashSet(segments.get(1)), + Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments())) + ); + } + + @Test + public void testGetRecordReader() throws Exception + { + Assert.assertTrue(new DatasourceInputFormat().createRecordReader(null, null) instanceof DatasourceRecordReader); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java 
b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java new file mode 100644 index 00000000000..87872339cfe --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java @@ -0,0 +1,71 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInput; + +/** + */ +public class DatasourceInputSplitTest +{ + @Test + public void testSerde() throws Exception + { + DatasourceInputSplit expected = new DatasourceInputSplit( + Lists.newArrayList( + new DataSegment( + "test", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12334 + ) + ) + ); + + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + expected.write(out); + + DataInput in = ByteStreams.newDataInput(out.toByteArray()); + DatasourceInputSplit actual = new DatasourceInputSplit(); + actual.readFields(in); + + Assert.assertEquals(expected.getSegments(), actual.getSegments()); + Assert.assertEquals(12334, actual.getLength()); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java new file mode 100644 index 00000000000..c4c25f15d4f --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java @@ -0,0 +1,136 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.druid.data.input.InputRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +/** + */ +public class DatasourceRecordReaderTest +{ + @Test + public void testSanity() throws Exception + { + DataSegment segment = new DefaultObjectMapper() + .readValue(this.getClass().getClassLoader().getResource("test-segment/descriptor.json"), DataSegment.class) + .withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath() + ) + ); + InputSplit split = new DatasourceInputSplit(Lists.newArrayList(segment)); + + Configuration config = new Configuration(); + config.set( + DatasourceInputFormat.CONF_DRUID_SCHEMA, + HadoopDruidIndexerConfig.jsonMapper.writeValueAsString( + new DatasourceIngestionSpec( + segment.getDataSource(), + segment.getInterval(), + null, + null, + segment.getDimensions(), + segment.getMetrics() + ) + ) + ); + + TaskAttemptContext context = EasyMock.createNiceMock(TaskAttemptContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(context); + + DatasourceRecordReader rr = new DatasourceRecordReader(); + rr.initialize(split, context); + + Assert.assertEquals(0, rr.getProgress(), 0.0001); + + List rows = Lists.newArrayList(); + while(rr.nextKeyValue()) { + rows.add(rr.getCurrentValue()); + } + verifyRows(rows); + + Assert.assertEquals(1, rr.getProgress(), 0.0001); + + rr.close(); + } + + private void verifyRows(List actualRows) + { + List> expectedRows = ImmutableList.of( + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T00:00:00.000Z"), + "host", ImmutableList.of("a.example.com"), + "visited_sum", 100L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T01:00:00.000Z"), + "host", ImmutableList.of("b.example.com"), + "visited_sum", 150L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T02:00:00.000Z"), + "host", ImmutableList.of("c.example.com"), + "visited_sum", 200L, + "unique_hosts", 1.0d + ) + ); + + Assert.assertEquals(expectedRows.size(), actualRows.size()); + + for (int i = 0; i < expectedRows.size(); i++) { + Map expected = expectedRows.get(i); + InputRow actual = actualRows.get(i); + + Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions()); + + Assert.assertEquals(expected.get("time"), actual.getTimestamp()); + Assert.assertEquals(expected.get("host"), actual.getDimension("host")); + Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); + Assert.assertEquals( + (Double) expected.get("unique_hosts"), + (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")), + 0.001 + ); + } + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java 
b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java new file mode 100644 index 00000000000..304006c8de9 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -0,0 +1,229 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.metamx.common.Granularity; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.annotations.Self; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopIOConfig; +import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexer.HadoopTuningConfig; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.hadoop.DatasourceInputFormat; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.initialization.Initialization; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.server.DruidNode; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class DatasourcePathSpecTest +{ + private DatasourceIngestionSpec ingestionSpec; + + public DatasourcePathSpecTest() + { + this.ingestionSpec = new DatasourceIngestionSpec( + "test", + Interval.parse("2000/3000"), + null, + null, + null, + null + ); + } + + @Test + public void testSerde() throws Exception + { + final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock( + IndexerMetadataStorageCoordinator.class + ); + + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + 
ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(IndexerMetadataStorageCoordinator.class).toInstance(indexerMetadataStorageCoordinator); + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null) + ); + } + } + ) + ); + + ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class); + + DatasourcePathSpec expected = new DatasourcePathSpec( + indexerMetadataStorageCoordinator, + jsonMapper, + ingestionSpec, + Long.valueOf(10) + ); + PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + + expected = new DatasourcePathSpec( + indexerMetadataStorageCoordinator, + jsonMapper, + ingestionSpec, + null + ); + actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + } + + @Test + public void testAddInputPaths() throws Exception + { + HadoopDruidIndexerConfig hadoopIndexerConfig = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + ingestionSpec.getDataSource(), + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(null, null, null), + null, + ImmutableList.of("timestamp", "host", "visited") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_sum", "visited") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000")) + ) + ), + new HadoopIOConfig( + ImmutableMap.of( + "paths", + "/tmp/dummy", + "type", + "static" + ), + null, + "/tmp/dummy" + ), + HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver") + ) + ); + + List segments = ImmutableList.of( + new DataSegment( + ingestionSpec.getDataSource(), + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index.zip" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12334 + ), + new DataSegment( + ingestionSpec.getDataSource(), + Interval.parse("2050/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index.zip" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12335 + ) + ); + + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock(IndexerMetadataStorageCoordinator.class); + EasyMock.expect(indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(ingestionSpec.getDataSource(), ingestionSpec.getInterval())).andReturn(segments); + EasyMock.replay(indexerMetadataStorageCoordinator); + + ObjectMapper mapper = new DefaultObjectMapper(); + + DatasourcePathSpec pathSpec = new DatasourcePathSpec( + indexerMetadataStorageCoordinator, + mapper, + ingestionSpec, + null + ); + + Configuration config = new Configuration(); + Job job = EasyMock.createNiceMock(Job.class); + EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(job); + + pathSpec.addInputPaths(hadoopIndexerConfig, job); + List actualSegments = mapper.readValue( + config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS), + new TypeReference>() + { + } + ); + + Assert.assertEquals(segments, actualSegments); + + DatasourceIngestionSpec actualIngestionSpec = mapper.readValue(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), 
DatasourceIngestionSpec.class); + Assert.assertEquals(ingestionSpec + .withDimensions(ImmutableList.of("product")) + .withMetrics(ImmutableList.of("visited_sum")), + actualIngestionSpec); + } +} diff --git a/indexing-hadoop/src/test/resources/test-segment/descriptor.json b/indexing-hadoop/src/test/resources/test-segment/descriptor.json new file mode 100644 index 00000000000..f892b765f55 --- /dev/null +++ b/indexing-hadoop/src/test/resources/test-segment/descriptor.json @@ -0,0 +1,17 @@ +{ + "binaryVersion": 9, + "dataSource": "testds", + "dimensions": "host", + "identifier": "testds_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-07-15T22:02:40.171Z", + "interval": "2014-10-22T00:00:00.000Z/2014-10-23T00:00:00.000Z", + "loadSpec": { + "path": "test-segment/index.zip", + "type": "local" + }, + "metrics": "visited_sum,unique_hosts", + "shardSpec": { + "type": "none" + }, + "size": 4096, + "version": "2015-07-15T22:02:40.171Z" +} diff --git a/indexing-hadoop/src/test/resources/test-segment/index.zip b/indexing-hadoop/src/test/resources/test-segment/index.zip new file mode 100644 index 00000000000..f9e15b415ff Binary files /dev/null and b/indexing-hadoop/src/test/resources/test-segment/index.zip differ diff --git a/indexing-hadoop/src/test/resources/test-segment/note b/indexing-hadoop/src/test/resources/test-segment/note new file mode 100644 index 00000000000..70d8a95d315 --- /dev/null +++ b/indexing-hadoop/src/test/resources/test-segment/note @@ -0,0 +1,5 @@ +This is a test segment containing the following columns. +__time, host, visited_sum, unique_hosts +2014-10-22T00:00:00.000Z, a.example.com, 100, HLL sketch of cardinality 1 +2014-10-22T01:00:00.000Z, b.example.com, 150, HLL sketch of cardinality 1 +2014-10-22T02:00:00.000Z, c.example.com, 200, HLL sketch of cardinality 1
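
A note on the IndexGeneratorJob change at the top of this patch: rows read back out of existing segments arrive wrapped in SegmentInputRow, and their metric columns already hold aggregated values stored under each aggregator's output name, so the mapper serializes them with the combining factories rather than the original aggregators. A minimal standalone sketch of that distinction, reusing the visited_sum/visited long-sum aggregator from DatasourcePathSpecTest (the printed field lists are an expectation about LongSumAggregatorFactory's combining variant, not something asserted in this patch):

import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

public class CombiningAggregatorSketch
{
  public static void main(String[] args)
  {
    // Aggregator as declared in the ingestion spec: sums the raw input field "visited".
    AggregatorFactory raw = new LongSumAggregatorFactory("visited_sum", "visited");

    // Combining variant, used when serializing SegmentInputRow instances: it reads the
    // segment column "visited_sum", which is the field a re-ingested row actually carries.
    AggregatorFactory combining = raw.getCombiningFactory();

    System.out.println(raw.requiredFields());       // expected: [visited]
    System.out.println(combining.requiredFields()); // expected: [visited_sum]
  }
}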
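
JobHelper.getURIFromSegment(), hoisted out of HadoopConverterJob so that DatasourceRecordReader can reuse it, maps a segment's loadSpec to a fetchable URI: s3_zip becomes an s3n:// URI built from the bucket and key, hdfs uses the stored path as-is, and local becomes a plain file URI. A small usage sketch with a local loadSpec, mirroring the DataSegment constructor used in the tests above (the path and size are illustrative):

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.JobHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;

import java.net.URI;

public class SegmentUriSketch
{
  public static void main(String[] args)
  {
    DataSegment segment = new DataSegment(
        "testds",
        Interval.parse("2014-10-22/2014-10-23"),
        "ver",
        ImmutableMap.<String, Object>of("type", "local", "path", "/tmp/index.zip"),
        ImmutableList.of("host"),
        ImmutableList.of("visited_sum", "unique_hosts"),
        new NoneShardSpec(),
        9,
        4096
    );

    // A "local" loadSpec is turned into a simple file URI.
    URI uri = JobHelper.getURIFromSegment(segment);
    System.out.println(uri); // file:/tmp/index.zip
  }
}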
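
The getSplits() logic in DatasourceInputFormat sorts segments by size and then packs them greedily, starting a new split once adding the next segment would push the running total past maxSplitSize (a maxSplitSize of 0 disables combining and yields one segment per split). A standalone sketch of just that packing loop, fed with the segment sizes from DatasourceInputFormatTest (2, 11 and 4) and the test's maxSplitSize of 6:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SplitPackingSketch
{
  public static void main(String[] args)
  {
    // Segment sizes as in DatasourceInputFormatTest.testGetSplitsCombineInTwo.
    List<Long> segmentSizes = new ArrayList<>(Arrays.asList(2L, 11L, 4L));
    long maxSize = 6;

    // Same ordering step as getSplits(): ascending by size so small segments pack together.
    Collections.sort(segmentSizes);

    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long size = 0;

    for (long segmentSize : segmentSizes) {
      // Flush the current split once the next segment would exceed maxSize.
      if (size + segmentSize > maxSize && size > 0) {
        splits.add(current);
        current = new ArrayList<>();
        size = 0;
      }
      current.add(segmentSize);
      size += segmentSize;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }

    // Prints [[2, 4], [11]], matching testGetSplitsCombineInTwo above.
    System.out.println(splits);
  }
}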
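
Finally, on how this wires together: PathSpec now registers DatasourcePathSpec under the subtype name "dataSource", and its JSON properties are ingestionSpec and maxSplitSize, with the nested ingestionSpec fields matching DatasourceIngestionSpec's @JsonProperty names. A hedged sketch of what such a pathSpec entry could look like, held in a Java string; the datasource name, interval and split size are made up, and (as DatasourcePathSpecTest shows) deserializing it requires the Guice-configured ObjectMapper because of the @JacksonInject fields:

public class DatasourcePathSpecJsonSketch
{
  // Hypothetical pathSpec fragment for a Hadoop ingestion spec. Property names mirror
  // the @JsonProperty annotations in DatasourcePathSpec and DatasourceIngestionSpec;
  // the values are illustrative only.
  static final String PATH_SPEC_JSON =
      "{\n"
      + "  \"type\" : \"dataSource\",\n"
      + "  \"ingestionSpec\" : {\n"
      + "    \"dataSource\" : \"wikipedia\",\n"
      + "    \"interval\" : \"2014-10-22/2014-10-23\"\n"
      + "  },\n"
      + "  \"maxSplitSize\" : 50000000\n"
      + "}";
}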