Druid Hadoop InputFormat and pathSpec

Conflicts:
	indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java
	indexing-service/pom.xml
Himanshu Gupta 2015-06-25 16:10:28 -05:00
parent f1d309a671
commit 1ae56f139b
18 changed files with 1567 additions and 89 deletions

View File

@@ -35,6 +35,7 @@ import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.indexer.hadoop.SegmentInputRow;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
@@ -235,6 +236,7 @@ public class IndexGeneratorJob implements Jobby
private static final HashFunction hashFunction = Hashing.murmur3_128();
private AggregatorFactory[] aggregators;
private AggregatorFactory[] combiningAggs;
@Override
protected void setup(Context context)
@@ -242,6 +244,10 @@ public class IndexGeneratorJob implements Jobby
{
super.setup(context);
aggregators = config.getSchema().getDataSchema().getAggregators();
combiningAggs = new AggregatorFactory[aggregators.length];
for (int i = 0; i < aggregators.length; ++i) {
combiningAggs[i] = aggregators[i].getCombiningFactory();
}
}
@Override
@@ -268,6 +274,14 @@ public class IndexGeneratorJob implements Jobby
)
).asBytes();
// The SegmentInputRow type serves as a marker that these InputRow instances have already been combined
// and that they contain the columns as they appear in the segment after ingestion, not as they appear in the
// raw data.
byte[] serializedInputRow = inputRow instanceof SegmentInputRow ?
InputRowSerde.toBytes(inputRow, combiningAggs)
:
InputRowSerde.toBytes(inputRow, aggregators);
context.write(
new SortableBytes(
bucket.get().toGroupKey(),
@@ -277,7 +291,7 @@ public class IndexGeneratorJob implements Jobby
.put(hashedDimensions)
.array()
).toBytesWritable(),
new BytesWritable(InputRowSerde.toBytes(inputRow, aggregators))
new BytesWritable(serializedInputRow)
);
}
}
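The mapper now distinguishes rows read back from existing segments (SegmentInputRow) from rows parsed out of raw data: the former are serialized with the combining aggregator factories, because their metric columns already carry aggregated values under the output names. A minimal sketch, not part of the commit, of what a combining factory changes, assuming the stock LongSumAggregatorFactory behavior:

import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

public class CombiningFactoryExample
{
  public static void main(String[] args)
  {
    // Raw-data aggregator: reads the raw input field "visited" and produces "visited_sum".
    AggregatorFactory raw = new LongSumAggregatorFactory("visited_sum", "visited");

    // Combining aggregator: reads the already-aggregated "visited_sum" column,
    // which is how the metric appears in a SegmentInputRow.
    AggregatorFactory combining = raw.getCombiningFactory();

    System.out.println(raw.requiredFields());       // [visited]
    System.out.println(combining.requiredFields()); // [visited_sum]
  }
}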

View File

@@ -17,6 +17,7 @@
package io.druid.indexer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
@@ -28,6 +29,7 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.logger.Logger;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
@@ -56,8 +58,10 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -579,6 +583,38 @@ public class JobHelper
return zipPusher.push();
}
public static URI getURIFromSegment(DataSegment dataSegment)
{
// There is no good way around this...
// TODO: add getURI() to URIDataPuller
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
final String type = loadSpec.get("type").toString();
final URI segmentLocURI;
if ("s3_zip".equals(type)) {
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString());
} else if ("local".equals(type)) {
try {
segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
}
catch (URISyntaxException e) {
throw new ISE(e, "Unable to form simple file uri");
}
} else {
try {
throw new IAE(
"Cannot figure out loadSpec %s",
HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec)
);
}
catch (JsonProcessingException e) {
throw new ISE("Cannot write Map with json mapper");
}
}
return segmentLocURI;
}
public static ProgressIndicator progressIndicatorForContext(
final TaskAttemptContext context
)
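getURIFromSegment, moved from HadoopConverterJob into JobHelper so the new record reader can reuse it, resolves a segment's loadSpec into a fetchable URI. A minimal usage sketch, not part of the commit; the datasource name, path, and sizes are made up, and the DataSegment constructor is used the same way as in the tests below:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.JobHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;

public class GetURIFromSegmentExample
{
  public static void main(String[] args)
  {
    // A hypothetical segment whose loadSpec says it lives on HDFS.
    DataSegment segment = new DataSegment(
        "testds",
        Interval.parse("2014-10-22/2014-10-23"),
        "ver",
        ImmutableMap.<String, Object>of("type", "hdfs", "path", "hdfs://namenode:8020/druid/segments/index.zip"),
        ImmutableList.of("host"),
        ImmutableList.of("visited_sum"),
        new NoneShardSpec(),
        9,
        4096
    );

    // Prints hdfs://namenode:8020/druid/segments/index.zip.
    // An "s3_zip" loadSpec resolves to s3n://<bucket>/<key>, a "local" one to a file: URI,
    // and any other type throws an IAE.
    System.out.println(JobHelper.getURIFromSegment(segment));
  }
}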

View File

@@ -0,0 +1,160 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.granularity.QueryGranularity;
import io.druid.query.filter.DimFilter;
import org.joda.time.Interval;
import java.util.List;
public class DatasourceIngestionSpec
{
private final String dataSource;
private final Interval interval;
private final DimFilter filter;
private final QueryGranularity granularity;
private final List<String> dimensions;
private final List<String> metrics;
@JsonCreator
public DatasourceIngestionSpec(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("filter") DimFilter filter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("metrics") List<String> metrics
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource");
this.interval = Preconditions.checkNotNull(interval, "null interval");
this.filter = filter;
this.granularity = granularity == null ? QueryGranularity.NONE : granularity;
this.dimensions = dimensions;
this.metrics = metrics;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@JsonProperty
public DimFilter getFilter()
{
return filter;
}
@JsonProperty
public QueryGranularity getGranularity()
{
return granularity;
}
@JsonProperty
public List<String> getDimensions()
{
return dimensions;
}
@JsonProperty
public List<String> getMetrics()
{
return metrics;
}
public DatasourceIngestionSpec withDimensions(List<String> dimensions)
{
return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics);
}
public DatasourceIngestionSpec withMetrics(List<String> metrics)
{
return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DatasourceIngestionSpec that = (DatasourceIngestionSpec) o;
if (!dataSource.equals(that.dataSource)) {
return false;
}
if (!interval.equals(that.interval)) {
return false;
}
if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
return false;
}
if (!granularity.equals(that.granularity)) {
return false;
}
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
}
return !(metrics != null ? !metrics.equals(that.metrics) : that.metrics != null);
}
@Override
public int hashCode()
{
int result = dataSource.hashCode();
result = 31 * result + interval.hashCode();
result = 31 * result + (filter != null ? filter.hashCode() : 0);
result = 31 * result + granularity.hashCode();
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "DatasourceIngestionSpec{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", filter=" + filter +
", granularity=" + granularity +
", dimensions=" + dimensions +
", metrics=" + metrics +
'}';
}
}
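DatasourceIngestionSpec describes which slice of an existing datasource to re-read: the datasource name and interval are required, while filter, granularity, dimensions, and metrics are optional (missing dimensions and metrics are filled in later by DatasourcePathSpec). A minimal construction sketch, not part of the commit; the values are made up:

import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.jackson.DefaultObjectMapper;
import org.joda.time.Interval;

public class DatasourceIngestionSpecExample
{
  public static void main(String[] args) throws Exception
  {
    DatasourceIngestionSpec spec = new DatasourceIngestionSpec(
        "testds",                                 // dataSource to re-index
        Interval.parse("2014-10-22/2014-10-23"),  // interval to read
        null,                                     // no filter
        QueryGranularity.NONE,                    // keep rows at their original granularity
        ImmutableList.of("host"),                 // dimensions to carry over
        ImmutableList.of("visited_sum")           // metrics to carry over
    );

    // The JSON field names follow the @JsonProperty annotations above
    // (dataSource, interval, filter, granularity, dimensions, metrics).
    System.out.println(new DefaultObjectMapper().writeValueAsString(spec));
  }
}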

View File

@@ -0,0 +1,121 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{
private static final Logger logger = new Logger(DatasourceInputFormat.class);
public static final String CONF_INPUT_SEGMENTS = "druid.segments";
public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema";
public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size";
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
{
Configuration conf = context.getConfiguration();
String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read");
List<DataSegment> segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
segmentsStr,
new TypeReference<List<DataSegment>>()
{
}
);
if (segments == null || segments.size() == 0) {
throw new ISE("No segments found to read");
}
logger.info("segments to read [%s]", segmentsStr);
long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0);
if (maxSize > 0) {
// Combining is enabled, so sort the segments by size so that they are
// combined appropriately.
Collections.sort(
segments,
new Comparator<DataSegment>()
{
@Override
public int compare(DataSegment s1, DataSegment s2)
{
return Long.compare(s1.getSize(), s2.getSize());
}
}
);
}
List<InputSplit> splits = Lists.newArrayList();
List<DataSegment> list = new ArrayList<>();
long size = 0;
for (DataSegment segment : segments) {
if (size + segment.getSize() > maxSize && size > 0) {
splits.add(new DatasourceInputSplit(list));
list = Lists.newArrayList();
size = 0;
}
list.add(segment);
size += segment.getSize();
}
if (list.size() > 0) {
splits.add(new DatasourceInputSplit(list));
}
logger.info("Number of splits [%d]", splits.size());
return splits;
}
@Override
public RecordReader<NullWritable, InputRow> createRecordReader(
InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException
{
return new DatasourceRecordReader();
}
}
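DatasourceInputFormat is driven entirely by three Configuration keys: the serialized segment list, the serialized DatasourceIngestionSpec, and an optional maximum split size. When a maximum is set, segments are sorted by size and packed greedily, so sizes 2, 4, and 11 with a maximum of 6 become two splits, {2, 4} and {11}, as the tests further down verify. A minimal wiring sketch, not part of the commit; DatasourcePathSpec below performs the equivalent wiring automatically:

import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.mapreduce.Job;
import java.util.List;

public class DatasourceInputFormatWiring
{
  // segments and spec are assumed to come from the metadata store and the user's ingestion spec
  public static void configure(Job job, List<DataSegment> segments, DatasourceIngestionSpec spec) throws Exception
  {
    job.getConfiguration().set(
        DatasourceInputFormat.CONF_INPUT_SEGMENTS,
        HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segments)
    );
    job.getConfiguration().set(
        DatasourceInputFormat.CONF_DRUID_SCHEMA,
        HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(spec)
    );
    // Optional: combine small segments into splits of at most ~512MB.
    job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(512L * 1024 * 1024));

    job.setInputFormatClass(DatasourceInputFormat.class);
  }
}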

View File

@@ -0,0 +1,88 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import javax.validation.constraints.NotNull;
public class DatasourceInputSplit extends InputSplit implements Writable
{
private List<DataSegment> segments = null;
//required for deserialization
public DatasourceInputSplit()
{
}
public DatasourceInputSplit(@NotNull List<DataSegment> segments)
{
Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments");
this.segments = segments;
}
@Override
public long getLength() throws IOException, InterruptedException
{
long size = 0;
for (DataSegment segment : segments) {
size += segment.getSize();
}
return size;
}
@Override
public String[] getLocations() throws IOException, InterruptedException
{
return new String[]{};
}
public List<DataSegment> getSegments()
{
return segments;
}
@Override
public void write(DataOutput out) throws IOException
{
out.writeUTF(HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segments));
}
@Override
public void readFields(DataInput in) throws IOException
{
segments = HadoopDruidIndexerConfig.jsonMapper.readValue(
in.readUTF(),
new TypeReference<List<DataSegment>>()
{
}
);
}
}

View File

@@ -0,0 +1,194 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.segment.IndexIO;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
{
private static final Logger logger = new Logger(DatasourceRecordReader.class);
private DatasourceIngestionSpec spec;
private IngestSegmentFirehose firehose;
private int rowNum;
private MapBasedRow currRow;
private List<QueryableIndex> indexes = Lists.newArrayList();
private List<File> tmpSegmentDirs = Lists.newArrayList();
private int numRows;
@Override
public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException
{
spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.jsonMapper);
List<DataSegment> segments = ((DatasourceInputSplit) split).getSegments();
List<StorageAdapter> adapters = Lists.transform(
segments,
new Function<DataSegment, StorageAdapter>()
{
@Override
public StorageAdapter apply(DataSegment segment)
{
try {
logger.info("Getting storage path for segment [%s]", segment.getIdentifier());
Path path = new Path(JobHelper.getURIFromSegment(segment));
logger.info("Fetch segment files from [%s]", path);
File dir = Files.createTempDir();
tmpSegmentDirs.add(dir);
logger.info("Locally storing fetched segment at [%s]", dir);
JobHelper.unzipNoGuava(path, context.getConfiguration(), dir, context);
logger.info("finished fetching segment files");
QueryableIndex index = IndexIO.loadIndex(dir);
indexes.add(index);
numRows += index.getNumRows();
return new QueryableIndexStorageAdapter(index);
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
}
);
firehose = new IngestSegmentFirehose(
adapters,
spec.getDimensions(),
spec.getMetrics(),
spec.getFilter(),
spec.getInterval(),
spec.getGranularity()
);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
{
if (firehose.hasMore()) {
currRow = (MapBasedRow) firehose.nextRow();
rowNum++;
return true;
} else {
return false;
}
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException
{
return NullWritable.get();
}
@Override
public InputRow getCurrentValue() throws IOException, InterruptedException
{
return new SegmentInputRow(
new MapBasedInputRow(
currRow.getTimestamp(),
spec.getDimensions(),
currRow.getEvent()
)
);
}
@Override
public float getProgress() throws IOException, InterruptedException
{
if (numRows > 0) {
return (rowNum * 1.0f) / numRows;
} else {
return 0;
}
}
@Override
public void close() throws IOException
{
Closeables.close(firehose, true);
for (QueryableIndex qi : indexes) {
Closeables.close(qi, true);
}
for (File dir : tmpSegmentDirs) {
FileUtils.deleteDirectory(dir);
}
}
private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper)
{
try {
String schema = Preconditions.checkNotNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), "null schema");
logger.info("load schema [%s]", schema);
DatasourceIngestionSpec spec = jsonMapper.readValue(schema, DatasourceIngestionSpec.class);
if (spec.getDimensions() == null || spec.getDimensions().size() == 0) {
throw new ISE("load schema does not have dimensions");
}
if (spec.getMetrics() == null || spec.getMetrics().size() == 0) {
throw new ISE("load schema does not have metrics");
}
return spec;
}
catch (IOException ex) {
throw new RuntimeException("couldn't load segment load spec", ex);
}
}
}
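DatasourceRecordReader pulls each segment in its split to a local temp directory, opens it as a QueryableIndex, and streams rows out through an IngestSegmentFirehose, wrapping each one in SegmentInputRow. The Hadoop framework drives it through the usual RecordReader contract; a minimal sketch of that loop, not part of the commit (split and context come from the framework in a real job):

import io.druid.data.input.InputRow;
import io.druid.indexer.hadoop.DatasourceRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class DatasourceRecordReaderLoop
{
  public static void readAll(InputSplit split, TaskAttemptContext context) throws Exception
  {
    DatasourceRecordReader rr = new DatasourceRecordReader();
    rr.initialize(split, context);
    try {
      while (rr.nextKeyValue()) {
        InputRow row = rr.getCurrentValue(); // a SegmentInputRow exposing the segment's columns
        // hand the row to the mapper; getProgress() is rows read / total rows in the split
      }
    }
    finally {
      rr.close();
    }
  }
}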

View File

@@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import org.joda.time.DateTime;
import java.util.List;
/**
* SegmentInputRow serves as a marker that these InputRow instances have already been combined
* and that they contain the columns as they appear in the segment after ingestion, not as they appear in the
* raw data.
* It must only be used to represent such InputRows.
*/
public class SegmentInputRow implements InputRow
{
private final InputRow delegate;
public SegmentInputRow(InputRow delegate){
this.delegate = delegate;
}
@Override
public List<String> getDimensions()
{
return delegate.getDimensions();
}
@Override
public long getTimestampFromEpoch()
{
return delegate.getTimestampFromEpoch();
}
@Override
public DateTime getTimestamp()
{
return delegate.getTimestamp();
}
@Override
public List<String> getDimension(String dimension)
{
return delegate.getDimension(dimension);
}
@Override
public Object getRaw(String dimension)
{
return delegate.getRaw(dimension);
}
@Override
public float getFloatMetric(String metric)
{
return delegate.getFloatMetric(metric);
}
@Override
public long getLongMetric(String metric)
{
return delegate.getLongMetric(metric);
}
@Override
public int compareTo(Row row)
{
return delegate.compareTo(row);
}
@Override
public String toString()
{
return "SegmentInputRow{" +
"delegate=" + delegate +
'}';
}
}

View File

@@ -0,0 +1,184 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class DatasourcePathSpec implements PathSpec
{
private static final Logger logger = new Logger(DatasourcePathSpec.class);
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private ObjectMapper mapper;
private DatasourceIngestionSpec ingestionSpec;
private long maxSplitSize;
public DatasourcePathSpec(
@JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
@JacksonInject ObjectMapper mapper,
@JsonProperty("ingestionSpec") DatasourceIngestionSpec spec,
@JsonProperty("maxSplitSize") Long maxSplitSize
)
{
this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull(indexerMetadataStorageCoordinator, "null indexerMetadataStorageCoordinator");
this.mapper = Preconditions.checkNotNull(mapper, "null mapper");
this.ingestionSpec = Preconditions.checkNotNull(spec, "null ingestionSpec");
if(maxSplitSize == null) {
this.maxSplitSize = 0;
} else {
this.maxSplitSize = maxSplitSize.longValue();
}
}
@JsonProperty
public DatasourceIngestionSpec getIngestionSpec()
{
return ingestionSpec;
}
@JsonProperty
public long getMaxSplitSize()
{
return maxSplitSize;
}
@Override
public Job addInputPaths(
HadoopDruidIndexerConfig config, Job job
) throws IOException
{
final List<DataSegment> segments = indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(
ingestionSpec.getDataSource(),
ingestionSpec.getInterval()
);
logger.info(
"Found total [%d] segments for [%s] in interval [%s]",
segments.size(),
ingestionSpec.getDataSource(),
ingestionSpec.getInterval()
);
if (ingestionSpec.getDimensions() == null) {
List<String> dims;
if (config.getParser().getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensions();
} else {
Set<String> dimSet = Sets.newHashSet(
Iterables.concat(
Iterables.transform(
segments,
new Function<DataSegment, Iterable<String>>()
{
@Override
public Iterable<String> apply(DataSegment dataSegment)
{
return dataSegment.getDimensions();
}
}
)
)
);
dims = Lists.newArrayList(
Sets.difference(
dimSet,
config.getParser()
.getParseSpec()
.getDimensionsSpec()
.getDimensionExclusions()
)
);
}
ingestionSpec = ingestionSpec.withDimensions(dims);
}
if (ingestionSpec.getMetrics() == null) {
Set<String> metrics = Sets.newHashSet();
final AggregatorFactory[] cols = config.getSchema().getDataSchema().getAggregators();
if (cols != null) {
for (AggregatorFactory col : cols) {
metrics.add(col.getName());
}
}
ingestionSpec = ingestionSpec.withMetrics(Lists.newArrayList(metrics));
}
job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(ingestionSpec));
job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments));
job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize));
return job;
}
public Class<? extends InputFormat> getInputFormat()
{
return DatasourceInputFormat.class;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DatasourcePathSpec that = (DatasourcePathSpec) o;
if (maxSplitSize != that.maxSplitSize) {
return false;
}
return ingestionSpec.equals(that.ingestionSpec);
}
@Override
public int hashCode()
{
int result = ingestionSpec.hashCode();
result = 31 * result + (int) (maxSplitSize ^ (maxSplitSize >>> 32));
return result;
}
}
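DatasourcePathSpec is the user-facing piece: registered under the "dataSource" pathSpec type (next file), it fetches the used segments for the requested interval from the metadata store, fills in any missing dimensions and metrics, and publishes segments, schema, and split size into the job configuration. A rough serialization sketch, not part of the commit, using EasyMock for the injected metadata coordinator as the tests below do; the datasource name and sizes are made up:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.path.DatasourcePathSpec;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
import org.joda.time.Interval;

public class DatasourcePathSpecShape
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();
    DatasourcePathSpec pathSpec = new DatasourcePathSpec(
        EasyMock.createMock(IndexerMetadataStorageCoordinator.class), // not touched during serialization
        mapper,
        new DatasourceIngestionSpec("testds", Interval.parse("2014-10-22/2014-10-23"), null, null, null, null),
        512L * 1024 * 1024
    );

    // Prints something like {"type":"dataSource","ingestionSpec":{...},"maxSplitSize":536870912},
    // which is the shape a user would put in the pathSpec section of a Hadoop ingestion spec.
    System.out.println(mapper.writeValueAsString(pathSpec));
  }
}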

View File

@@ -32,7 +32,8 @@ import java.io.IOException;
@JsonSubTypes(value={
@JsonSubTypes.Type(name="granular_unprocessed", value=GranularUnprocessedPathSpec.class),
@JsonSubTypes.Type(name="granularity", value=GranularityPathSpec.class),
@JsonSubTypes.Type(name="static", value=StaticPathSpec.class)
@JsonSubTypes.Type(name="static", value=StaticPathSpec.class),
@JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class)
})
public interface PathSpec
{

View File

@@ -32,6 +32,7 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexer.JobHelper;
import io.druid.indexer.hadoop.DatasourceInputSplit;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.timeline.DataSegment;
@@ -42,7 +43,6 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -62,15 +62,10 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progressable;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -470,10 +465,10 @@ public class HadoopConverterJob
) throws IOException, InterruptedException
{
final InputSplit split = context.getInputSplit();
if (!(split instanceof DataSegmentSplit)) {
if (!(split instanceof DatasourceInputSplit)) {
throw new IAE(
"Unexpected split type. Expected [%s] was [%s]",
DataSegmentSplit.class.getCanonicalName(),
DatasourceInputSplit.class.getCanonicalName(),
split.getClass().getCanonicalName()
);
}
@@ -481,13 +476,13 @@ public class HadoopConverterJob
final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
final File tmpDir = Paths.get(tmpDirLoc).toFile();
final DataSegment segment = ((DataSegmentSplit) split).getDataSegment();
final DataSegment segment = Iterables.getOnlyElement(((DatasourceInputSplit) split).getSegments());
final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration());
context.setStatus("DOWNLOADING");
context.progress();
final Path inPath = new Path(getURIFromSegment(segment));
final Path inPath = new Path(JobHelper.getURIFromSegment(segment));
final File inDir = new File(tmpDir, "in");
if (inDir.exists() && !inDir.delete()) {
@@ -559,38 +554,6 @@ public class HadoopConverterJob
context.getConfiguration().set(TMP_FILE_LOC_KEY, tmpFile.getAbsolutePath());
}
private static URI getURIFromSegment(DataSegment dataSegment)
{
// There is no good way around this...
// TODO: add getURI() to URIDataPuller
final Map<String, Object> loadSpec = dataSegment.getLoadSpec();
final String type = loadSpec.get("type").toString();
final URI segmentLocURI;
if ("s3_zip".equals(type)) {
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString());
} else if ("local".equals(type)) {
try {
segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
}
catch (URISyntaxException e) {
throw new ISE(e, "Unable to form simple file uri");
}
} else {
try {
throw new IAE(
"Cannot figure out loadSpec %s",
HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec)
);
}
catch (JsonProcessingException e) {
throw new ISE("Cannot write Map with json mapper");
}
}
return segmentLocURI;
}
@Override
protected void cleanup(
Context context
@@ -604,50 +567,6 @@ public class HadoopConverterJob
}
}
public static class DataSegmentSplit extends InputSplit implements Writable
{
private DataSegment dataSegment = null;
public DataSegmentSplit()
{
// For serialization purposes
}
public DataSegmentSplit(@NotNull DataSegment dataSegment)
{
this.dataSegment = Preconditions.checkNotNull(dataSegment, "dataSegment");
}
@Override
public long getLength() throws IOException, InterruptedException
{
return dataSegment.getSize();
}
@Override
public String[] getLocations() throws IOException, InterruptedException
{
return new String[]{};
}
protected DataSegment getDataSegment()
{
return dataSegment;
}
@Override
public void write(DataOutput out) throws IOException
{
out.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsString(dataSegment).getBytes());
}
@Override
public void readFields(DataInput in) throws IOException
{
dataSegment = HadoopDruidConverterConfig.jsonMapper.readValue(in.readLine(), DataSegment.class);
}
}
public static class ConfigInputFormat extends InputFormat<String, String>
{
@Override
@@ -665,7 +584,7 @@ public class HadoopConverterJob
@Override
public InputSplit apply(DataSegment input)
{
return new DataSegmentSplit(input);
return new DatasourceInputSplit(ImmutableList.of(input));
}
}
);

View File

@@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.filter.SelectorDimFilter;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
/**
*/
public class DatasourceIngestionSpecTest
{
@Test
public void testSerde() throws Exception
{
DatasourceIngestionSpec expected = new DatasourceIngestionSpec(
"test",
Interval.parse("2014/2015"),
new SelectorDimFilter("dim", "value"),
QueryGranularity.DAY,
Lists.newArrayList("d1", "d2"),
Lists.newArrayList("m1", "m2", "m3")
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
DatasourceIngestionSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), DatasourceIngestionSpec.class);
Assert.assertEquals(expected, actual);
}
}

View File

@@ -0,0 +1,154 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
*/
public class DatasourceInputFormatTest
{
private List<DataSegment> segments;
private Configuration config;
private JobContext context;
@Before
public void setUp() throws Exception
{
segments = ImmutableList.of(
new DataSegment(
"test1",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index1.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
2
),
new DataSegment(
"test2",
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index2.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
11
),
new DataSegment(
"test3",
Interval.parse("2030/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index3.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
4
)
);
config = new Configuration();
config.set(
DatasourceInputFormat.CONF_INPUT_SEGMENTS,
new DefaultObjectMapper().writeValueAsString(segments)
);
context = EasyMock.createMock(JobContext.class);
EasyMock.expect(context.getConfiguration()).andReturn(config);
EasyMock.replay(context);
}
@Test
public void testGetSplitsNoCombining() throws Exception
{
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
Assert.assertEquals(segments.size(), splits.size());
for (int i = 0; i < segments.size(); i++) {
Assert.assertEquals(segments.get(i), ((DatasourceInputSplit) splits.get(i)).getSegments().get(0));
}
}
@Test
public void testGetSplitsAllCombined() throws Exception
{
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999");
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
Assert.assertEquals(1, splits.size());
Assert.assertEquals(
Sets.newHashSet(segments),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);
}
@Test
public void testGetSplitsCombineInTwo() throws Exception
{
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6");
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
Assert.assertEquals(2, splits.size());
Assert.assertEquals(
Sets.newHashSet(segments.get(0), segments.get(2)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);
Assert.assertEquals(
Sets.newHashSet(segments.get(1)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
);
}
@Test
public void testGetRecordReader() throws Exception
{
Assert.assertTrue(new DatasourceInputFormat().createRecordReader(null, null) instanceof DatasourceRecordReader);
}
}

View File

@@ -0,0 +1,71 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.io.DataInput;
/**
*/
public class DatasourceInputSplitTest
{
@Test
public void testSerde() throws Exception
{
DatasourceInputSplit expected = new DatasourceInputSplit(
Lists.newArrayList(
new DataSegment(
"test",
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("host"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
)
)
);
ByteArrayDataOutput out = ByteStreams.newDataOutput();
expected.write(out);
DataInput in = ByteStreams.newDataInput(out.toByteArray());
DatasourceInputSplit actual = new DatasourceInputSplit();
actual.readFields(in);
Assert.assertEquals(expected.getSegments(), actual.getSegments());
Assert.assertEquals(12334, actual.getLength());
}
}

View File

@@ -0,0 +1,136 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.hadoop;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
/**
*/
public class DatasourceRecordReaderTest
{
@Test
public void testSanity() throws Exception
{
DataSegment segment = new DefaultObjectMapper()
.readValue(this.getClass().getClassLoader().getResource("test-segment/descriptor.json"), DataSegment.class)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
"local",
"path",
this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
)
);
InputSplit split = new DatasourceInputSplit(Lists.newArrayList(segment));
Configuration config = new Configuration();
config.set(
DatasourceInputFormat.CONF_DRUID_SCHEMA,
HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(
new DatasourceIngestionSpec(
segment.getDataSource(),
segment.getInterval(),
null,
null,
segment.getDimensions(),
segment.getMetrics()
)
)
);
TaskAttemptContext context = EasyMock.createNiceMock(TaskAttemptContext.class);
EasyMock.expect(context.getConfiguration()).andReturn(config).anyTimes();
EasyMock.replay(context);
DatasourceRecordReader rr = new DatasourceRecordReader();
rr.initialize(split, context);
Assert.assertEquals(0, rr.getProgress(), 0.0001);
List<InputRow> rows = Lists.newArrayList();
while(rr.nextKeyValue()) {
rows.add(rr.getCurrentValue());
}
verifyRows(rows);
Assert.assertEquals(1, rr.getProgress(), 0.0001);
rr.close();
}
private void verifyRows(List<InputRow> actualRows)
{
List<ImmutableMap<String, Object>> expectedRows = ImmutableList.of(
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T00:00:00.000Z"),
"host", ImmutableList.of("a.example.com"),
"visited_sum", 100L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T01:00:00.000Z"),
"host", ImmutableList.of("b.example.com"),
"visited_sum", 150L,
"unique_hosts", 1.0d
),
ImmutableMap.<String, Object>of(
"time", DateTime.parse("2014-10-22T02:00:00.000Z"),
"host", ImmutableList.of("c.example.com"),
"visited_sum", 200L,
"unique_hosts", 1.0d
)
);
Assert.assertEquals(expectedRows.size(), actualRows.size());
for (int i = 0; i < expectedRows.size(); i++) {
Map<String, Object> expected = expectedRows.get(i);
InputRow actual = actualRows.get(i);
Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions());
Assert.assertEquals(expected.get("time"), actual.getTimestamp());
Assert.assertEquals(expected.get("host"), actual.getDimension("host"));
Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
Assert.assertEquals(
(Double) expected.get("unique_hosts"),
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")),
0.001
);
}
}
}

View File

@@ -0,0 +1,229 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexer.path;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexer.hadoop.DatasourceIngestionSpec;
import io.druid.indexer.hadoop.DatasourceInputFormat;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.initialization.Initialization;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/**
*/
public class DatasourcePathSpecTest
{
private DatasourceIngestionSpec ingestionSpec;
public DatasourcePathSpecTest()
{
this.ingestionSpec = new DatasourceIngestionSpec(
"test",
Interval.parse("2000/3000"),
null,
null,
null,
null
);
}
@Test
public void testSerde() throws Exception
{
final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock(
IndexerMetadataStorageCoordinator.class
);
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(IndexerMetadataStorageCoordinator.class).toInstance(indexerMetadataStorageCoordinator);
JsonConfigProvider.bindInstance(
binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null)
);
}
}
)
);
ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class);
DatasourcePathSpec expected = new DatasourcePathSpec(
indexerMetadataStorageCoordinator,
jsonMapper,
ingestionSpec,
Long.valueOf(10)
);
PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);
expected = new DatasourcePathSpec(
indexerMetadataStorageCoordinator,
jsonMapper,
ingestionSpec,
null
);
actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class);
Assert.assertEquals(expected, actual);
}
@Test
public void testAddInputPaths() throws Exception
{
HadoopDruidIndexerConfig hadoopIndexerConfig = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
ingestionSpec.getDataSource(),
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
ImmutableList.of("timestamp", "host", "visited")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_sum", "visited")
},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000"))
)
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
"/tmp/dummy",
"type",
"static"
),
null,
"/tmp/dummy"
),
HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver")
)
);
List<DataSegment> segments = ImmutableList.of(
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2000/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "local",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12334
),
new DataSegment(
ingestionSpec.getDataSource(),
Interval.parse("2050/3000"),
"ver",
ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", "/tmp/index.zip"
),
ImmutableList.of("product"),
ImmutableList.of("visited_sum", "unique_hosts"),
new NoneShardSpec(),
9,
12335
)
);
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock(IndexerMetadataStorageCoordinator.class);
EasyMock.expect(indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(ingestionSpec.getDataSource(), ingestionSpec.getInterval())).andReturn(segments);
EasyMock.replay(indexerMetadataStorageCoordinator);
ObjectMapper mapper = new DefaultObjectMapper();
DatasourcePathSpec pathSpec = new DatasourcePathSpec(
indexerMetadataStorageCoordinator,
mapper,
ingestionSpec,
null
);
Configuration config = new Configuration();
Job job = EasyMock.createNiceMock(Job.class);
EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes();
EasyMock.replay(job);
pathSpec.addInputPaths(hadoopIndexerConfig, job);
List<DataSegment> actualSegments = mapper.readValue(
config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS),
new TypeReference<List<DataSegment>>()
{
}
);
Assert.assertEquals(segments, actualSegments);
DatasourceIngestionSpec actualIngestionSpec = mapper.readValue(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), DatasourceIngestionSpec.class);
Assert.assertEquals(ingestionSpec
.withDimensions(ImmutableList.of("product"))
.withMetrics(ImmutableList.of("visited_sum")),
actualIngestionSpec);
}
}

View File

@@ -0,0 +1,17 @@
{
"binaryVersion": 9,
"dataSource": "testds",
"dimensions": "host",
"identifier": "testds_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-07-15T22:02:40.171Z",
"interval": "2014-10-22T00:00:00.000Z/2014-10-23T00:00:00.000Z",
"loadSpec": {
"path": "test-segment/index.zip",
"type": "local"
},
"metrics": "visited_sum,unique_hosts",
"shardSpec": {
"type": "none"
},
"size": 4096,
"version": "2015-07-15T22:02:40.171Z"
}

View File

@@ -0,0 +1,5 @@
This is a test segment containing the following columns.
__time, host, visited_sum, unique_hosts
2014-10-22T00:00:00.000Z, a.example.com, 100, HLL sketch of cardinality 1
2014-10-22T01:00:00.000Z, b.example.com, 150, HLL sketch of cardinality 1
2014-10-22T02:00:00.000Z, c.example.com, 200, HLL sketch of cardinality 1