add configurable spatial dimensions to hadoop indexer

fjy 2013-05-14 17:38:05 -07:00
parent 5af188f18d
commit 803e8ff69e
9 changed files with 69 additions and 12 deletions
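
For context, spatial dimensions now ride along in the dataSpec rather than being unavailable to the Hadoop indexer. A minimal sketch, not taken from the commit, of building one of the new specs and serializing it; the SpatialDimensionSchema constructor shape (a composite dimension name plus the flat lat/long columns it is built from) is an assumption, since this diff only shows the class being referenced:

  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.google.common.collect.ImmutableList;
  import com.metamx.druid.index.v1.SpatialDimensionSchema;
  import com.metamx.druid.indexer.data.JSONDataSpec;

  public class SpatialSpecExample
  {
    public static void main(String[] args) throws Exception
    {
      JSONDataSpec spec = new JSONDataSpec(
          ImmutableList.of("gender", "age"),
          ImmutableList.of(
              // Assumed shape: one spatial dimension named "coordinates",
              // composed from the flat "lat" and "long" columns.
              new SpatialDimensionSchema("coordinates", ImmutableList.of("lat", "long"))
          )
      );
      // Prints JSON along the lines of:
      // {"dimensions":["gender","age"],
      //  "spatialDimensions":[{"dimName":"coordinates","dims":["lat","long"]}]}
      System.out.println(new ObjectMapper().writeValueAsString(spec));
    }
  }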

com/metamx/druid/indexer/data/CSVDataSpec.java

@@ -22,8 +22,10 @@ package com.metamx.druid.indexer.data;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.metamx.common.parsers.CSVParser;
 import com.metamx.common.parsers.Parser;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;

 import java.util.List;
@@ -33,11 +35,13 @@ public class CSVDataSpec implements DataSpec
 {
   private final List<String> columns;
   private final List<String> dimensions;
+  private final List<SpatialDimensionSchema> spatialDimensions;

   @JsonCreator
   public CSVDataSpec(
       @JsonProperty("columns") List<String> columns,
-      @JsonProperty("dimensions") List<String> dimensions
+      @JsonProperty("dimensions") List<String> dimensions,
+      @JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
   )
   {
     Preconditions.checkNotNull(columns, "columns");
@@ -47,6 +51,9 @@ public class CSVDataSpec implements DataSpec
     this.columns = columns;
     this.dimensions = dimensions;
+    this.spatialDimensions = (spatialDimensions == null)
+                             ? Lists.<SpatialDimensionSchema>newArrayList()
+                             : spatialDimensions;
   }

   @JsonProperty("columns")
@@ -62,6 +69,13 @@ public class CSVDataSpec implements DataSpec
     return dimensions;
   }

+  @JsonProperty("spatialDimensions")
+  @Override
+  public List<SpatialDimensionSchema> getSpatialDimensions()
+  {
+    return spatialDimensions;
+  }
+
   @Override
   public void verify(List<String> usedCols)
   {
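
Note the null guard in the constructor above: existing specs that omit spatialDimensions still deserialize, and callers that pass null (see TaskSerdeTest and TestIndex further down) get an empty list rather than NPEs downstream. A minimal sketch of that behavior, with hypothetical column names:

  // Hypothetical columns; ImmutableList is com.google.common.collect.ImmutableList.
  CSVDataSpec spec = new CSVDataSpec(
      ImmutableList.of("ts", "host"),  // columns
      ImmutableList.of("host"),        // dimensions
      null                             // no spatial dimensions configured
  );
  // The null is coalesced to an empty list, so this prints 0.
  System.out.println(spec.getSpatialDimensions().size());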

com/metamx/druid/indexer/data/DataSpec.java

@@ -22,6 +22,7 @@ package com.metamx.druid.indexer.data;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.metamx.common.parsers.Parser;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;

 import java.util.List;
@@ -41,5 +42,7 @@ public interface DataSpec
   public List<String> getDimensions();

+  public List<SpatialDimensionSchema> getSpatialDimensions();
+
   public Parser<String, Object> getParser();
 }

com/metamx/druid/indexer/data/DelimitedDataSpec.java

@@ -22,8 +22,10 @@ package com.metamx.druid.indexer.data;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.metamx.common.parsers.DelimitedParser;
 import com.metamx.common.parsers.Parser;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;

 import java.util.List;
@@ -34,12 +36,14 @@ public class DelimitedDataSpec implements DataSpec
   private final String delimiter;
   private final List<String> columns;
   private final List<String> dimensions;
+  private final List<SpatialDimensionSchema> spatialDimensions;

   @JsonCreator
   public DelimitedDataSpec(
       @JsonProperty("delimiter") String delimiter,
       @JsonProperty("columns") List<String> columns,
-      @JsonProperty("dimensions") List<String> dimensions
+      @JsonProperty("dimensions") List<String> dimensions,
+      @JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
   )
   {
     Preconditions.checkNotNull(columns);
@@ -50,6 +54,9 @@ public class DelimitedDataSpec implements DataSpec
     this.delimiter = (delimiter == null) ? DelimitedParser.DEFAULT_DELIMITER : delimiter;
     this.columns = columns;
     this.dimensions = dimensions;
+    this.spatialDimensions = (spatialDimensions == null)
+                             ? Lists.<SpatialDimensionSchema>newArrayList()
+                             : spatialDimensions;
   }

   @JsonProperty("delimiter")
@@ -71,6 +78,13 @@ public class DelimitedDataSpec implements DataSpec
     return dimensions;
   }

+  @JsonProperty("spatialDimensions")
+  @Override
+  public List<SpatialDimensionSchema> getSpatialDimensions()
+  {
+    return spatialDimensions;
+  }
+
   @Override
   public void verify(List<String> usedCols)
   {

com/metamx/druid/indexer/data/JSONDataSpec.java

@@ -20,8 +20,10 @@
 package com.metamx.druid.indexer.data;

 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
 import com.metamx.common.parsers.JSONParser;
 import com.metamx.common.parsers.Parser;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;

 import java.util.List;
@@ -30,12 +32,17 @@ import java.util.List;
 public class JSONDataSpec implements DataSpec
 {
   private final List<String> dimensions;
+  private final List<SpatialDimensionSchema> spatialDimensions;

   public JSONDataSpec(
-      @JsonProperty("dimensions") List<String> dimensions
+      @JsonProperty("dimensions") List<String> dimensions,
+      @JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
   )
   {
     this.dimensions = dimensions;
+    this.spatialDimensions = (spatialDimensions == null)
+                             ? Lists.<SpatialDimensionSchema>newArrayList()
+                             : spatialDimensions;
   }

   @JsonProperty("dimensions")
@@ -45,6 +52,13 @@ public class JSONDataSpec implements DataSpec
     return dimensions;
   }

+  @JsonProperty("spatialDimensions")
+  @Override
+  public List<SpatialDimensionSchema> getSpatialDimensions()
+  {
+    return spatialDimensions;
+  }
+
   @Override
   public void verify(List<String> usedCols)
   {

com/metamx/druid/indexer/data/ToLowercaseDataSpec.java

@@ -22,6 +22,7 @@ package com.metamx.druid.indexer.data;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.metamx.common.parsers.Parser;
 import com.metamx.common.parsers.ToLowerCaseParser;
+import com.metamx.druid.index.v1.SpatialDimensionSchema;

 import java.util.List;
@@ -56,6 +57,12 @@ public class ToLowercaseDataSpec implements DataSpec
     return delegate.getDimensions();
   }

+  @Override
+  public List<SpatialDimensionSchema> getSpatialDimensions()
+  {
+    return delegate.getSpatialDimensions();
+  }
+
   @Override
   public Parser<String, Object> getParser()
   {

IndexGeneratorJob.java

@@ -35,6 +35,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.QueryableIndex;
 import com.metamx.druid.index.v1.IncrementalIndex;
+import com.metamx.druid.index.v1.IncrementalIndexSchema;
 import com.metamx.druid.index.v1.IndexIO;
 import com.metamx.druid.index.v1.IndexMerger;
 import com.metamx.druid.indexer.data.StringInputRowParser;
@@ -152,7 +153,8 @@ public class IndexGeneratorJob implements Jobby
     }
   }

-  public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) {
+  public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
+  {
     final Configuration conf = new Configuration();
     final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
@@ -182,7 +184,7 @@ public class IndexGeneratorJob implements Jobby
     List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();

     return publishedSegments;
   }

   public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
@@ -197,7 +199,7 @@ public class IndexGeneratorJob implements Jobby
       // Group by bucket, sort by timestamp
       final Optional<Bucket> bucket = getConfig().getBucket(inputRow);

-      if(!bucket.isPresent()) {
+      if (!bucket.isPresent()) {
        throw new ISE("WTF?! No bucket found for row: %s", inputRow);
       }
@@ -590,9 +592,12 @@ public class IndexGeneratorJob implements Jobby
   private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
   {
     return new IncrementalIndex(
-        theBucket.time.getMillis(),
-        config.getRollupSpec().getRollupGranularity(),
-        aggs
+        new IncrementalIndexSchema.Builder()
+            .withMinTimestamp(theBucket.time.getMillis())
+            .withSpatialDimensions(config.getDataSpec().getSpatialDimensions())
+            .withQueryGranularity(config.getRollupSpec().getRollupGranularity())
+            .withMetrics(aggs)
+            .build()
     );
   }
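
The last hunk above is where the feature actually lands: the reducer used to call the three-argument IncrementalIndex constructor, which had no slot for spatial dimensions, so index construction moves to IncrementalIndexSchema.Builder and threads config.getDataSpec().getSpatialDimensions() through. A sketch of the same builder call with hypothetical stand-ins for the bucket time, granularity, and metrics; QueryGranularity.NONE and CountAggregatorFactory are assumed to be available in Druid of this vintage:

  // Hypothetical values; in the job these come from theBucket and the rollup spec.
  IncrementalIndex index = new IncrementalIndex(
      new IncrementalIndexSchema.Builder()
          .withMinTimestamp(0L)                         // theBucket.time.getMillis()
          .withSpatialDimensions(ImmutableList.of(
              new SpatialDimensionSchema("coordinates", ImmutableList.of("lat", "long"))
          ))
          .withQueryGranularity(QueryGranularity.NONE)  // rollup granularity
          .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("rows")})
          .build()
  );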

TaskSerdeTest.java

@@ -317,7 +317,7 @@ public class TaskSerdeTest
         "foo",
         "timestamp",
         "auto",
-        new JSONDataSpec(ImmutableList.of("foo")),
+        new JSONDataSpec(ImmutableList.of("foo"), null),
         null,
         new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
         new StaticPathSpec("bar"),

pom.xml

@@ -75,7 +75,7 @@
     <dependency>
       <groupId>com.metamx</groupId>
       <artifactId>bytebuffer-collections</artifactId>
-      <version>0.0.1-SNAPSHOT</version>
+      <version>0.0.1</version>
     </dependency>
     <dependency>
       <groupId>com.metamx</groupId>

TestIndex.java

@@ -162,7 +162,7 @@ public class TestIndex
   {
     StringInputRowParser parser = new StringInputRowParser(
         new TimestampSpec("ts", "iso"),
-        new DelimitedDataSpec("\t", Arrays.asList(COLUMNS), Arrays.asList(DIMENSIONS)),
+        new DelimitedDataSpec("\t", Arrays.asList(COLUMNS), Arrays.asList(DIMENSIONS), null),
         Arrays.<String>asList()
     );
     boolean runOnce = false;