From 803e8ff69e863fe98b9c32d8c1712f12a3e8e722 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 14 May 2013 17:38:05 -0700 Subject: [PATCH] add configurable spatial dimensions to hadoop indexer --- .../metamx/druid/indexer/data/CSVDataSpec.java | 16 +++++++++++++++- .../com/metamx/druid/indexer/data/DataSpec.java | 3 +++ .../druid/indexer/data/DelimitedDataSpec.java | 16 +++++++++++++++- .../metamx/druid/indexer/data/JSONDataSpec.java | 16 +++++++++++++++- .../druid/indexer/data/ToLowercaseDataSpec.java | 7 +++++++ .../metamx/druid/indexer/IndexGeneratorJob.java | 17 +++++++++++------ .../druid/merger/common/task/TaskSerdeTest.java | 2 +- pom.xml | 2 +- .../com/metamx/druid/index/v1/TestIndex.java | 2 +- 9 files changed, 69 insertions(+), 12 deletions(-) diff --git a/index-common/src/main/java/com/metamx/druid/indexer/data/CSVDataSpec.java b/index-common/src/main/java/com/metamx/druid/indexer/data/CSVDataSpec.java index 7f4d5357057..60303703e18 100644 --- a/index-common/src/main/java/com/metamx/druid/indexer/data/CSVDataSpec.java +++ b/index-common/src/main/java/com/metamx/druid/indexer/data/CSVDataSpec.java @@ -22,8 +22,10 @@ package com.metamx.druid.indexer.data; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.metamx.common.parsers.CSVParser; import com.metamx.common.parsers.Parser; +import com.metamx.druid.index.v1.SpatialDimensionSchema; import java.util.List; @@ -33,11 +35,13 @@ public class CSVDataSpec implements DataSpec { private final List columns; private final List dimensions; + private final List spatialDimensions; @JsonCreator public CSVDataSpec( @JsonProperty("columns") List columns, - @JsonProperty("dimensions") List dimensions + @JsonProperty("dimensions") List dimensions, + @JsonProperty("spatialDimensions") List spatialDimensions ) { Preconditions.checkNotNull(columns, "columns"); @@ -47,6 +51,9 @@ public class CSVDataSpec implements DataSpec this.columns = columns; this.dimensions = dimensions; + this.spatialDimensions = (spatialDimensions == null) + ? Lists.newArrayList() + : spatialDimensions; } @JsonProperty("columns") @@ -62,6 +69,13 @@ public class CSVDataSpec implements DataSpec return dimensions; } + @JsonProperty("spatialDimensions") + @Override + public List getSpatialDimensions() + { + return spatialDimensions; + } + @Override public void verify(List usedCols) { diff --git a/index-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java b/index-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java index 814850a5c52..099d0c8d535 100644 --- a/index-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java +++ b/index-common/src/main/java/com/metamx/druid/indexer/data/DataSpec.java @@ -22,6 +22,7 @@ package com.metamx.druid.indexer.data; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.metamx.common.parsers.Parser; +import com.metamx.druid.index.v1.SpatialDimensionSchema; import java.util.List; @@ -41,5 +42,7 @@ public interface DataSpec public List getDimensions(); + public List getSpatialDimensions(); + public Parser getParser(); } diff --git a/index-common/src/main/java/com/metamx/druid/indexer/data/DelimitedDataSpec.java b/index-common/src/main/java/com/metamx/druid/indexer/data/DelimitedDataSpec.java index 90a28e1c8fd..c9aa2d253ce 100644 --- a/index-common/src/main/java/com/metamx/druid/indexer/data/DelimitedDataSpec.java +++ b/index-common/src/main/java/com/metamx/druid/indexer/data/DelimitedDataSpec.java @@ -22,8 +22,10 @@ package com.metamx.druid.indexer.data; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.metamx.common.parsers.DelimitedParser; import com.metamx.common.parsers.Parser; +import com.metamx.druid.index.v1.SpatialDimensionSchema; import java.util.List; @@ -34,12 +36,14 @@ public class DelimitedDataSpec implements DataSpec private final String delimiter; private final List columns; private final List dimensions; + private final List spatialDimensions; @JsonCreator public DelimitedDataSpec( @JsonProperty("delimiter") String delimiter, @JsonProperty("columns") List columns, - @JsonProperty("dimensions") List dimensions + @JsonProperty("dimensions") List dimensions, + @JsonProperty("spatialDimensions") List spatialDimensions ) { Preconditions.checkNotNull(columns); @@ -50,6 +54,9 @@ public class DelimitedDataSpec implements DataSpec this.delimiter = (delimiter == null) ? DelimitedParser.DEFAULT_DELIMITER : delimiter; this.columns = columns; this.dimensions = dimensions; + this.spatialDimensions = (spatialDimensions == null) + ? Lists.newArrayList() + : spatialDimensions; } @JsonProperty("delimiter") @@ -71,6 +78,13 @@ public class DelimitedDataSpec implements DataSpec return dimensions; } + @JsonProperty("spatialDimensions") + @Override + public List getSpatialDimensions() + { + return spatialDimensions; + } + @Override public void verify(List usedCols) { diff --git a/index-common/src/main/java/com/metamx/druid/indexer/data/JSONDataSpec.java b/index-common/src/main/java/com/metamx/druid/indexer/data/JSONDataSpec.java index 67119d2532c..8b12aa601e6 100644 --- a/index-common/src/main/java/com/metamx/druid/indexer/data/JSONDataSpec.java +++ b/index-common/src/main/java/com/metamx/druid/indexer/data/JSONDataSpec.java @@ -20,8 +20,10 @@ package com.metamx.druid.indexer.data; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; import com.metamx.common.parsers.JSONParser; import com.metamx.common.parsers.Parser; +import com.metamx.druid.index.v1.SpatialDimensionSchema; import java.util.List; @@ -30,12 +32,17 @@ import java.util.List; public class JSONDataSpec implements DataSpec { private final List dimensions; + private final List spatialDimensions; public JSONDataSpec( - @JsonProperty("dimensions") List dimensions + @JsonProperty("dimensions") List dimensions, + @JsonProperty("spatialDimensions") List spatialDimensions ) { this.dimensions = dimensions; + this.spatialDimensions = (spatialDimensions == null) + ? Lists.newArrayList() + : spatialDimensions; } @JsonProperty("dimensions") @@ -45,6 +52,13 @@ public class JSONDataSpec implements DataSpec return dimensions; } + @JsonProperty("spatialDimensions") + @Override + public List getSpatialDimensions() + { + return spatialDimensions; + } + @Override public void verify(List usedCols) { diff --git a/index-common/src/main/java/com/metamx/druid/indexer/data/ToLowercaseDataSpec.java b/index-common/src/main/java/com/metamx/druid/indexer/data/ToLowercaseDataSpec.java index 95b3e6d2b80..c4eda7ffc55 100644 --- a/index-common/src/main/java/com/metamx/druid/indexer/data/ToLowercaseDataSpec.java +++ b/index-common/src/main/java/com/metamx/druid/indexer/data/ToLowercaseDataSpec.java @@ -22,6 +22,7 @@ package com.metamx.druid.indexer.data; import com.fasterxml.jackson.annotation.JsonValue; import com.metamx.common.parsers.Parser; import com.metamx.common.parsers.ToLowerCaseParser; +import com.metamx.druid.index.v1.SpatialDimensionSchema; import java.util.List; @@ -56,6 +57,12 @@ public class ToLowercaseDataSpec implements DataSpec return delegate.getDimensions(); } + @Override + public List getSpatialDimensions() + { + return delegate.getSpatialDimensions(); + } + @Override public Parser getParser() { diff --git a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java index 7814c45e73e..874a30b7ad4 100644 --- a/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java +++ b/indexer/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java @@ -35,6 +35,7 @@ import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.v1.IncrementalIndex; +import com.metamx.druid.index.v1.IncrementalIndexSchema; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexMerger; import com.metamx.druid.indexer.data.StringInputRowParser; @@ -152,7 +153,8 @@ public class IndexGeneratorJob implements Jobby } } - public static List getPublishedSegments(HadoopDruidIndexerConfig config) { + public static List getPublishedSegments(HadoopDruidIndexerConfig config) + { final Configuration conf = new Configuration(); final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper; @@ -182,7 +184,7 @@ public class IndexGeneratorJob implements Jobby List publishedSegments = publishedSegmentsBuilder.build(); return publishedSegments; -} + } public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper @@ -197,7 +199,7 @@ public class IndexGeneratorJob implements Jobby // Group by bucket, sort by timestamp final Optional bucket = getConfig().getBucket(inputRow); - if(!bucket.isPresent()) { + if (!bucket.isPresent()) { throw new ISE("WTF?! No bucket found for row: %s", inputRow); } @@ -590,9 +592,12 @@ public class IndexGeneratorJob implements Jobby private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs) { return new IncrementalIndex( - theBucket.time.getMillis(), - config.getRollupSpec().getRollupGranularity(), - aggs + new IncrementalIndexSchema.Builder() + .withMinTimestamp(theBucket.time.getMillis()) + .withSpatialDimensions(config.getDataSpec().getSpatialDimensions()) + .withQueryGranularity(config.getRollupSpec().getRollupGranularity()) + .withMetrics(aggs) + .build() ); } diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java index 220ce19a16e..91b6f1ba0f7 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java @@ -317,7 +317,7 @@ public class TaskSerdeTest "foo", "timestamp", "auto", - new JSONDataSpec(ImmutableList.of("foo")), + new JSONDataSpec(ImmutableList.of("foo"), null), null, new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))), new StaticPathSpec("bar"), diff --git a/pom.xml b/pom.xml index ffbc7844628..b5ac9116401 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,7 @@ com.metamx bytebuffer-collections - 0.0.1-SNAPSHOT + 0.0.1 com.metamx diff --git a/server/src/test/java/com/metamx/druid/index/v1/TestIndex.java b/server/src/test/java/com/metamx/druid/index/v1/TestIndex.java index 164c18a13fc..df5761681af 100644 --- a/server/src/test/java/com/metamx/druid/index/v1/TestIndex.java +++ b/server/src/test/java/com/metamx/druid/index/v1/TestIndex.java @@ -162,7 +162,7 @@ public class TestIndex { StringInputRowParser parser = new StringInputRowParser( new TimestampSpec("ts", "iso"), - new DelimitedDataSpec("\t", Arrays.asList(COLUMNS), Arrays.asList(DIMENSIONS)), + new DelimitedDataSpec("\t", Arrays.asList(COLUMNS), Arrays.asList(DIMENSIONS), null), Arrays.asList() ); boolean runOnce = false;