Merge pull request #1374 from himanshug/batch_delta_ingestion3

Feature for Hadoop batch re-ingestion and delta ingestion
Charles Allen 2015-08-17 15:52:49 -07:00
commit b9792b57bc
42 changed files with 2865 additions and 319 deletions


@ -136,7 +136,7 @@ There are multiple types of inputSpecs:
##### `static`
Is a type of data loader where a static path to where the data files are located is passed.
Is a type of inputSpec where a static path to where the data files are located is passed.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
@ -150,7 +150,7 @@ For example, using the static input paths:
##### `granularity`
Is a type of data loader that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase).
Is a type of inputSpec that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase).
|Field|Type|Description|Required|
|-----|----|-----------|--------|
@ -166,6 +166,61 @@ s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=01
...
s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=23
```
##### `dataSource`
It is a type of inputSpec that reads data already stored inside Druid. It is useful for "re-indexing": a use case would be that you ingested some data in some interval and at a later time you want to change the granularity of rows or remove some columns from the data stored in Druid.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|ingestionSpec|Json Object|Specification of Druid segments to be loaded. See below.|yes|
|maxSplitSize|Number|Enables combining multiple segments into a single Hadoop InputSplit according to segment size. Default is none.|no|
Here is what goes inside `ingestionSpec`:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|dataSource|String|Druid dataSource name from which you are loading the data.|yes|
|interval|String|A string representing an ISO-8601 interval.|yes|
|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
|filter|Json|See [Filters](../querying/filters.html).|no|
|dimensions|Array of String|Names of dimension columns to load. By default, the list will be constructed from the parseSpec. If the parseSpec does not have an explicit list of dimensions then all the dimension columns present in the stored data will be read.|no|
|metrics|Array of String|Names of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no|
For example:
```
"ingestionSpec" :
{
"dataSource": "wikipedia",
"interval": "2014-10-20T00:00:00Z/P2W"
}
```
##### `multi`
It is a composing inputSpec that combines other inputSpecs. It is useful for "delta ingestion": a use case would be that you ingested some data in some interval and at a later time you want to "append" more data to that interval. You can use this inputSpec to combine `dataSource` and `static` (or other) inputSpecs to add more data to an already indexed interval.
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|children|Array of Json Objects|List of JSON objects containing other inputSpecs.|yes|
For example:
```
"children": [
{
"type" : "dataSource",
"ingestionSpec" : {
"dataSource": "wikipedia",
"interval": "2014-10-20T00:00:00Z/P2W"
}
},
{
"type" : "static",
"paths": "/path/to/more/wikipedia/data/"
}
]
```
#### Metadata Update Job Spec


@ -88,7 +88,6 @@ public class DetermineHashedPartitionsJob implements Jobby
);
JobHelper.injectSystemProperties(groupByJob);
JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);
groupByJob.setMapOutputValueClass(BytesWritable.class);


@ -126,7 +126,6 @@ public class DeterminePartitionsJob implements Jobby
);
JobHelper.injectSystemProperties(groupByJob);
JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class);
groupByJob.setMapOutputValueClass(NullWritable.class);
@ -173,7 +172,6 @@ public class DeterminePartitionsJob implements Jobby
} else {
// Directly read the source data, since we assume it's already grouped.
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class);
JobHelper.setInputFormat(dimSelectionJob, config);
config.addInputPaths(dimSelectionJob);
}


@ -18,6 +18,7 @@
package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -55,7 +56,6 @@ import io.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -248,6 +248,12 @@ public class HadoopDruidIndexerConfig
return schema;
}
@JsonIgnore
public PathSpec getPathSpec()
{
return pathSpec;
}
public String getDataSource()
{
return schema.getDataSchema().getDataSource();
@ -354,11 +360,6 @@ public class HadoopDruidIndexerConfig
return pathSpec.addInputPaths(this, job);
}
public Class<? extends InputFormat> getInputFormatClass()
{
return pathSpec.getInputFormat();
}
/********************************************
Granularity/Bucket Helper Methods
********************************************/


@ -103,7 +103,9 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
{
if(parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
return ((StringInputRowParser)parser).parse(value.toString());
return ((StringInputRowParser) parser).parse(value.toString());
} else if(value instanceof InputRow) {
return (InputRow)value;
} else {
return parser.parse(value);
}


@ -19,8 +19,16 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
*/
@ -91,4 +99,45 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
config
);
}
public static HadoopIngestionSpec updateSegmentListIfDatasourcePathSpecIsUsed(
HadoopIngestionSpec spec,
ObjectMapper jsonMapper,
UsedSegmentLister segmentLister
)
throws IOException
{
String dataSource = "dataSource";
String type = "type";
String multi = "multi";
String children = "children";
String segments = "segments";
String ingestionSpec = "ingestionSpec";
Map<String, Object> pathSpec = spec.getIOConfig().getPathSpec();
Map<String, Object> datasourcePathSpec = null;
if(pathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = pathSpec;
} else if(pathSpec.get(type).equals(multi)) {
List<Map<String, Object>> childPathSpecs = (List<Map<String, Object>>) pathSpec.get(children);
for(Map<String, Object> childPathSpec : childPathSpecs) {
if (childPathSpec.get(type).equals(dataSource)) {
datasourcePathSpec = childPathSpec;
break;
}
}
}
if (datasourcePathSpec != null) {
Map<String, Object> ingestionSpecMap = (Map<String, Object>) datasourcePathSpec.get(ingestionSpec);
DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(ingestionSpecMap, DatasourceIngestionSpec.class);
List<DataSegment> segmentsList = segmentLister.getUsedSegmentsForInterval(
ingestionSpecObj.getDataSource(),
ingestionSpecObj.getInterval()
);
datasourcePathSpec.put(segments, segmentsList);
}
return spec;
}
}
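
For context, here is a minimal usage sketch of the new helper. The wrapper class and the stubbed lister are hypothetical; a real job would use the MetadataStoreBasedUsedSegmentLister added later in this patch. Only `updateSegmentListIfDatasourcePathSpecIsUsed` and `HadoopDruidIndexerConfig.jsonMapper` come from this change.
```
// Hypothetical caller sketch for the new static helper.
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

public class SegmentListUpdateSketch
{
  public static HadoopIngestionSpec resolveSegments(HadoopIngestionSpec spec) throws IOException
  {
    // Stub lister so the sketch stays self-contained; it pretends no segments are published.
    UsedSegmentLister lister = new UsedSegmentLister()
    {
      @Override
      public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
      {
        return Collections.<DataSegment>emptyList();
      }
    };

    // If the pathSpec is of type "dataSource" (directly, or as a child of a "multi" spec),
    // this fills its "segments" key from the lister; otherwise the spec is returned unchanged.
    return HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
        spec,
        HadoopDruidIndexerConfig.jsonMapper,
        lister
    );
  }
}
```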


@ -35,6 +35,7 @@ import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.indexer.hadoop.SegmentInputRow;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
@ -139,8 +140,6 @@ public class IndexGeneratorJob implements Jobby
JobHelper.injectSystemProperties(job);
JobHelper.setInputFormat(job, config);
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);
@ -235,6 +234,7 @@ public class IndexGeneratorJob implements Jobby
private static final HashFunction hashFunction = Hashing.murmur3_128();
private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;
@Override
protected void setup(Context context)
@ -242,6 +242,10 @@ public class IndexGeneratorJob implements Jobby
{
super.setup(context);
aggregators = config.getSchema().getDataSchema().getAggregators();
combiningAggs = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; ++i) {
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
}
@Override
@ -268,6 +272,14 @@ public class IndexGeneratorJob implements Jobby
)
).asBytes();
// type SegmentInputRow serves as a marker that these InputRow instances have already been combined
// and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
// data
byte[] serializedInputRow = inputRow instanceof SegmentInputRow ?
InputRowSerde.toBytes(inputRow, combiningAggs)
:
InputRowSerde.toBytes(inputRow, aggregators);
context.write(
new SortableBytes(
bucket.get().toGroupKey(),
@ -277,7 +289,7 @@ public class IndexGeneratorJob implements Jobby
.put(hashedDimensions)
.array()
).toBytesWritable(),
new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators))
new BytesWritable(serializedInputRow)
);
}
}
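
The combining-factory switch matters because rows read back from existing segments already carry aggregated metric values, so they must not be re-aggregated as if they were raw input. A sketch of the idea, with hypothetical aggregator names borrowed from the test added later in this patch; the assumption that the combining factory is keyed on the output column is hedged in the comment.
```
// Sketch: the original factory reads the raw input column, while the combining factory
// (typically keyed on the output column name) re-combines values that were already
// aggregated at ingestion time, which is what SegmentInputRow instances carry.
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

public class CombiningAggSketch
{
  public static AggregatorFactory[] aggregatorsFor(boolean rowComesFromSegment)
  {
    AggregatorFactory raw = new LongSumAggregatorFactory("visited_sum", "visited_num");
    return rowComesFromSegment
           ? new AggregatorFactory[]{raw.getCombiningFactory()}
           : new AggregatorFactory[]{raw};
  }
}
```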


@ -17,6 +17,7 @@
package io.druid.indexer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
@ -28,6 +29,7 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.logger.Logger;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
@ -42,8 +44,6 @@ import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -56,8 +56,10 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@ -203,17 +205,6 @@ public class JobHelper
return true;
}
public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig)
{
if (indexerConfig.getInputFormatClass() != null) {
job.setInputFormatClass(indexerConfig.getInputFormatClass());
} else if (indexerConfig.isCombineText()) {
job.setInputFormatClass(CombineTextInputFormat.class);
} else {
job.setInputFormatClass(TextInputFormat.class);
}
}
public static DataSegment serializeOutIndex(
final DataSegment segmentTemplate,
final Configuration configuration,
@ -579,6 +570,38 @@ public class JobHelper
return zipPusher.push();
}
public static URI getURIFromSegment(DataSegment dataSegment)
{
// There is no good way around this...
// TODO: add getURI() to URIDataPuller
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
final String type = loadSpec.get("type").toString();
final URI segmentLocURI;
if ("s3_zip".equals(type)) {
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString());
} else if ("local".equals(type)) {
try {
segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
}
catch (URISyntaxException e) {
throw new ISE(e, "Unable to form simple file uri");
}
} else {
try {
throw new IAE(
"Cannot figure out loadSpec %s",
HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec)
);
}
catch (JsonProcessingException e) {
throw new ISE("Cannot write Map with json mapper");
}
}
return segmentLocURI;
}
public static ProgressIndicator progressIndicatorForContext(
final TaskAttemptContext context
)


@ -0,0 +1,160 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.granularity.QueryGranularity;
import io.druid.query.filter.DimFilter;
import org.joda.time.Interval;
import java.util.List;
public class DatasourceIngestionSpec
{
private final String dataSource;
private final Interval interval;
private final DimFilter filter;
private final QueryGranularity granularity;
private final List<String> dimensions;
private final List<String> metrics;
@JsonCreator
public DatasourceIngestionSpec(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("filter") DimFilter filter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource");
this.interval = Preconditions.checkNotNull(interval, "null interval");
this.filter = filter;
this.granularity = granularity == null ? QueryGranularity.NONE : granularity;
this.dimensions = dimensions;
this.metrics = metrics;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty
public DimFilter getFilter()
{
return filter;
}
@JsonProperty
public QueryGranularity getGranularity()
{
return granularity;
}
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty
public List<String> getMetrics()
{
return metrics;
}
public DatasourceIngestionSpec withDimensions(List<String> dimensions)
{
return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics);
}
public DatasourceIngestionSpec withMetrics(List<String> metrics)
{
return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DatasourceIngestionSpec that = (DatasourceIngestionSpec) o;
if (!dataSource.equals(that.dataSource)) {
return false;
}
if (!interval.equals(that.interval)) {
return false;
}
if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
return false;
}
if (!granularity.equals(that.granularity)) {
return false;
}
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
}
return !(metrics != null ? !metrics.equals(that.metrics) : that.metrics != null);
}
@Override
public int hashCode()
{
int result = dataSource.hashCode();
result = 31 * result + interval.hashCode();
result = 31 * result + (filter != null ? filter.hashCode() : 0);
result = 31 * result + granularity.hashCode();
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "DatasourceIngestionSpec{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", filter=" + filter +
", granularity=" + granularity +
", dimensions=" + dimensions +
", metrics=" + metrics +
'}';
}
}
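
A hypothetical construction sketch, mirroring the `ingestionSpec` JSON example from the docs in this patch. Null dimensions and metrics are resolved later by DatasourcePathSpec, and a null granularity falls back to QueryGranularity.NONE.
```
// Sketch only; the values mirror the "wikipedia" example from the documentation above.
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import org.joda.time.Interval;

public class IngestionSpecSketch
{
  public static DatasourceIngestionSpec wikipediaFortnight()
  {
    return new DatasourceIngestionSpec(
        "wikipedia",                               // dataSource
        new Interval("2014-10-20T00:00:00Z/P2W"),  // interval
        null,                                      // filter: load everything
        null,                                      // granularity: defaults to QueryGranularity.NONE
        null,                                      // dimensions: filled in later from parseSpec/segments
        null                                       // metrics: filled in later from the aggregators
    );
  }
}
```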


@ -0,0 +1,121 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{
private static final Logger logger = new Logger(DatasourceInputFormat.class);
public static final String CONF_INPUT_SEGMENTS = "druid.segments";
public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema";
public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size";
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
{
Configuration conf = context.getConfiguration();
String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read");
List<DataSegment> segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
segmentsStr,
new TypeReference<List<DataSegment>>()
{
}
);
if (segments == null || segments.size() == 0) {
throw new ISE("No segments found to read");
}
logger.info("segments to read [%s]", segmentsStr);
long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0);
if (maxSize > 0) {
//combining is to happen, let us sort the segments list by size so that they
//are combined appropriately
Collections.sort(
segments,
new Comparator<DataSegment>()
{
@Override
public int compare(DataSegment s1, DataSegment s2)
{
return Long.compare(s1.getSize(), s2.getSize());
}
}
);
}
List<InputSplit> splits = Lists.newArrayList();
List<DataSegment> list = new ArrayList<>();
long size = 0;
for (DataSegment segment : segments) {
if (size + segment.getSize() > maxSize && size > 0) {
splits.add(new DatasourceInputSplit(list));
list = Lists.newArrayList();
size = 0;
}
list.add(segment);
size += segment.getSize();
}
if (list.size() > 0) {
splits.add(new DatasourceInputSplit(list));
}
logger.info("Number of splits [%d]", splits.size());
return splits;
}
@Override
public RecordReader<NullWritable, InputRow> createRecordReader(
InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException
{
return new DatasourceRecordReader();
}
}
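
For reference, a sketch of wiring this input format onto a Job by hand; the serialized JSON strings are assumed to already exist, and the DatasourcePathSpec added later in this patch does exactly this in addInputPaths.
```
// Sketch: the three conf keys and the dummy path mirror DatasourcePathSpec.addInputPaths.
import io.druid.indexer.hadoop.DatasourceInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

public class DatasourceInputWiringSketch
{
  public static void configure(Job job, String ingestionSpecJson, String segmentsJson, long maxSplitSize)
  {
    job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, ingestionSpecJson);
    job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, segmentsJson);
    job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize));
    // The dummy path is never read; splits are computed from the serialized segment list.
    MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class);
  }
}
```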


@ -0,0 +1,88 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import javax.validation.constraints.NotNull;
public class DatasourceInputSplit extends InputSplit implements Writable
{
private List<DataSegment> segments = null;
//required for deserialization
public DatasourceInputSplit()
{
}
public DatasourceInputSplit(@NotNull List<DataSegment> segments)
{
Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments");
this.segments = segments;
}
@Override
public long getLength() throws IOException, InterruptedException
{
long size = 0;
for (DataSegment segment : segments) {
size += segment.getSize();
}
return size;
}
@Override
public String[] getLocations() throws IOException, InterruptedException
{
return new String[]{};
}
public List<DataSegment> getSegments()
{
return segments;
}
@Override
public void write(DataOutput out) throws IOException
{
out.writeUTF(HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segments));
}
@Override
public void readFields(DataInput in) throws IOException
{
segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
in.readUTF(),
new TypeReference<List<DataSegment>>()
{
}
);
}
}


@ -0,0 +1,194 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
{
private static final Logger logger = new Logger(DatasourceRecordReader.class);
private DatasourceIngestionSpec spec;
private IngestSegmentFirehose firehose;
private int rowNum;
private MapBasedRow currRow;
private List<QueryableIndex> indexes = Lists.newArrayList();
private List<File> tmpSegmentDirs = Lists.newArrayList();
private int numRows;
@Override
public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException
{
spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.jsonMapper);
List<DataSegment> segments = ((DatasourceInputSplit) split).getSegments();
List<StorageAdapter> adapters = Lists.transform(
segments,
new Function<DataSegment, StorageAdapter>()
{
@Override
public StorageAdapter apply(DataSegment segment)
{
try {
logger.info("Getting storage path for segment [%s]", segment.getIdentifier());
Path path = new Path(JobHelper.getURIFromSegment(segment));
logger.info("Fetch segment files from [%s]", path);
File dir = Files.createTempDir();
tmpSegmentDirs.add(dir);
logger.info("Locally storing fetched segment at [%s]", dir);
JobHelper.unzipNoGuava(path, context.getConfiguration(), dir, context);
logger.info("finished fetching segment files");
QueryableIndex index = IndexIO.loadIndex(dir);
indexes.add(index);
numRows += index.getNumRows();
return new QueryableIndexStorageAdapter(index);
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
}
);
firehose = new IngestSegmentFirehose(
adapters,
spec.getDimensions(),
spec.getMetrics(),
spec.getFilter(),
spec.getInterval(),
spec.getGranularity()
);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
if (firehose.hasMore()) {
currRow = (MapBasedRow) firehose.nextRow();
rowNum++;
return true;
} else {
return false;
}
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException
{
return NullWritable.get();
}
@Override
public InputRow getCurrentValue() throws IOException, InterruptedException
{
return new SegmentInputRow(
new MapBasedInputRow(
currRow.getTimestamp(),
spec.getDimensions(),
currRow.getEvent()
)
);
}
@Override
public float getProgress() throws IOException, InterruptedException
{
if (numRows > 0) {
return (rowNum * 1.0f) / numRows;
} else {
return 0;
}
}
@Override
public void close() throws IOException
{
Closeables.close(firehose, true);
for (QueryableIndex qi : indexes) {
Closeables.close(qi, true);
}
for (File dir : tmpSegmentDirs) {
FileUtils.deleteDirectory(dir);
}
}
private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper)
{
try {
String schema = Preconditions.checkNotNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), "null schema");
logger.info("load schema [%s]", schema);
DatasourceIngestionSpec spec = jsonMapper.readValue(schema, DatasourceIngestionSpec.class);
if (spec.getDimensions() == null || spec.getDimensions().size() == 0) {
throw new ISE("load schema does not have dimensions");
}
if (spec.getMetrics() == null || spec.getMetrics().size() == 0) {
throw new ISE("load schema does not have metrics");
}
return spec;
}
catch (IOException ex) {
throw new RuntimeException("couldn't load segment load spec", ex);
}
}
}


@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import org.joda.time.DateTime;
import java.util.List;
/**
* SegmentInputRow serves as a marker that these InputRow instances have already been combined
* and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
* data.
* It must only be used to represent such InputRows.
*/
public class SegmentInputRow implements InputRow
{
private final InputRow delegate;
public SegmentInputRow(InputRow delegate){
this.delegate = delegate;
}
@Override
public List<String> getDimensions()
{
return delegate.getDimensions();
}
@Override
public long getTimestampFromEpoch()
{
return delegate.getTimestampFromEpoch();
}
@Override
public DateTime getTimestamp()
{
return delegate.getTimestamp();
}
@Override
public List<String> getDimension(String dimension)
{
return delegate.getDimension(dimension);
}
@Override
public Object getRaw(String dimension)
{
return delegate.getRaw(dimension);
}
@Override
public float getFloatMetric(String metric)
{
return delegate.getFloatMetric(metric);
}
@Override
public long getLongMetric(String metric)
{
return delegate.getLongMetric(metric);
}
@Override
public int compareTo(Row row)
{
return delegate.compareTo(row);
}
@Override
public String toString()
{
return "SegmentInputRow{" +
"delegate=" + delegate +
'}';
}
}


@ -0,0 +1,186 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class DatasourcePathSpec implements PathSpec
{
private static final Logger logger = new Logger(DatasourcePathSpec.class);
private final ObjectMapper mapper;
private final DatasourceIngestionSpec ingestionSpec;
private final long maxSplitSize;
private final List<DataSegment> segments;
public DatasourcePathSpec(
@JacksonInject ObjectMapper mapper,
@JsonProperty("segments") List<DataSegment> segments,
@JsonProperty("ingestionSpec") DatasourceIngestionSpec spec,
@JsonProperty("maxSplitSize") Long maxSplitSize
)
{
this.mapper = Preconditions.checkNotNull(mapper, "null mapper");
this.segments = segments;
this.ingestionSpec = Preconditions.checkNotNull(spec, "null ingestionSpec");
if(maxSplitSize == null) {
this.maxSplitSize = 0;
} else {
this.maxSplitSize = maxSplitSize.longValue();
}
}
@JsonProperty
public List<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
public DatasourceIngestionSpec getIngestionSpec()
{
return ingestionSpec;
}
@JsonProperty
public long getMaxSplitSize()
{
return maxSplitSize;
}
@Override
public Job addInputPaths(
HadoopDruidIndexerConfig config, Job job
) throws IOException
{
Preconditions.checkArgument(segments != null && !segments.isEmpty(), "no segments provided");
logger.info(
"Found total [%d] segments for [%s] in interval [%s]",
segments.size(),
ingestionSpec.getDataSource(),
ingestionSpec.getInterval()
);
DatasourceIngestionSpec updatedIngestionSpec = ingestionSpec;
if (updatedIngestionSpec.getDimensions() == null) {
List<String> dims;
if (config.getParser().getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensions();
} else {
Set<String> dimSet = Sets.newHashSet(
Iterables.concat(
Iterables.transform(
segments,
new Function<DataSegment, Iterable<String>>()
{
@Override
public Iterable<String> apply(DataSegment dataSegment)
{
return dataSegment.getDimensions();
}
}
)
)
);
dims = Lists.newArrayList(
Sets.difference(
dimSet,
config.getParser()
.getParseSpec()
.getDimensionsSpec()
.getDimensionExclusions()
)
);
}
updatedIngestionSpec = updatedIngestionSpec.withDimensions(dims);
}
if (updatedIngestionSpec.getMetrics() == null) {
Set<String> metrics = Sets.newHashSet();
final AggregatorFactory[] cols = config.getSchema().getDataSchema().getAggregators();
if (cols != null) {
for (AggregatorFactory col : cols) {
metrics.add(col.getName());
}
}
updatedIngestionSpec = updatedIngestionSpec.withMetrics(Lists.newArrayList(metrics));
}
job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec));
job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments));
job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize));
MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class);
return job;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DatasourcePathSpec that = (DatasourcePathSpec) o;
if (maxSplitSize != that.maxSplitSize) {
return false;
}
if (!ingestionSpec.equals(that.ingestionSpec)) {
return false;
}
return !(segments != null ? !segments.equals(that.segments) : that.segments != null);
}
@Override
public int hashCode()
{
int result = ingestionSpec.hashCode();
result = 31 * result + (int) (maxSplitSize ^ (maxSplitSize >>> 32));
result = 31 * result + (segments != null ? segments.hashCode() : 0);
return result;
}
}
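
And a hypothetical sketch of building this path spec programmatically; in a real job it is deserialized from the `dataSource` inputSpec JSON, the mapper is injected by Jackson, and the segment list is filled in by HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed shown earlier.
```
// Sketch only; a null maxSplitSize means no combining, so each segment becomes its own split.
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

import java.util.List;

public class DatasourcePathSpecSketch
{
  public static DatasourcePathSpec create(ObjectMapper mapper, List<DataSegment> usedSegments)
  {
    DatasourceIngestionSpec ingestionSpec = new DatasourceIngestionSpec(
        "wikipedia", new Interval("2014-10-20T00:00:00Z/P2W"), null, null, null, null
    );
    return new DatasourcePathSpec(mapper, usedSegments, ingestionSpec, null);
  }
}
```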


@ -30,7 +30,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.DateTimeFormat;
@ -152,7 +155,7 @@ public class GranularityPathSpec implements PathSpec
for (String path : paths) {
log.info("Appending path[%s]", path);
FileInputFormat.addInputPath(job, new Path(path));
StaticPathSpec.addToMultipleInputs(config, job, path, inputFormat);
}
return job;


@ -0,0 +1,53 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
/**
*/
public class MetadataStoreBasedUsedSegmentLister implements UsedSegmentLister
{
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
@Inject
public MetadataStoreBasedUsedSegmentLister(IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator)
{
this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull(
indexerMetadataStorageCoordinator,
"null indexerMetadataStorageCoordinator"
);
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(
String dataSource, Interval interval
) throws IOException
{
return indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(dataSource, interval);
}
}


@ -0,0 +1,80 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
import java.util.List;
public class MultiplePathSpec implements PathSpec
{
private List<PathSpec> children;
public MultiplePathSpec(
@JsonProperty("children") List<PathSpec> children
)
{
Preconditions.checkArgument(children != null && children.size() > 0, "Null/Empty list of child PathSpecs");
this.children = children;
}
@JsonProperty
public List<PathSpec> getChildren()
{
return children;
}
@Override
public Job addInputPaths(
HadoopDruidIndexerConfig config, Job job
) throws IOException
{
for(PathSpec spec : children) {
spec.addInputPaths(config, job);
}
return job;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MultiplePathSpec that = (MultiplePathSpec) o;
return children.equals(that.children);
}
@Override
public int hashCode()
{
return children.hashCode();
}
}


@ -20,8 +20,6 @@ package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
@ -32,10 +30,11 @@ import java.io.IOException;
@JsonSubTypes(value={
@JsonSubTypes.Type(name="granular_unprocessed", value=GranularUnprocessedPathSpec.class),
@JsonSubTypes.Type(name="granularity", value=GranularityPathSpec.class),
@JsonSubTypes.Type(name="static", value=StaticPathSpec.class)
@JsonSubTypes.Type(name="static", value=StaticPathSpec.class),
@JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class),
@JsonSubTypes.Type(name="multi", value=MultiplePathSpec.class)
})
public interface PathSpec
{
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException;
public Class<? extends InputFormat> getInputFormat();
}


@ -20,10 +20,12 @@ package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
@ -60,7 +62,9 @@ public class StaticPathSpec implements PathSpec
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
{
log.info("Adding paths[%s]", paths);
FileInputFormat.addInputPaths(job, paths);
addToMultipleInputs(config, job, paths, inputFormat);
return job;
}
@ -68,4 +72,54 @@ public class StaticPathSpec implements PathSpec
{
return inputFormat;
}
public String getPaths()
{
return paths;
}
public final static void addToMultipleInputs(
HadoopDruidIndexerConfig config,
Job job,
String path,
Class<? extends InputFormat> inputFormatClass
)
{
if (inputFormatClass == null) {
if (config.isCombineText()) {
MultipleInputs.addInputPath(job, new Path(path), CombineTextInputFormat.class);
} else {
MultipleInputs.addInputPath(job, new Path(path), TextInputFormat.class);
}
} else {
MultipleInputs.addInputPath(job, new Path(path), inputFormatClass);
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StaticPathSpec that = (StaticPathSpec) o;
if (paths != null ? !paths.equals(that.paths) : that.paths != null) {
return false;
}
return !(inputFormat != null ? !inputFormat.equals(that.inputFormat) : that.inputFormat != null);
}
@Override
public int hashCode()
{
int result = paths != null ? paths.hashCode() : 0;
result = 31 * result + (inputFormat != null ? inputFormat.hashCode() : 0);
return result;
}
}


@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
/**
*/
public interface UsedSegmentLister
{
/**
* Get all segments which may include any data in the interval and are flagged as used.
*
* @param dataSource The datasource to query
* @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
*
* @return The DataSegments which include data in the requested interval. These segments may contain data outside the requested interval.
*
* @throws IOException
*/
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException;
}


@ -32,6 +32,7 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexer.JobHelper;
import io.druid.indexer.hadoop.DatasourceInputSplit;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.timeline.DataSegment;
@ -42,7 +43,6 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.InputFormat;
@ -62,15 +62,10 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progressable;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@ -470,10 +465,10 @@ public class HadoopConverterJob
) throws IOException, InterruptedException
{
final InputSplit split = context.getInputSplit();
if (!(split instanceof DataSegmentSplit)) {
if (!(split instanceof DatasourceInputSplit)) {
throw new IAE(
"Unexpected split type. Expected [%s] was [%s]",
DataSegmentSplit.class.getCanonicalName(),
DatasourceInputSplit.class.getCanonicalName(),
split.getClass().getCanonicalName()
);
}
@ -481,13 +476,13 @@ public class HadoopConverterJob
final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
final File tmpDir = Paths.get(tmpDirLoc).toFile();
final DataSegment segment = ((DataSegmentSplit) split).getDataSegment();
final DataSegment segment = Iterables.getOnlyElement(((DatasourceInputSplit) split).getSegments());
final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration());
context.setStatus("DOWNLOADING");
context.progress();
final Path inPath = new Path(getURIFromSegment(segment));
final Path inPath = new Path(JobHelper.getURIFromSegment(segment));
final File inDir = new File(tmpDir, "in");
if (inDir.exists() && !inDir.delete()) {
@ -559,38 +554,6 @@ public class HadoopConverterJob
context.getConfiguration().set(TMP_FILE_LOC_KEY, tmpFile.getAbsolutePath());
}
private static URI getURIFromSegment(DataSegment dataSegment)
{
// There is no good way around this...
// TODO: add getURI() to URIDataPuller
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
final String type = loadSpec.get("type").toString();
final URI segmentLocURI;
if ("s3_zip".equals(type)) {
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString());
} else if ("local".equals(type)) {
try {
segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
}
catch (URISyntaxException e) {
throw new ISE(e, "Unable to form simple file uri");
}
} else {
try {
throw new IAE(
"Cannot figure out loadSpec %s",
HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec)
);
}
catch (JsonProcessingException e) {
throw new ISE("Cannot write Map with json mapper");
}
}
return segmentLocURI;
}
@Override
protected void cleanup(
Context context
@ -604,50 +567,6 @@ public class HadoopConverterJob
}
}
public static class DataSegmentSplit extends InputSplit implements Writable
{
private DataSegment dataSegment = null;
public DataSegmentSplit()
{
// For serialization purposes
}
public DataSegmentSplit(@NotNull DataSegment dataSegment)
{
this.dataSegment = Preconditions.checkNotNull(dataSegment, "dataSegment");
}
@Override
public long getLength() throws IOException, InterruptedException
{
return dataSegment.getSize();
}
@Override
public String[] getLocations() throws IOException, InterruptedException
{
return new String[]{};
}
protected DataSegment getDataSegment()
{
return dataSegment;
}
@Override
public void write(DataOutput out) throws IOException
{
out.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsString(dataSegment).getBytes());
}
@Override
public void readFields(DataInput in) throws IOException
{
dataSegment = HadoopDruidConverterConfig.jsonMapper.readValue(in.readLine(), DataSegment.class);
}
}
public static class ConfigInputFormat extends InputFormat<String, String>
{
@Override
@ -665,7 +584,7 @@ public class HadoopConverterJob
@Override
public InputSplit apply(DataSegment input)
{
return new DataSegmentSplit(input);
return new DatasourceInputSplit(ImmutableList.of(input));
}
}
);


@ -20,6 +20,7 @@ package io.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.PasswordProvider;
import javax.validation.constraints.NotNull;
@ -78,4 +79,20 @@ public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageCo
}
};
}
//Note: Currently it only supports configured segmentTable, other tables should be added if needed
//by the code using this
public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
{
return new MetadataStorageTablesConfig(
null,
segmentTable,
null,
null,
null,
null,
null,
null
);
}
}


@ -0,0 +1,359 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.common.Granularity;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class BatchDeltaIngestionTest
{
public final
@Rule
TemporaryFolder temporaryFolder = new TemporaryFolder();
private ObjectMapper mapper;
private Interval interval;
private List<DataSegment> segments;
public BatchDeltaIngestionTest() throws IOException
{
mapper = new DefaultObjectMapper();
mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
InjectableValues inject = new InjectableValues.Std().addValue(ObjectMapper.class, mapper);
mapper.setInjectableValues(inject);
this.interval = new Interval("2014-10-22T00:00:00Z/P1D");
segments = ImmutableList.of(
new DefaultObjectMapper()
.readValue(
this.getClass().getClassLoader().getResource("test-segment/descriptor.json"),
DataSegment.class
)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
)
)
);
}
@Test
public void testReindexing() throws Exception
{
HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig(
ImmutableMap.<String, Object>of(
"type",
"dataSource",
"ingestionSpec",
ImmutableMap.of(
"dataSource",
"xyz",
"interval",
interval
),
"segments",
segments
),
temporaryFolder.newFolder()
);
List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T00:00:00.000Z"),
"host", ImmutableList.of("a.example.com"),
"visited_sum", 100L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T01:00:00.000Z"),
"host", ImmutableList.of("b.example.com"),
"visited_sum", 150L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T02:00:00.000Z"),
"host", ImmutableList.of("c.example.com"),
"visited_sum", 200L,
"unique_hosts", 1.0d
)
);
testIngestion(config, expectedRows);
}
@Test
public void testDeltaIngestion() throws Exception
{
File dataFile = temporaryFolder.newFile();
FileUtils.writeLines(
dataFile,
ImmutableList.of(
"2014102200,a.example.com,a.example.com,90",
"2014102201,b.example.com,b.example.com,25",
"2014102202,c.example.com,c.example.com,70"
)
);
HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig(
ImmutableMap.<String, Object>of(
"type",
"multi",
"children",
ImmutableList.of(
ImmutableMap.<String, Object>of(
"type",
"dataSource",
"ingestionSpec",
ImmutableMap.of(
"dataSource",
"xyz",
"interval",
interval
),
"segments",
segments
),
ImmutableMap.<String, Object>of(
"type",
"static",
"paths",
dataFile.getCanonicalPath()
)
)
),
temporaryFolder.newFolder()
);
List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T00:00:00.000Z"),
"host", ImmutableList.of("a.example.com"),
"visited_sum", 190L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T01:00:00.000Z"),
"host", ImmutableList.of("b.example.com"),
"visited_sum", 175L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T02:00:00.000Z"),
"host", ImmutableList.of("c.example.com"),
"visited_sum", 270L,
"unique_hosts", 1.0d
)
);
testIngestion(config, expectedRows);
}
private void testIngestion(HadoopDruidIndexerConfig config, List<ImmutableMap<String, Object>> expectedRowsGenerated)
throws Exception
{
IndexGeneratorJob job = new LegacyIndexGeneratorJob(config);
JobHelper.runJobs(ImmutableList.<Jobby>of(job), config);
File segmentFolder = new File(
String.format(
"%s/%s/%s_%s/%s/0",
config.getSchema().getIOConfig().getSegmentOutputPath(),
config.getSchema().getDataSchema().getDataSource(),
interval.getStart().toString(),
interval.getEnd().toString(),
config.getSchema().getTuningConfig().getVersion()
)
);
Assert.assertTrue(segmentFolder.exists());
File descriptor = new File(segmentFolder, "descriptor.json");
File indexZip = new File(segmentFolder, "index.zip");
Assert.assertTrue(descriptor.exists());
Assert.assertTrue(indexZip.exists());
DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class);
Assert.assertEquals("website", dataSegment.getDataSource());
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals(interval, dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
Assert.assertEquals("visited_sum", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals(0, spec.getPartitionNum());
Assert.assertEquals(1, spec.getPartitions());
File tmpUnzippedSegmentDir = temporaryFolder.newFolder();
new LocalDataSegmentPuller().getSegmentFiles(dataSegment, tmpUnzippedSegmentDir);
QueryableIndex index = IndexIO.loadIndex(tmpUnzippedSegmentDir);
StorageAdapter adapter = new QueryableIndexStorageAdapter(index);
Firehose firehose = new IngestSegmentFirehose(
ImmutableList.of(adapter),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
null,
interval,
QueryGranularity.NONE
);
List<InputRow> rows = Lists.newArrayList();
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
verifyRows(expectedRowsGenerated, rows);
}
private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map<String, Object> inputSpec, File tmpDir)
throws Exception
{
HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host"), null, null),
null,
ImmutableList.of("timestamp", "host", "host2", "visited_num")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host2")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
)
),
new HadoopIOConfig(
inputSpec,
null,
tmpDir.getCanonicalPath()
),
new HadoopTuningConfig(
tmpDir.getCanonicalPath(),
null,
null,
null,
null,
null,
false,
false,
false,
false,
null,
false,
false,
false,
null,
null,
false
)
)
);
config.setShardSpecs(
ImmutableMap.<DateTime, List<HadoopyShardSpec>>of(
interval.getStart(),
ImmutableList.of(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(0, 1, HadoopDruidIndexerConfig.jsonMapper),
0
)
)
)
);
config = HadoopDruidIndexerConfig.fromSpec(config.getSchema());
return config;
}
private void verifyRows(List<ImmutableMap<String, Object>> expectedRows, List<InputRow> actualRows)
{
Assert.assertEquals(expectedRows.size(), actualRows.size());
for (int i = 0; i < expectedRows.size(); i++) {
Map<String, Object> expected = expectedRows.get(i);
InputRow actual = actualRows.get(i);
Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
Assert.assertEquals(expected.get("time"), actual.getTimestamp());
Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
Assert.assertEquals(
(Double) expected.get("unique_hosts"),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")),
0.001
);
}
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.Granularity;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.indexer.path.MultiplePathSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.path.StaticPathSpec;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest
{
private final String testDatasource = "test";
private final Interval testDatasourceInterval = new Interval("1970/2000");
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final List<DataSegment> segments = ImmutableList.of(
new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
)
);
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithNoDatasourcePathSpec() throws Exception
{
PathSpec pathSpec = new StaticPathSpec("/xyz", null);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
Assert.assertTrue(config.getPathSpec() instanceof StaticPathSpec);
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpec() throws Exception
{
PathSpec pathSpec = new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null),
null
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
Assert.assertEquals(segments, ((DatasourcePathSpec) config.getPathSpec()).getSegments());
}
@Test
public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec() throws Exception
{
PathSpec pathSpec = new MultiplePathSpec(
ImmutableList.of(
new StaticPathSpec("/xyz", null),
new DatasourcePathSpec(
jsonMapper,
null,
new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null),
null
)
)
);
HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec);
Assert.assertEquals(
segments,
((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(1)).getSegments()
);
}
private HadoopDruidIndexerConfig testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
PathSpec datasourcePathSpec
)
throws Exception
{
HadoopIngestionSpec spec = new HadoopIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularity.DAY,
null,
ImmutableList.of(
new Interval("2010-01-01/P1D")
)
)
),
new HadoopIOConfig(
jsonMapper.convertValue(datasourcePathSpec, Map.class),
null,
null
),
null
);
spec = jsonMapper.readValue(
jsonMapper.writeValueAsString(spec),
HadoopIngestionSpec.class
);
UsedSegmentLister segmentLister = EasyMock.createMock(UsedSegmentLister.class);
EasyMock.expect(
segmentLister.getUsedSegmentsForInterval(testDatasource, testDatasourceInterval)
).andReturn(segments);
EasyMock.replay(segmentLister);
spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, jsonMapper, segmentLister);
return HadoopDruidIndexerConfig.fromString(jsonMapper.writeValueAsString(spec));
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.filter.SelectorDimFilter;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class DatasourceIngestionSpecTest
{
@Test
public void testSerde() throws Exception
{
DatasourceIngestionSpec expected = new DatasourceIngestionSpec(
"test",
Interval.parse("2014/2015"),
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3")
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
DatasourceIngestionSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), DatasourceIngestionSpec.class);
Assert.assertEquals(expected, actual);
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
*/
public class DatasourceInputFormatTest
{
private List<DataSegment> segments;
private Configuration config;
private JobContext context;
@Before
public void setUp() throws Exception
{
segments = ImmutableList.of(
new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
),
new DataSegment(
"test2",
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index2.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
11
),
new DataSegment(
"test3",
Interval.parse("2030/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index3.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
4
)
);
config = new Configuration();
config.set(
DatasourceInputFormat.CONF_INPUT_SEGMENTS,
new DefaultObjectMapper().writeValueAsString(segments)
);
context = EasyMock.createMock(JobContext.class);
EasyMock.expect(context.getConfiguration()).andReturn(config);
EasyMock.replay(context);
}
@Test
public void testGetSplitsNoCombining() throws Exception
{
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
Assert.assertEquals(segments.size(), splits.size());
for (int i = 0; i < segments.size(); i++) {
Assert.assertEquals(segments.get(i), ((DatasourceInputSplit) splits.get(i)).getSegments().get(0));
}
}
@Test
public void testGetSplitsAllCombined() throws Exception
{
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999");
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
Assert.assertEquals(1, splits.size());
Assert.assertEquals(
Sets.newHashSet(segments),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);
}
@Test
public void testGetSplitsCombineInTwo() throws Exception
{
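// Segment sizes defined in setUp() are 2, 11 and 4; with maxSplitSize 6 the 2- and 4-sized
// segments are combined into one split while the 11-sized segment gets a split of its own.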
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6");
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
Assert.assertEquals(2, splits.size());
Assert.assertEquals(
Sets.newHashSet(segments.get(0), segments.get(2)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);
Assert.assertEquals(
Sets.newHashSet(segments.get(1)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
);
}
@Test
public void testGetRecordReader() throws Exception
{
Assert.assertTrue(new DatasourceInputFormat().createRecordReader(null, null) instanceof DatasourceRecordReader);
}
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.DataInput;
/**
*/
public class DatasourceInputSplitTest
{
@Test
public void testSerde() throws Exception
{
DatasourceInputSplit expected = new DatasourceInputSplit(
Lists.newArrayList(
new DataSegment(
"test",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
)
)
);
ByteArrayDataOutput out = ByteStreams.newDataOutput();
expected.write(out);
DataInput in = ByteStreams.newDataInput(out.toByteArray());
DatasourceInputSplit actual = new DatasourceInputSplit();
actual.readFields(in);
Assert.assertEquals(expected.getSegments(), actual.getSegments());
Assert.assertEquals(12334, actual.getLength());
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
public class DatasourceRecordReaderTest
{
@Test
public void testSanity() throws Exception
{
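// Reads the bundled test segment through DatasourceRecordReader and expects the same
// three rows described in the test-segment readme.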
DataSegment segment = new DefaultObjectMapper()
.readValue(this.getClass().getClassLoader().getResource("test-segment/descriptor.json"), DataSegment.class)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
)
);
InputSplit split = new DatasourceInputSplit(Lists.newArrayList(segment));
Configuration config = new Configuration();
config.set(
DatasourceInputFormat.CONF_DRUID_SCHEMA,
HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(
new DatasourceIngestionSpec(
segment.getDataSource(),
segment.getInterval(),
null,
null,
segment.getDimensions(),
segment.getMetrics()
)
)
);
TaskAttemptContext context = EasyMock.createNiceMock(TaskAttemptContext.class);
EasyMock.expect(context.getConfiguration()).andReturn(config).anyTimes();
EasyMock.replay(context);
DatasourceRecordReader rr = new DatasourceRecordReader();
rr.initialize(split, context);
Assert.assertEquals(0, rr.getProgress(), 0.0001);
List<InputRow> rows = Lists.newArrayList();
while (rr.nextKeyValue()) {
rows.add(rr.getCurrentValue());
}
verifyRows(rows);
Assert.assertEquals(1, rr.getProgress(), 0.0001);
rr.close();
}
private void verifyRows(List<InputRow> actualRows)
{
List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T00:00:00.000Z"),
"host", ImmutableList.of("a.example.com"),
"visited_sum", 100L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T01:00:00.000Z"),
"host", ImmutableList.of("b.example.com"),
"visited_sum", 150L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T02:00:00.000Z"),
"host", ImmutableList.of("c.example.com"),
"visited_sum", 200L,
"unique_hosts", 1.0d
)
);
Assert.assertEquals(expectedRows.size(), actualRows.size());
for (int i = 0; i < expectedRows.size(); i++) {
Map<String, Object> expected = expectedRows.get(i);
InputRow actual = actualRows.get(i);
Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
Assert.assertEquals(expected.get("time"), actual.getTimestamp());
Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
Assert.assertEquals(
(Double) expected.get("unique_hosts"),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")),
0.001
);
}
}
}

View File

@ -0,0 +1,235 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.initialization.Initialization;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/**
*/
public class DatasourcePathSpecTest
{
private DatasourceIngestionSpec ingestionSpec;
private List<DataSegment> segments;
public DatasourcePathSpecTest()
{
this.ingestionSpec = new DatasourceIngestionSpec(
"test",
Interval.parse("2000/3000"),
null,
null,
null,
null
);
segments = ImmutableList.of(
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
),
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12335
)
);
}
@Test
public void testSerde() throws Exception
{
final UsedSegmentLister segmentList = EasyMock.createMock(
UsedSegmentLister.class
);
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(UsedSegmentLister.class).toInstance(segmentList);
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null)
);
}
}
)
);
ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class);
DatasourcePathSpec expected = new DatasourcePathSpec(
jsonMapper,
null,
ingestionSpec,
Long.valueOf(10)
);
PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);
expected = new DatasourcePathSpec(
jsonMapper,
null,
ingestionSpec,
null
);
actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);
expected = new DatasourcePathSpec(
jsonMapper,
segments,
ingestionSpec,
null
);
actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);
}
@Test
public void testAddInputPaths() throws Exception
{
HadoopDruidIndexerConfig hadoopIndexerConfig = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
ingestionSpec.getDataSource(),
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
)
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
"/tmp/dummy",
"type",
"static"
),
null,
"/tmp/dummy"
),
HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
)
);
ObjectMapper mapper = new DefaultObjectMapper();
DatasourcePathSpec pathSpec = new DatasourcePathSpec(
mapper,
segments,
ingestionSpec,
null
);
Configuration config = new Configuration();
Job job = EasyMock.createNiceMock(Job.class);
EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes();
EasyMock.replay(job);
pathSpec.addInputPaths(hadoopIndexerConfig, job);
List<DataSegment> actualSegments = mapper.readValue(
config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS),
new TypeReference<List<DataSegment>>()
{
}
);
Assert.assertEquals(segments, actualSegments);
DatasourceIngestionSpec actualIngestionSpec = mapper.readValue(
    config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA),
    DatasourceIngestionSpec.class
);
Assert.assertEquals(
    ingestionSpec
        .withDimensions(ImmutableList.of("product"))
        .withMetrics(ImmutableList.of("visited_sum")),
    actualIngestionSpec
);
}
}

View File

@ -76,18 +76,18 @@ public class GranularityPathSpecTest
}
@Test
public void testDeserialization() throws Exception
public void testSerdeCustomInputFormat() throws Exception
{
testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class);
testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class);
}
@Test
public void testDeserializationNoInputFormat() throws Exception
public void testSerdeNoInputFormat() throws Exception
{
testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, null);
testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, null);
}
private void testDeserialization(
private void testSerde(
String inputPath,
String filePattern,
String pathFormat,
@ -114,7 +114,7 @@ public class GranularityPathSpecTest
}
sb.append("\"type\" : \"granularity\"}");
GranularityPathSpec pathSpec = (GranularityPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class);
GranularityPathSpec pathSpec = (GranularityPathSpec) StaticPathSpecTest.readWriteRead(sb.toString(), jsonMapper);
Assert.assertEquals(inputFormat, pathSpec.getInputFormat());
Assert.assertEquals(inputPath, pathSpec.getInputPath());
Assert.assertEquals(filePattern, pathSpec.getFilePattern());

View File

@ -0,0 +1,65 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class MultiplePathSpecTest
{
@Test
public void testSerde() throws Exception
{
PathSpec expected = new MultiplePathSpec(
Lists.<PathSpec>newArrayList(
new StaticPathSpec("/tmp/path1", null),
new StaticPathSpec("/tmp/path2", TextInputFormat.class)
)
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);
}
@Test
public void testAddInputPaths() throws Exception
{
PathSpec ps1 = EasyMock.createMock(PathSpec.class);
EasyMock.expect(ps1.addInputPaths(null, null)).andReturn(null);
PathSpec ps2 = EasyMock.createMock(PathSpec.class);
EasyMock.expect(ps2.addInputPaths(null, null)).andReturn(null);
EasyMock.replay(ps1, ps2);
new MultiplePathSpec(Lists.newArrayList(ps1, ps2)).addInputPaths(null, null);
EasyMock.verify(ps1, ps2);
}
}

View File

@ -21,8 +21,6 @@ package io.druid.indexer.path;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Assert;
import org.junit.Test;
@ -34,18 +32,18 @@ public class StaticPathSpecTest
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testDeserialization() throws Exception
public void testSerdeCustomInputFormat() throws Exception
{
testDeserialization("/sample/path", TextInputFormat.class);
testSerde("/sample/path", TextInputFormat.class);
}
@Test
public void testDeserializationNoInputFormat() throws Exception
{
testDeserialization("/sample/path", null);
testSerde("/sample/path", null);
}
private void testDeserialization(String path, Class inputFormat) throws Exception
private void testSerde(String path, Class inputFormat) throws Exception
{
StringBuilder sb = new StringBuilder();
sb.append("{\"paths\" : \"");
@ -57,13 +55,22 @@ public class StaticPathSpecTest
sb.append("\",");
}
sb.append("\"type\" : \"static\"}");
StaticPathSpec pathSpec = (StaticPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class);
StaticPathSpec pathSpec = (StaticPathSpec) readWriteRead(sb.toString(), jsonMapper);
Assert.assertEquals(inputFormat, pathSpec.getInputFormat());
Job job = Job.getInstance();
pathSpec.addInputPaths(null, job);
Assert.assertEquals(
"file:" + path,
job.getConfiguration().get(FileInputFormat.INPUT_DIR));
Assert.assertEquals(path, pathSpec.getPaths());
}
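// Round-trips the given JSON through deserialize -> serialize -> deserialize so that both
// directions of PathSpec serde are exercised by the callers above.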
public static final PathSpec readWriteRead(String jsonStr, ObjectMapper jsonMapper) throws Exception
{
return jsonMapper.readValue(
jsonMapper.writeValueAsString(
jsonMapper.readValue(
jsonStr,
PathSpec.class
)
),
PathSpec.class
);
}
}

View File

@ -0,0 +1,17 @@
{
"binaryVersion": 9,
"dataSource": "testds",
"dimensions": "host",
"identifier": "testds_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-07-15T22:02:40.171Z",
"interval": "2014-10-22T00:00:00.000Z/2014-10-23T00:00:00.000Z",
"loadSpec": {
"path": "test-segment/index.zip",
"type": "local"
},
"metrics": "visited_sum,unique_hosts",
"shardSpec": {
"type": "none"
},
"size": 4096,
"version": "2015-07-15T22:02:40.171Z"
}

View File

@ -0,0 +1,5 @@
This is a test segment containing the following columns.
__time, host, visited_sum, unique_hosts
2014-10-22T00:00:00.000Z, a.example.com, 100, HLL sketch of cardinality 1
2014-10-22T01:00:00.000Z, b.example.com, 150, HLL sketch of cardinality 1
2014-10-22T02:00:00.000Z, c.example.com, 200, HLL sketch of cardinality 1
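Raw CSV input of the shape consumed by BatchDeltaIngestionTest (yyyyMMddHH timestamp, host, host2, visited_num) that would roughly correspond to these rows is sketched below; the lines are illustrative only, not the actual input this segment was built from.
2014102200,a.example.com,a.example.com,100
2014102201,b.example.com,b.example.com,150
2014102202,c.example.com,c.example.com,200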

View File

@ -17,10 +17,12 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -33,17 +35,22 @@ import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.Jobby;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
public class HadoopIndexTask extends HadoopTask
@ -56,10 +63,14 @@ public class HadoopIndexTask extends HadoopTask
}
@JsonIgnore
private final HadoopIngestionSpec spec;
private HadoopIngestionSpec spec;
@JsonIgnore
private final String classpathPrefix;
@JsonIgnore
private final ObjectMapper jsonMapper;
/**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
@ -76,7 +87,8 @@ public class HadoopIndexTask extends HadoopTask
@JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("classpathPrefix") String classpathPrefix
@JsonProperty("classpathPrefix") String classpathPrefix,
@JacksonInject ObjectMapper jsonMapper
)
{
super(
@ -102,6 +114,7 @@ public class HadoopIndexTask extends HadoopTask
);
this.classpathPrefix = classpathPrefix;
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper");
}
@Override
@ -152,6 +165,11 @@ public class HadoopIndexTask extends HadoopTask
final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
spec,
jsonMapper,
new OverlordActionBasedUsedSegmentLister(toolbox));
final String config = invokeForeignLoader(
"io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing",
new String[]{

View File

@ -25,20 +25,13 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Injector;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskToolbox;
@ -46,30 +39,19 @@ import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.task.NoopTask;
import io.druid.query.filter.DimFilter;
import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -286,7 +268,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
)
);
return new IngestSegmentFirehose(adapters, dims, metricsList);
return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter, interval, QueryGranularity.NONE);
}
catch (IOException e) {
@ -297,154 +279,4 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
}
}
public class IngestSegmentFirehose implements Firehose
{
private volatile Yielder<InputRow> rowYielder;
public IngestSegmentFirehose(List<StorageAdapter> adapters, final List<String> dims, final List<String> metrics)
{
Sequence<InputRow> rows = Sequences.concat(
Iterables.transform(
adapters, new Function<StorageAdapter, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(StorageAdapter adapter)
{
return Sequences.concat(
Sequences.map(
adapter.makeCursors(
Filters.convertDimensionFilters(dimFilter),
interval,
QueryGranularity.ALL
), new Function<Cursor, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(final Cursor cursor)
{
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null);
// dimSelector is null if the dimension is not present
if (dimSelector != null) {
dimSelectors.put(dim, dimSelector);
}
}
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
if (metricSelector != null) {
metSelectors.put(metric, metricSelector);
}
}
return Sequences.simple(
new Iterable<InputRow>()
{
@Override
public Iterator<InputRow> iterator()
{
return new Iterator<InputRow>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}
@Override
public InputRow next()
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.get();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();
if (vals.size() == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {
List<String> dimVals = Lists.newArrayList();
for (int i = 0; i < vals.size(); ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
theEvent.put(metric, selector.get());
}
cursor.advance();
return new MapBasedInputRow(timestamp, dims, theEvent);
}
@Override
public void remove()
{
throw new UnsupportedOperationException("Remove Not Supported");
}
};
}
}
);
}
}
)
);
}
}
)
);
rowYielder = rows.toYielder(
null,
new YieldingAccumulator<InputRow, InputRow>()
{
@Override
public InputRow accumulate(InputRow accumulated, InputRow in)
{
yield();
return in;
}
}
);
}
@Override
public boolean hasMore()
{
return !rowYielder.isDone();
}
@Override
public InputRow nextRow()
{
final InputRow inputRow = rowYielder.get();
rowYielder = rowYielder.next(null);
return inputRow;
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
rowYielder.close();
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.hadoop;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.inject.Inject;
import io.druid.indexer.path.UsedSegmentLister;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
/**
*/
public class OverlordActionBasedUsedSegmentLister implements UsedSegmentLister
{
private final TaskToolbox toolbox;
@Inject
public OverlordActionBasedUsedSegmentLister(TaskToolbox toolbox)
{
this.toolbox = Preconditions.checkNotNull(toolbox, "null task toolbox");
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(
String dataSource, Interval interval
) throws IOException
{
return toolbox
.getTaskActionClient()
.submit(new SegmentListUsedAction(dataSource, interval));
}
}

View File

@ -471,11 +471,15 @@ public class TaskSerdeTest
),
null,
null,
"blah"
"blah",
jsonMapper
);
final String json = jsonMapper.writeValueAsString(task);
final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class);
InjectableValues inject = new InjectableValues.Std()
.addValue(ObjectMapper.class, jsonMapper);
final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.reader(Task.class).with(inject).readValue(json);
Assert.assertEquals("foo", task.getDataSource());

View File

@ -74,6 +74,7 @@ import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
@ -461,8 +462,8 @@ public class IngestSegmentFirehoseFactoryTest
{
Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());
Integer rowcount = 0;
try (final IngestSegmentFirehoseFactory.IngestSegmentFirehose firehose =
(IngestSegmentFirehoseFactory.IngestSegmentFirehose)
try (final IngestSegmentFirehose firehose =
(IngestSegmentFirehose)
factory.connect(rowParser)) {
while (firehose.hasMore()) {
InputRow row = firehose.nextRow();

View File

@ -0,0 +1,202 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.firehose;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.filter.DimFilter;
import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class IngestSegmentFirehose implements Firehose
{
private volatile Yielder<InputRow> rowYielder;
public IngestSegmentFirehose(
    List<StorageAdapter> adapters,
    final List<String> dims,
    final List<String> metrics,
    final DimFilter dimFilter,
    final Interval interval,
    final QueryGranularity granularity
)
{
Sequence<InputRow> rows = Sequences.concat(
Iterables.transform(
adapters, new Function<StorageAdapter, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(StorageAdapter adapter)
{
return Sequences.concat(
Sequences.map(
adapter.makeCursors(
Filters.convertDimensionFilters(dimFilter),
interval,
granularity
), new Function<Cursor, Sequence<InputRow>>()
{
@Nullable
@Override
public Sequence<InputRow> apply(final Cursor cursor)
{
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null);
// dimSelector is null if the dimension is not present
if (dimSelector != null) {
dimSelectors.put(dim, dimSelector);
}
}
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
if (metricSelector != null) {
metSelectors.put(metric, metricSelector);
}
}
return Sequences.simple(
new Iterable<InputRow>()
{
@Override
public Iterator<InputRow> iterator()
{
return new Iterator<InputRow>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}
@Override
public InputRow next()
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.get();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();
if (vals.size() == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {
List<String> dimVals = Lists.newArrayList();
for (int i = 0; i < vals.size(); ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
theEvent.put(metric, selector.get());
}
cursor.advance();
return new MapBasedInputRow(timestamp, dims, theEvent);
}
@Override
public void remove()
{
throw new UnsupportedOperationException("Remove Not Supported");
}
};
}
}
);
}
}
)
);
}
}
)
);
rowYielder = rows.toYielder(
null,
new YieldingAccumulator<InputRow, InputRow>()
{
@Override
public InputRow accumulate(InputRow accumulated, InputRow in)
{
yield();
return in;
}
}
);
}
@Override
public boolean hasMore()
{
return !rowYielder.isDone();
}
@Override
public InputRow nextRow()
{
final InputRow inputRow = rowYielder.get();
rowYielder = rowYielder.next(null);
return inputRow;
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
rowYielder.close();
}
}

View File

@ -30,14 +30,24 @@ import com.google.inject.name.Names;
import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.druid.guice.LazySingleton;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.JobHelper;
import io.druid.indexer.Jobby;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.indexer.path.MetadataStoreBasedUsedSegmentLister;
import io.druid.indexer.path.MultiplePathSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import java.io.File;
import java.net.URI;
@ -84,6 +94,10 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
binder.bind(new TypeLiteral<Supplier<MetadataStorageConnectorConfig>>() {})
.toInstance(metadataSpec);
binder.bind(MetadataStorageTablesConfig.class).toInstance(metadataSpec.getMetadataStorageTablesConfig());
binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(
LazySingleton.class
);
}
}
);
@ -95,11 +109,23 @@ public class CliInternalHadoopIndexer extends GuiceRunnable
try {
Injector injector = makeInjector();
MetadataStorageUpdaterJobSpec metadataSpec = getHadoopDruidIndexerConfig().getSchema().getIOConfig().getMetadataUpdateSpec();
config = getHadoopDruidIndexerConfig();
MetadataStorageUpdaterJobSpec metadataSpec = config.getSchema().getIOConfig().getMetadataUpdateSpec();
// override metadata storage type based on HadoopIOConfig
Preconditions.checkNotNull(metadataSpec.getType(), "type in metadataUpdateSpec must not be null");
injector.getInstance(Properties.class).setProperty("druid.metadata.storage.type", metadataSpec.getType());
config = HadoopDruidIndexerConfig.fromSpec(
HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
config.getSchema(),
HadoopDruidIndexerConfig.jsonMapper,
new MetadataStoreBasedUsedSegmentLister(
injector.getInstance(IndexerMetadataStorageCoordinator.class)
)
)
);
List<Jobby> jobs = Lists.newArrayList();
jobs.add(new HadoopDruidDetermineConfigurationJob(config));
jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class)));