From 3dc2974894d86b98345a6e2ca34861f2e3898b71 Mon Sep 17 00:00:00 2001
From: kaijianding
Date: Tue, 26 Jul 2016 06:45:30 +0800
Subject: [PATCH] Add timestampSpec to metadata.drd and SegmentMetadataQuery
 (#3227)

* save TimestampSpec in metadata.drd

* add timestampSpec info in SegmentMetadataQuery
---
 .../druid/data/input/impl/TimestampSpec.java  | 22 +++++++
 docs/content/querying/segmentmetadataquery.md |  4 ++
 .../io/druid/indexer/IndexGeneratorJob.java   |  1 +
 .../SegmentMetadataQueryQueryToolChest.java   | 10 +++
 .../SegmentMetadataQueryRunnerFactory.java    | 12 ++++
 .../metadata/metadata/SegmentAnalysis.java    | 14 +++-
 .../metadata/SegmentMetadataQuery.java        |  6 ++
 .../main/java/io/druid/segment/Metadata.java  | 39 +++++++++--
 .../segment/incremental/IncrementalIndex.java |  1 +
 .../incremental/IncrementalIndexSchema.java   | 32 ++++++++-
 ...egmentMetadataQueryQueryToolChestTest.java |  9 +++
 .../metadata/SegmentMetadataQueryTest.java    | 66 +++++++++++++++++++
 .../SegmentMetadataUnionQueryTest.java        |  1 +
 .../io/druid/segment/IndexMergerTest.java     | 12 ++--
 .../java/io/druid/segment/MetadataTest.java   |  6 ++
 .../test/java/io/druid/segment/TestIndex.java |  1 +
 .../incremental/IncrementalIndexTest.java     | 12 ++--
 .../druid/segment/realtime/plumber/Sink.java  |  1 +
 18 files changed, 229 insertions(+), 20 deletions(-)

diff --git a/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java b/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java
index c8bf251cf0b..0a125f3ccd5 100644
--- a/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java
+++ b/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java
@@ -23,7 +23,9 @@
 import com.google.common.base.Function;
 import com.metamx.common.parsers.TimestampParser;
 import org.joda.time.DateTime;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  */
@@ -130,4 +132,24 @@ public class TimestampSpec
     result = 31 * result + (missingValue != null ? missingValue.hashCode() : 0);
     return result;
   }
+
+  // Simple merge strategy on TimestampSpec: checks whether all non-null specs
+  // are equal, and otherwise returns null. This can be improved in the future
+  // but is good enough for most use cases.
+  public static TimestampSpec mergeTimestampSpec(List<TimestampSpec> toMerge) {
+    if (toMerge == null || toMerge.size() == 0) {
+      return null;
+    }
+
+    TimestampSpec result = toMerge.get(0);
+    for (int i = 1; i < toMerge.size(); i++) {
+      if (toMerge.get(i) == null) {
+        continue;
+      }
+      if (!Objects.equals(result, toMerge.get(i))) {
+        return null;
+      }
+    }
+
+    return result;
+  }
 }
diff --git a/docs/content/querying/segmentmetadataquery.md b/docs/content/querying/segmentmetadataquery.md
index 5b8c6ed1ec9..1d7d7ec256d 100644
--- a/docs/content/querying/segmentmetadataquery.md
+++ b/docs/content/querying/segmentmetadataquery.md
@@ -126,6 +126,10 @@ dimension columns.
 
 * `intervals` in the result will contain the list of intervals associated with the queried segments.
 
+#### timestampSpec
+
+* `timestampSpec` in the result will contain the timestampSpec of the data stored in the segments. This can be null if the timestampSpec of the segments was unknown or unmergeable (if merging is enabled).
+
 #### queryGranularity
 
 * `queryGranularity` in the result will contain query granularity of data stored in segments. this can be null if query granularity of segments was unknown or unmergeable (if merging is enabled).
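A minimal sketch of the merge semantics `mergeTimestampSpec` introduces above; the column names and specs here are hypothetical, chosen only to show the two outcomes:

    import io.druid.data.input.impl.TimestampSpec;

    import java.util.Arrays;

    public class MergeTimestampSpecExample
    {
      public static void main(String[] args)
      {
        // "ds" and "created_at" are made-up column names for illustration.
        TimestampSpec a = new TimestampSpec("ds", "auto", null);
        TimestampSpec b = new TimestampSpec("ds", "auto", null);
        TimestampSpec other = new TimestampSpec("created_at", "iso", null);

        // Equal specs merge to the common spec; null entries after the first are skipped.
        System.out.println(TimestampSpec.mergeTimestampSpec(Arrays.asList(a, null, b))); // equals a

        // Any disagreement collapses the merge to null, which is what a merged
        // SegmentMetadataQuery result reports as an unknown/unmergeable timestampSpec.
        System.out.println(TimestampSpec.mergeTimestampSpec(Arrays.asList(a, other))); // null
      }
    }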
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 00619e90db7..ac9edc263b4 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -222,6 +222,7 @@ public class IndexGeneratorJob implements Jobby
     final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
     final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
         .withMinTimestamp(theBucket.time.getMillis())
+        .withTimestampSpec(config.getSchema().getDataSchema().getParser().getParseSpec().getTimestampSpec())
         .withDimensionsSpec(config.getSchema().getDataSchema().getParser())
         .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
         .withMetrics(aggs)
diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index 80bf4d9aee3..9f732d12d54 100644
--- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -37,6 +37,7 @@
 import com.metamx.common.guava.nary.BinaryFn;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.common.guava.CombiningSequence;
 import io.druid.common.utils.JodaUtils;
+import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.CacheStrategy;
 import io.druid.query.DruidMetrics;
@@ -332,6 +333,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentAnalysis.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentAnalysis.java
   private final long size;
   private final long numRows;
   private final Map<String, AggregatorFactory> aggregators;
+  private final TimestampSpec timestampSpec;
   private final QueryGranularity queryGranularity;
 
   @JsonCreator
@@ -47,6 +49,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
       @JsonProperty("size") long size,
       @JsonProperty("numRows") long numRows,
       @JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators,
+      @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
       @JsonProperty("queryGranularity") QueryGranularity queryGranularity
   )
   {
@@ -56,6 +59,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
     this.size = size;
     this.numRows = numRows;
     this.aggregators = aggregators;
+    this.timestampSpec = timestampSpec;
     this.queryGranularity = queryGranularity;
   }
 
@@ -89,6 +93,12 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
     return numRows;
   }
 
+  @JsonProperty
+  public TimestampSpec getTimestampSpec()
+  {
+    return timestampSpec;
+  }
+
   @JsonProperty
   public QueryGranularity getQueryGranularity()
   {
@@ -111,6 +121,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
            ", size=" + size +
            ", numRows=" + numRows +
            ", aggregators=" + aggregators +
+           ", timestampSpec=" + timestampSpec +
            ", queryGranularity=" + queryGranularity +
            '}';
   }
@@ -134,6 +145,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
            Objects.equals(interval, that.interval) &&
            Objects.equals(columns, that.columns) &&
            Objects.equals(aggregators, that.aggregators) &&
+           Objects.equals(timestampSpec, that.timestampSpec) &&
            Objects.equals(queryGranularity, that.queryGranularity);
   }
 
@@ -144,7 +156,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
   @Override
   public int hashCode()
   {
-    return Objects.hash(id, interval, columns, size, numRows, aggregators, queryGranularity);
+    return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity);
   }
 
   @Override
diff --git a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java
index b6086eccf18..751f6ed9c9b 100644
--- a/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java
+++ b/processing/src/main/java/io/druid/query/metadata/metadata/SegmentMetadataQuery.java
@@ -57,6 +57,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
     INTERVAL,
     AGGREGATORS,
     MINMAX,
+    TIMESTAMPSPEC,
     QUERYGRANULARITY;
 
     @JsonValue
@@ -188,6 +189,11 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
     return analysisTypes.contains(AnalysisType.AGGREGATORS);
   }
 
+  public boolean hasTimestampSpec()
+  {
+    return analysisTypes.contains(AnalysisType.TIMESTAMPSPEC);
+  }
+
   public boolean hasQueryGranularity()
   {
     return analysisTypes.contains(AnalysisType.QUERYGRANULARITY);
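With the new `TIMESTAMPSPEC` analysis type, callers opt into the extra analysis explicitly. A minimal sketch, mirroring how the tests later in this patch build the query (the datasource name and interval are placeholders):

    import io.druid.query.Druids;
    import io.druid.query.metadata.metadata.SegmentMetadataQuery;

    public class TimestampSpecQueryExample
    {
      public static SegmentMetadataQuery build()
      {
        // "wikipedia" and the interval are illustrative values only.
        return Druids.newSegmentMetadataQueryBuilder()
            .dataSource("wikipedia")
            .intervals("2013/2014")
            .analysisTypes(SegmentMetadataQuery.AnalysisType.TIMESTAMPSPEC)
            .merge(true)
            .build();
      }
    }

Each resulting `SegmentAnalysis` then exposes `getTimestampSpec()`, which is null when the specs across segments were unknown or disagreed.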
diff --git a/processing/src/main/java/io/druid/segment/Metadata.java b/processing/src/main/java/io/druid/segment/Metadata.java
index de83566f3a7..25455620a70 100644
--- a/processing/src/main/java/io/druid/segment/Metadata.java
+++ b/processing/src/main/java/io/druid/segment/Metadata.java
@@ -20,6 +20,7 @@
 package io.druid.segment;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 
@@ -42,6 +43,9 @@ public class Metadata
   @JsonProperty
   private AggregatorFactory[] aggregators;
 
+  @JsonProperty
+  private TimestampSpec timestampSpec;
+
   @JsonProperty
   private QueryGranularity queryGranularity;
 
@@ -61,6 +65,17 @@ public class Metadata
     return this;
   }
 
+  public TimestampSpec getTimestampSpec()
+  {
+    return timestampSpec;
+  }
+
+  public Metadata setTimestampSpec(TimestampSpec timestampSpec)
+  {
+    this.timestampSpec = timestampSpec;
+    return this;
+  }
+
   public QueryGranularity getQueryGranularity()
   {
     return queryGranularity;
@@ -111,6 +126,7 @@ public class Metadata
                                                       ? new ArrayList<AggregatorFactory[]>()
                                                       : null;
 
+    List<TimestampSpec> timestampSpecsToMerge = new ArrayList<>();
     List<QueryGranularity> gransToMerge = new ArrayList<>();
 
     for (Metadata metadata : toBeMerged) {
@@ -120,6 +136,10 @@ public class Metadata
           aggregatorsToMerge.add(metadata.getAggregators());
         }
 
+        if (timestampSpecsToMerge != null && metadata.getTimestampSpec() != null) {
+          timestampSpecsToMerge.add(metadata.getTimestampSpec());
+        }
+
         if (gransToMerge != null) {
           gransToMerge.add(metadata.getQueryGranularity());
         }
@@ -128,6 +148,7 @@ public class Metadata
         //if metadata and hence aggregators and queryGranularity for some segment being merged are unknown then
         //final merged segment should not have same in metadata
         aggregatorsToMerge = null;
+        timestampSpecsToMerge = null;
         gransToMerge = null;
       }
     }
@@ -143,6 +164,10 @@ public class Metadata
       result.setAggregators(overrideMergedAggregators);
     }
 
+    if (timestampSpecsToMerge != null) {
+      result.setTimestampSpec(TimestampSpec.mergeTimestampSpec(timestampSpecsToMerge));
+    }
+
     if (gransToMerge != null) {
       result.setQueryGranularity(QueryGranularity.mergeQueryGranularities(gransToMerge));
     }
@@ -171,9 +196,12 @@ public class Metadata
     if (!Arrays.equals(aggregators, metadata.aggregators)) {
       return false;
     }
-    return !(queryGranularity != null
-             ? !queryGranularity.equals(metadata.queryGranularity)
-             : metadata.queryGranularity != null);
+    if (timestampSpec != null ? !timestampSpec.equals(metadata.timestampSpec) : metadata.timestampSpec != null) {
+      return false;
+    }
+    return queryGranularity != null
+           ? queryGranularity.equals(metadata.queryGranularity)
+           : metadata.queryGranularity == null;
   }
 
@@ -181,7 +209,8 @@ public class Metadata
   public int hashCode()
   {
     int result = container.hashCode();
-    result = 31 * result + (aggregators != null ? Arrays.hashCode(aggregators) : 0);
+    result = 31 * result + Arrays.hashCode(aggregators);
+    result = 31 * result + (timestampSpec != null ? timestampSpec.hashCode() : 0);
     result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
     return result;
   }
@@ -190,9 +219,9 @@ public class Metadata
   public String toString()
   {
     return "Metadata{" +
-           "container=" + container +
            ", aggregators=" + Arrays.toString(aggregators) +
+           ", timestampSpec=" + timestampSpec +
            ", queryGranularity=" + queryGranularity +
            '}';
   }
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index 45cef452b27..27110753485 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -410,6 +410,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
 
     this.metadata = new Metadata()
         .setAggregators(getCombiningAggregators(metrics))
+        .setTimestampSpec(incrementalIndexSchema.getTimestampSpec())
         .setQueryGranularity(this.gran);
 
     this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java
index 1e2e38140a7..e6cb94315d4 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java
@@ -21,8 +21,9 @@
 package io.druid.segment.incremental;
 
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
-import io.druid.granularity.QueryGranularity;
+import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
 
 /**
@@ -30,18 +31,21 @@
 public class IncrementalIndexSchema
 {
   private final long minTimestamp;
+  private final TimestampSpec timestampSpec;
   private final QueryGranularity gran;
   private final DimensionsSpec dimensionsSpec;
   private final AggregatorFactory[] metrics;
 
   public IncrementalIndexSchema(
       long minTimestamp,
+      TimestampSpec timestampSpec,
       QueryGranularity gran,
       DimensionsSpec dimensionsSpec,
       AggregatorFactory[] metrics
   )
   {
     this.minTimestamp = minTimestamp;
+    this.timestampSpec = timestampSpec;
     this.gran = gran;
     this.dimensionsSpec = dimensionsSpec;
     this.metrics = metrics;
@@ -52,6 +56,11 @@ public class IncrementalIndexSchema
     return minTimestamp;
   }
 
+  public TimestampSpec getTimestampSpec()
+  {
+    return timestampSpec;
+  }
+
   public QueryGranularity getGran()
   {
     return gran;
@@ -70,6 +79,7 @@ public class IncrementalIndexSchema
   public static class Builder
   {
     private long minTimestamp;
+    private TimestampSpec timestampSpec;
     private QueryGranularity gran;
     private DimensionsSpec dimensionsSpec;
     private AggregatorFactory[] metrics;
@@ -88,6 +98,24 @@ public class IncrementalIndexSchema
       return this;
     }
 
+    public Builder withTimestampSpec(TimestampSpec timestampSpec)
+    {
+      this.timestampSpec = timestampSpec;
+      return this;
+    }
+
+    public Builder withTimestampSpec(InputRowParser parser)
+    {
+      if (parser != null
+          && parser.getParseSpec() != null
+          && parser.getParseSpec().getTimestampSpec() != null) {
+        this.timestampSpec = parser.getParseSpec().getTimestampSpec();
+      } else {
+        this.timestampSpec = new TimestampSpec(null, null, null);
+      }
+      return this;
+    }
+
     public Builder withQueryGranularity(QueryGranularity gran)
     {
       this.gran = gran;
@@ -122,7 +150,7 @@ public class IncrementalIndexSchema
     public IncrementalIndexSchema build()
     {
       return new IncrementalIndexSchema(
-          minTimestamp, gran, dimensionsSpec, metrics
+          minTimestamp, timestampSpec, gran, dimensionsSpec, metrics
      );
    }
  }
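A minimal sketch of the extended `Builder`, assuming a hypothetical `ds` timestamp column; the spec set here is what travels with the schema and ends up in the segment's metadata.drd at persist time:

    import io.druid.data.input.impl.TimestampSpec;
    import io.druid.granularity.QueryGranularities;
    import io.druid.query.aggregation.AggregatorFactory;
    import io.druid.query.aggregation.CountAggregatorFactory;
    import io.druid.segment.incremental.IncrementalIndexSchema;

    public class SchemaBuilderExample
    {
      public static IncrementalIndexSchema build()
      {
        // "ds" and "auto" are illustrative; callers that have an InputRowParser
        // can use the parser overload of withTimestampSpec instead.
        return new IncrementalIndexSchema.Builder()
            .withMinTimestamp(0L)
            .withTimestampSpec(new TimestampSpec("ds", "auto", null))
            .withQueryGranularity(QueryGranularities.NONE)
            .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
            .build();
      }
    }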
diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java
index e19657b92ce..09f78a3e6c5 100644
--- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java
+++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChestTest.java
@@ -86,6 +86,7 @@ public class SegmentMetadataQueryQueryToolChestTest
         ),
         71982,
         100,
         null,
+        null,
         null
     );
@@ -115,6 +116,7 @@
             "foo", new LongSumAggregatorFactory("foo", "foo"),
             "baz", new DoubleSumAggregatorFactory("baz", "baz")
         ),
+        null,
         null
     );
     final SegmentAnalysis analysis2 = new SegmentAnalysis(
@@ -127,6 +129,7 @@
             "foo", new LongSumAggregatorFactory("foo", "foo"),
             "bar", new DoubleSumAggregatorFactory("bar", "bar")
         ),
+        null,
         null
     );
@@ -158,6 +161,7 @@
         0,
         0,
         null,
+        null,
         null
     );
     final SegmentAnalysis analysis2 = new SegmentAnalysis(
@@ -170,6 +174,7 @@
             "foo", new LongSumAggregatorFactory("foo", "foo"),
             "bar", new DoubleSumAggregatorFactory("bar", "bar")
         ),
+        null,
         null
     );
@@ -193,6 +198,7 @@
         0,
         0,
         null,
+        null,
         null
     );
     final SegmentAnalysis analysis2 = new SegmentAnalysis(
@@ -202,6 +208,7 @@
         0,
         0,
         null,
+        null,
         null
     );
@@ -222,6 +229,7 @@
             "foo", new LongSumAggregatorFactory("foo", "foo"),
             "bar", new DoubleSumAggregatorFactory("bar", "bar")
         ),
+        null,
         null
     );
     final SegmentAnalysis analysis2 = new SegmentAnalysis(
@@ -235,6 +243,7 @@
             "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
             "baz", new LongMaxAggregatorFactory("baz", "baz")
         ),
+        null,
         null
     );
diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
index e4d8ec95520..637d81a8c42 100644
--- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
+++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataQueryTest.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.guava.Sequences;
 import io.druid.common.utils.JodaUtils;
+import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularities;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.BySegmentResultValue;
@@ -181,6 +182,7 @@ public class SegmentMetadataQueryTest
         ),
         mmap1 ? 71982 : 72755,
         1209,
         null,
+        null,
         null
     );
     expectedSegmentAnalysis2 = new SegmentAnalysis(
@@ -223,6 +225,7 @@
         ),
         mmap2 ? 71982 : 72755,
         1209,
         null,
+        null,
         null
     );
   }
@@ -270,6 +273,7 @@
         0,
         expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
         null,
+        null,
         null
     );
@@ -337,6 +341,7 @@
         0,
         expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
         null,
+        null,
         null
     );
@@ -453,6 +458,7 @@
         expectedSegmentAnalysis1.getSize() + expectedSegmentAnalysis2.getSize(),
         expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
         null,
+        null,
         null
     );
@@ -503,6 +509,7 @@
         0,
         expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
         null,
+        null,
         null
     );
@@ -564,6 +571,7 @@
         0,
         expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
         expectedAggregators,
+        null,
         null
     );
@@ -600,6 +608,63 @@ public class SegmentMetadataQueryTest
     exec.shutdownNow();
   }
 
+  @Test
+  public void testSegmentMetadataQueryWithTimestampSpecMerge()
+  {
+    SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
+        differentIds ? "merged" : "testSegment",
+        null,
+        ImmutableMap.of(
+            "placement",
+            new ColumnAnalysis(
+                ValueType.STRING.toString(),
+                false,
+                0,
+                0,
+                null,
+                null,
+                null
+            )
+        ),
+        0,
+        expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
+        null,
+        new TimestampSpec("ds", "auto", null),
+        null
+    );
+
+    QueryToolChest<SegmentAnalysis, SegmentMetadataQuery> toolChest = FACTORY.getToolchest();
+
+    ExecutorService exec = Executors.newCachedThreadPool();
+    QueryRunner<SegmentAnalysis> myRunner = new FinalizeResultsQueryRunner<>(
+        toolChest.mergeResults(
+            FACTORY.mergeRunners(
+                MoreExecutors.sameThreadExecutor(),
+                Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
+                    toolChest.preMergeQueryDecoration(runner1),
+                    toolChest.preMergeQueryDecoration(runner2)
+                )
+            )
+        ),
+        toolChest
+    );
+
+    TestHelper.assertExpectedObjects(
+        ImmutableList.of(mergedSegmentAnalysis),
+        myRunner.run(
+            Druids.newSegmentMetadataQueryBuilder()
+                  .dataSource("testing")
+                  .intervals("2013/2014")
+                  .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
+                  .analysisTypes(SegmentMetadataQuery.AnalysisType.TIMESTAMPSPEC)
+                  .merge(true)
+                  .build(),
+            Maps.newHashMap()
+        ),
+        "failed SegmentMetadata merging query"
+    );
+    exec.shutdownNow();
+  }
 
   @Test
   public void testSegmentMetadataQueryWithQueryGranularityMerge()
@@ -622,6 +687,7 @@
         0,
         expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
         null,
+        null,
         QueryGranularities.NONE
     );
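The serde that the new `@JsonProperty` fields on `SegmentAnalysis` and `Metadata` depend on can be sanity-checked in isolation. A minimal round-trip sketch, assuming the same `DefaultObjectMapper` the tests import and a hypothetical `ds` column:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.data.input.impl.TimestampSpec;
    import io.druid.jackson.DefaultObjectMapper;

    public class TimestampSpecSerdeExample
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new DefaultObjectMapper();
        TimestampSpec spec = new TimestampSpec("ds", "auto", null);
        // Serialize and deserialize through Jackson, as metadata.drd and the
        // SegmentMetadataQuery result path both do.
        TimestampSpec back = mapper.readValue(mapper.writeValueAsString(spec), TimestampSpec.class);
        System.out.println(back.equals(spec)); // true
      }
    }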
diff --git a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataUnionQueryTest.java b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataUnionQueryTest.java
index 6e5cf2464d8..b99097428c0 100644
--- a/processing/src/test/java/io/druid/query/metadata/SegmentMetadataUnionQueryTest.java
+++ b/processing/src/test/java/io/druid/query/metadata/SegmentMetadataUnionQueryTest.java
@@ -126,6 +126,7 @@ public class SegmentMetadataUnionQueryTest
         mmap ? 287928 : 291020,
         4836,
         null,
+        null,
         null
     );
     SegmentMetadataQuery query = new Druids.SegmentMetadataQueryBuilder()
diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java
index a06377a4c81..7e285f4b23c 100644
--- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java
+++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java
@@ -1765,12 +1765,12 @@ public class IndexMergerTest
 
   private IncrementalIndex getIndexWithDims(List<String> dims)
   {
-    IncrementalIndexSchema schema = new IncrementalIndexSchema(
-        0L,
-        QueryGranularities.NONE,
-        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null),
-        new AggregatorFactory[]{new CountAggregatorFactory("count")}
-    );
+    IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
+        .withMinTimestamp(0L)
+        .withQueryGranularity(QueryGranularities.NONE)
+        .withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null))
+        .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
+        .build();
     return new OnheapIncrementalIndex(schema, true, 1000);
   }
diff --git a/processing/src/test/java/io/druid/segment/MetadataTest.java b/processing/src/test/java/io/druid/segment/MetadataTest.java
index 9bace7ead28..b75269fa900 100644
--- a/processing/src/test/java/io/druid/segment/MetadataTest.java
+++ b/processing/src/test/java/io/druid/segment/MetadataTest.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularities;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.AggregatorFactory;
@@ -78,11 +79,13 @@ public class MetadataTest
     Metadata m1 = new Metadata();
     m1.put("k", "v");
     m1.setAggregators(aggs);
+    m1.setTimestampSpec(new TimestampSpec("ds", "auto", null));
     m1.setQueryGranularity(QueryGranularities.ALL);
 
     Metadata m2 = new Metadata();
     m2.put("k", "v");
     m2.setAggregators(aggs);
+    m2.setTimestampSpec(new TimestampSpec("ds", "auto", null));
     m2.setQueryGranularity(QueryGranularities.ALL);
 
     Metadata merged = new Metadata();
@@ -92,6 +95,7 @@ public class MetadataTest
         new LongMaxAggregatorFactory("n", "n")
         }
     );
+    merged.setTimestampSpec(new TimestampSpec("ds", "auto", null));
     merged.setQueryGranularity(QueryGranularities.ALL);
     Assert.assertEquals(merged, Metadata.merge(ImmutableList.of(m1, m2), null));
 
@@ -102,6 +106,7 @@ public class MetadataTest
     metadataToBeMerged.add(null);
 
     merged.setAggregators(null);
+    merged.setTimestampSpec(null);
     merged.setQueryGranularity(null);
     Assert.assertEquals(merged, Metadata.merge(metadataToBeMerged, null));
 
@@ -116,6 +121,7 @@ public class MetadataTest
         Metadata.merge(metadataToBeMerged, explicitAggs)
     );
 
+    merged.setTimestampSpec(new TimestampSpec("ds", "auto", null));
     merged.setQueryGranularity(QueryGranularities.ALL);
     Assert.assertEquals(
         merged,
diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java
index c9687520f24..f21d88ae3ea 100644
--- a/processing/src/test/java/io/druid/segment/TestIndex.java
+++ b/processing/src/test/java/io/druid/segment/TestIndex.java
@@ -178,6 +178,7 @@ public class TestIndex
   {
     final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
         .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
+        .withTimestampSpec(new TimestampSpec("ds", "auto", null))
         .withQueryGranularity(QueryGranularities.NONE)
         .withMetrics(METRIC_AGGS)
         .build();
diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
index 9b3f1b6552f..a9b8252c33c 100644
--- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java
@@ -87,12 +87,12 @@ public class IncrementalIndexTest
             new SelectorDimFilter("billy", "A", null)
         )
     };
-    final IncrementalIndexSchema schema = new IncrementalIndexSchema(
-        0,
-        QueryGranularities.MINUTE,
-        dimensions,
-        metrics
-    );
+    final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
+        .withMinTimestamp(0)
+        .withQueryGranularity(QueryGranularities.MINUTE)
+        .withDimensionsSpec(dimensions)
+        .withMetrics(metrics)
+        .build();
     final List<Object[]> constructors = Lists.newArrayList();
     for (final Boolean sortFacts : ImmutableList.of(false, true)) {
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
index 91326192fdd..0011df0f442 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
@@ -232,6 +232,7 @@ public class Sink implements Iterable<FireHydrant>
   {
     final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
         .withMinTimestamp(minTimestamp)
+        .withTimestampSpec(schema.getParser())
         .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
         .withDimensionsSpec(schema.getParser())
         .withMetrics(schema.getAggregators())