Add timestampSpec to metadata.drd and SegmentMetadataQuery (#3227)

* save TimestampSpec in metadata.drd

* add timestampSpec info in SegmentMetadataQuery
kaijianding 2016-07-26 06:45:30 +08:00 committed by Fangjin Yang
parent d5ed3f1347
commit 3dc2974894
18 changed files with 229 additions and 20 deletions

View File

@@ -23,7 +23,9 @@ import com.google.common.base.Function;
import com.metamx.common.parsers.TimestampParser;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 */
@@ -130,4 +132,24 @@ public class TimestampSpec
    result = 31 * result + (missingValue != null ? missingValue.hashCode() : 0);
    return result;
  }
  // Simple merge strategy: if all the given specs are equal, return that
  // shared spec; otherwise return null. This can be improved in the future
  // but is good enough for most use cases.
  public static TimestampSpec mergeTimestampSpec(List<TimestampSpec> toMerge) {
    if (toMerge == null || toMerge.size() == 0) {
      return null;
    }

    TimestampSpec result = toMerge.get(0);
    for (int i = 1; i < toMerge.size(); i++) {
      if (toMerge.get(i) == null) {
        continue;
      }
      if (!Objects.equals(result, toMerge.get(i))) {
        return null;
      }
    }

    return result;
  }
}
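For reference, a minimal usage sketch of the new helper (the specs below are illustrative values, not taken from this commit):

    import io.druid.data.input.impl.TimestampSpec;
    import java.util.Arrays;

    // All non-null entries equal: the shared spec is returned.
    TimestampSpec a = new TimestampSpec("ds", "auto", null);
    TimestampSpec b = new TimestampSpec("ds", "auto", null);
    TimestampSpec.mergeTimestampSpec(Arrays.asList(a, b));  // equals a

    // Two non-null entries differ: the specs are "unmergeable" and the result is null.
    TimestampSpec c = new TimestampSpec("created", "iso", null);
    TimestampSpec.mergeTimestampSpec(Arrays.asList(a, c));  // null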

View File

@@ -126,6 +126,10 @@ dimension columns.
* `intervals` in the result will contain the list of intervals associated with the queried segments.

#### timestampSpec

* `timestampSpec` in the result will contain the timestampSpec of the data stored in the segments. This can be null if the timestampSpec of the segments was unknown or unmergeable (if merging is enabled).

#### queryGranularity

* `queryGranularity` in the result will contain the query granularity of the data stored in the segments. This can be null if the query granularity of the segments was unknown or unmergeable (if merging is enabled).
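For illustration, a query requesting the new analysis type can be built the same way the tests later in this commit do (the datasource and interval here are placeholders):

    SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
        .dataSource("testing")   // placeholder datasource
        .intervals("2013/2014")  // placeholder interval
        .analysisTypes(SegmentMetadataQuery.AnalysisType.TIMESTAMPSPEC)
        .merge(true)             // merged result is null when segment specs differ
        .build();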

View File

@@ -222,6 +222,7 @@ public class IndexGeneratorJob implements Jobby
    final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
    final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
        .withMinTimestamp(theBucket.time.getMillis())
        .withTimestampSpec(config.getSchema().getDataSchema().getParser().getParseSpec().getTimestampSpec())
        .withDimensionsSpec(config.getSchema().getDataSchema().getParser())
        .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
        .withMetrics(aggs)

View File

@@ -37,6 +37,7 @@ import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.common.guava.CombiningSequence;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
@@ -332,6 +333,13 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
      }
    }

    final TimestampSpec timestampSpec = TimestampSpec.mergeTimestampSpec(
        Lists.newArrayList(
            arg1.getTimestampSpec(),
            arg2.getTimestampSpec()
        )
    );

    final QueryGranularity queryGranularity = QueryGranularity.mergeQueryGranularities(
        Lists.newArrayList(
            arg1.getQueryGranularity(),
@@ -354,6 +362,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
        arg1.getSize() + arg2.getSize(),
        arg1.getNumRows() + arg2.getNumRows(),
        aggregators.isEmpty() ? null : aggregators,
        timestampSpec,
        queryGranularity
    );
  }
@@ -368,6 +377,7 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
        analysis.getSize(),
        analysis.getNumRows(),
        analysis.getAggregators(),
        analysis.getTimestampSpec(),
        analysis.getQueryGranularity()
    );
  }

View File

@@ -29,6 +29,7 @@ import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.BaseQuery;
@@ -127,6 +128,16 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
        aggregators = null;
      }

      final TimestampSpec timestampSpec;
      if (query.hasTimestampSpec()) {
        if (metadata == null) {
          metadata = segment.asStorageAdapter().getMetadata();
        }
        timestampSpec = metadata != null ? metadata.getTimestampSpec() : null;
      } else {
        timestampSpec = null;
      }

      final QueryGranularity queryGranularity;
      if (query.hasQueryGranularity()) {
        if (metadata == null) {
@@ -146,6 +157,7 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
          totalSize,
          numRows,
          aggregators,
          timestampSpec,
          queryGranularity
      )
    )

View File

@@ -21,6 +21,7 @@ package io.druid.query.metadata.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Interval;
@@ -37,6 +38,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
  private final long size;
  private final long numRows;
  private final Map<String, AggregatorFactory> aggregators;
  private final TimestampSpec timestampSpec;
  private final QueryGranularity queryGranularity;

  @JsonCreator
@@ -47,6 +49,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
      @JsonProperty("size") long size,
      @JsonProperty("numRows") long numRows,
      @JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators,
      @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
      @JsonProperty("queryGranularity") QueryGranularity queryGranularity
  )
  {
@@ -56,6 +59,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
    this.size = size;
    this.numRows = numRows;
    this.aggregators = aggregators;
    this.timestampSpec = timestampSpec;
    this.queryGranularity = queryGranularity;
  }
@@ -89,6 +93,12 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
    return numRows;
  }

  @JsonProperty
  public TimestampSpec getTimestampSpec()
  {
    return timestampSpec;
  }

  @JsonProperty
  public QueryGranularity getQueryGranularity()
  {
@@ -111,6 +121,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
           ", size=" + size +
           ", numRows=" + numRows +
           ", aggregators=" + aggregators +
           ", timestampSpec=" + timestampSpec +
           ", queryGranularity=" + queryGranularity +
           '}';
  }
@@ -134,6 +145,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
           Objects.equals(interval, that.interval) &&
           Objects.equals(columns, that.columns) &&
           Objects.equals(aggregators, that.aggregators) &&
           Objects.equals(timestampSpec, that.timestampSpec) &&
           Objects.equals(queryGranularity, that.queryGranularity);
  }
@@ -144,7 +156,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
  @Override
  public int hashCode()
  {
    return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity);
  }

  @Override

View File

@@ -57,6 +57,7 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
    INTERVAL,
    AGGREGATORS,
    MINMAX,
    TIMESTAMPSPEC,
    QUERYGRANULARITY;

    @JsonValue
@@ -188,6 +189,11 @@ public class SegmentMetadataQuery extends BaseQuery<SegmentAnalysis>
    return analysisTypes.contains(AnalysisType.AGGREGATORS);
  }

  public boolean hasTimestampSpec()
  {
    return analysisTypes.contains(AnalysisType.TIMESTAMPSPEC);
  }

  public boolean hasQueryGranularity()
  {
    return analysisTypes.contains(AnalysisType.QUERYGRANULARITY);

View File

@@ -20,6 +20,7 @@
package io.druid.segment;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
@@ -42,6 +43,9 @@ public class Metadata
  @JsonProperty
  private AggregatorFactory[] aggregators;

  @JsonProperty
  private TimestampSpec timestampSpec;

  @JsonProperty
  private QueryGranularity queryGranularity;
@@ -61,6 +65,17 @@ public class Metadata
    return this;
  }

  public TimestampSpec getTimestampSpec()
  {
    return timestampSpec;
  }

  public Metadata setTimestampSpec(TimestampSpec timestampSpec)
  {
    this.timestampSpec = timestampSpec;
    return this;
  }

  public QueryGranularity getQueryGranularity()
  {
    return queryGranularity;
@@ -111,6 +126,7 @@ public class Metadata
        ? new ArrayList<AggregatorFactory[]>()
        : null;

    List<TimestampSpec> timestampSpecsToMerge = new ArrayList<>();
    List<QueryGranularity> gransToMerge = new ArrayList<>();

    for (Metadata metadata : toBeMerged) {
@@ -120,6 +136,10 @@ public class Metadata
          aggregatorsToMerge.add(metadata.getAggregators());
        }

        if (timestampSpecsToMerge != null && metadata.getTimestampSpec() != null) {
          timestampSpecsToMerge.add(metadata.getTimestampSpec());
        }

        if (gransToMerge != null) {
          gransToMerge.add(metadata.getQueryGranularity());
        }
@@ -128,6 +148,7 @@ public class Metadata
        //if metadata and hence aggregators and queryGranularity for some segment being merged are unknown then
        //final merged segment should not have same in metadata
        aggregatorsToMerge = null;
        timestampSpecsToMerge = null;
        gransToMerge = null;
      }
    }
@@ -143,6 +164,10 @@ public class Metadata
      result.setAggregators(overrideMergedAggregators);
    }

    if (timestampSpecsToMerge != null) {
      result.setTimestampSpec(TimestampSpec.mergeTimestampSpec(timestampSpecsToMerge));
    }

    if (gransToMerge != null) {
      result.setQueryGranularity(QueryGranularity.mergeQueryGranularities(gransToMerge));
    }
@@ -171,9 +196,12 @@ public class Metadata
    if (!Arrays.equals(aggregators, metadata.aggregators)) {
      return false;
    }
    if (timestampSpec != null ? !timestampSpec.equals(metadata.timestampSpec) : metadata.timestampSpec != null) {
      return false;
    }
    return queryGranularity != null
           ? queryGranularity.equals(metadata.queryGranularity)
           : metadata.queryGranularity == null;
  }
@@ -181,7 +209,8 @@ public class Metadata
  public int hashCode()
  {
    int result = container.hashCode();
    result = 31 * result + Arrays.hashCode(aggregators);
    result = 31 * result + (timestampSpec != null ? timestampSpec.hashCode() : 0);
    result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
    return result;
  }
@@ -190,9 +219,9 @@ public class Metadata
  public String toString()
  {
    return "Metadata{" +
           "container=" + container +
           ", aggregators=" + Arrays.toString(aggregators) +
           ", timestampSpec=" + timestampSpec +
           ", queryGranularity=" + queryGranularity +
           '}';
  }
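A short sketch of the resulting merge semantics, mirroring MetadataTest further down (the "ds"/"auto" spec is an example value; imports follow the test file):

    // Both segments report the same spec: the merged metadata keeps it.
    Metadata m1 = new Metadata().setTimestampSpec(new TimestampSpec("ds", "auto", null));
    Metadata m2 = new Metadata().setTimestampSpec(new TimestampSpec("ds", "auto", null));
    Metadata.merge(ImmutableList.of(m1, m2), null).getTimestampSpec();  // the shared spec

    // If any segment's metadata is unknown, the merged timestampSpec is dropped.
    Metadata.merge(Arrays.asList(m1, null), null).getTimestampSpec();   // null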

View File

@@ -410,6 +410,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
    this.metadata = new Metadata()
        .setAggregators(getCombiningAggregators(metrics))
        .setTimestampSpec(incrementalIndexSchema.getTimestampSpec())
        .setQueryGranularity(this.gran);

    this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);

View File

@@ -21,8 +21,9 @@ package io.druid.segment.incremental;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;

/**
@@ -30,18 +31,21 @@ import io.druid.query.aggregation.AggregatorFactory;
public class IncrementalIndexSchema
{
  private final long minTimestamp;
  private final TimestampSpec timestampSpec;
  private final QueryGranularity gran;
  private final DimensionsSpec dimensionsSpec;
  private final AggregatorFactory[] metrics;

  public IncrementalIndexSchema(
      long minTimestamp,
      TimestampSpec timestampSpec,
      QueryGranularity gran,
      DimensionsSpec dimensionsSpec,
      AggregatorFactory[] metrics
  )
  {
    this.minTimestamp = minTimestamp;
    this.timestampSpec = timestampSpec;
    this.gran = gran;
    this.dimensionsSpec = dimensionsSpec;
    this.metrics = metrics;
@@ -52,6 +56,11 @@ public class IncrementalIndexSchema
    return minTimestamp;
  }

  public TimestampSpec getTimestampSpec()
  {
    return timestampSpec;
  }

  public QueryGranularity getGran()
  {
    return gran;
@@ -70,6 +79,7 @@ public class IncrementalIndexSchema
  public static class Builder
  {
    private long minTimestamp;
    private TimestampSpec timestampSpec;
    private QueryGranularity gran;
    private DimensionsSpec dimensionsSpec;
    private AggregatorFactory[] metrics;
@@ -88,6 +98,24 @@ public class IncrementalIndexSchema
      return this;
    }

    public Builder withTimestampSpec(TimestampSpec timestampSpec)
    {
      this.timestampSpec = timestampSpec;
      return this;
    }

    public Builder withTimestampSpec(InputRowParser parser)
    {
      if (parser != null
          && parser.getParseSpec() != null
          && parser.getParseSpec().getTimestampSpec() != null) {
        this.timestampSpec = parser.getParseSpec().getTimestampSpec();
      } else {
        this.timestampSpec = new TimestampSpec(null, null, null);
      }
      return this;
    }

    public Builder withQueryGranularity(QueryGranularity gran)
    {
      this.gran = gran;
@@ -122,7 +150,7 @@ public class IncrementalIndexSchema
    public IncrementalIndexSchema build()
    {
      return new IncrementalIndexSchema(
          minTimestamp, timestampSpec, gran, dimensionsSpec, metrics
      );
    }
  }
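A usage sketch of the extended builder (the values are illustrative; real call sites appear in IndexGeneratorJob, Sink, and TestIndex in this commit):

    IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
        .withMinTimestamp(0L)
        .withTimestampSpec(new TimestampSpec("ds", "auto", null))  // or pass an InputRowParser
        .withQueryGranularity(QueryGranularities.NONE)
        .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
        .build();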

View File

@@ -86,6 +86,7 @@ public class SegmentMetadataQueryQueryToolChestTest
        ), 71982,
        100,
        null,
        null,
        null
    );
@@ -115,6 +116,7 @@ public class SegmentMetadataQueryQueryToolChestTest
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "baz", new DoubleSumAggregatorFactory("baz", "baz")
        ),
        null,
        null
    );
    final SegmentAnalysis analysis2 = new SegmentAnalysis(
@@ -127,6 +129,7 @@ public class SegmentMetadataQueryQueryToolChestTest
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar")
        ),
        null,
        null
    );
@@ -158,6 +161,7 @@ public class SegmentMetadataQueryQueryToolChestTest
        0,
        0,
        null,
        null,
        null
    );
    final SegmentAnalysis analysis2 = new SegmentAnalysis(
@@ -170,6 +174,7 @@ public class SegmentMetadataQueryQueryToolChestTest
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar")
        ),
        null,
        null
    );
@@ -193,6 +198,7 @@ public class SegmentMetadataQueryQueryToolChestTest
        0,
        0,
        null,
        null,
        null
    );
    final SegmentAnalysis analysis2 = new SegmentAnalysis(
@@ -202,6 +208,7 @@ public class SegmentMetadataQueryQueryToolChestTest
        0,
        0,
        null,
        null,
        null
    );
@@ -222,6 +229,7 @@ public class SegmentMetadataQueryQueryToolChestTest
            "foo", new LongSumAggregatorFactory("foo", "foo"),
            "bar", new DoubleSumAggregatorFactory("bar", "bar")
        ),
        null,
        null
    );
    final SegmentAnalysis analysis2 = new SegmentAnalysis(
@@ -235,6 +243,7 @@ public class SegmentMetadataQueryQueryToolChestTest
            "bar", new DoubleMaxAggregatorFactory("bar", "bar"),
            "baz", new LongMaxAggregatorFactory("baz", "baz")
        ),
        null,
        null
    );

View File

@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.Sequences;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.BySegmentResultValue;
@@ -181,6 +182,7 @@ public class SegmentMetadataQueryTest
        ), mmap1 ? 71982 : 72755,
        1209,
        null,
        null,
        null
    );
    expectedSegmentAnalysis2 = new SegmentAnalysis(
@@ -223,6 +225,7 @@ public class SegmentMetadataQueryTest
        ), mmap2 ? 71982 : 72755,
        1209,
        null,
        null,
        null
    );
  }
@@ -270,6 +273,7 @@ public class SegmentMetadataQueryTest
        0,
        expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
        null,
        null,
        null
    );
@@ -337,6 +341,7 @@ public class SegmentMetadataQueryTest
        0,
        expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
        null,
        null,
        null
    );
@@ -453,6 +458,7 @@ public class SegmentMetadataQueryTest
        expectedSegmentAnalysis1.getSize() + expectedSegmentAnalysis2.getSize(),
        expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
        null,
        null,
        null
    );
@@ -503,6 +509,7 @@ public class SegmentMetadataQueryTest
        0,
        expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
        null,
        null,
        null
    );
@@ -564,6 +571,7 @@ public class SegmentMetadataQueryTest
        0,
        expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
        expectedAggregators,
        null,
        null
    );
@@ -600,6 +608,63 @@ public class SegmentMetadataQueryTest
    exec.shutdownNow();
  }

  @Test
  public void testSegmentMetadataQueryWithTimestampSpecMerge()
  {
    SegmentAnalysis mergedSegmentAnalysis = new SegmentAnalysis(
        differentIds ? "merged" : "testSegment",
        null,
        ImmutableMap.of(
            "placement",
            new ColumnAnalysis(
                ValueType.STRING.toString(),
                false,
                0,
                0,
                null,
                null,
                null
            )
        ),
        0,
        expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
        null,
        new TimestampSpec("ds", "auto", null),
        null
    );

    QueryToolChest toolChest = FACTORY.getToolchest();

    ExecutorService exec = Executors.newCachedThreadPool();
    QueryRunner myRunner = new FinalizeResultsQueryRunner<>(
        toolChest.mergeResults(
            FACTORY.mergeRunners(
                MoreExecutors.sameThreadExecutor(),
                Lists.<QueryRunner<SegmentAnalysis>>newArrayList(
                    toolChest.preMergeQueryDecoration(runner1),
                    toolChest.preMergeQueryDecoration(runner2)
                )
            )
        ),
        toolChest
    );

    TestHelper.assertExpectedObjects(
        ImmutableList.of(mergedSegmentAnalysis),
        myRunner.run(
            Druids.newSegmentMetadataQueryBuilder()
                  .dataSource("testing")
                  .intervals("2013/2014")
                  .toInclude(new ListColumnIncluderator(Arrays.asList("placement")))
                  .analysisTypes(SegmentMetadataQuery.AnalysisType.TIMESTAMPSPEC)
                  .merge(true)
                  .build(),
            Maps.newHashMap()
        ),
        "failed SegmentMetadata merging query"
    );
    exec.shutdownNow();
  }

  @Test
  public void testSegmentMetadataQueryWithQueryGranularityMerge()
@@ -622,6 +687,7 @@ public class SegmentMetadataQueryTest
        0,
        expectedSegmentAnalysis1.getNumRows() + expectedSegmentAnalysis2.getNumRows(),
        null,
        null,
        QueryGranularities.NONE
    );

View File

@@ -126,6 +126,7 @@ public class SegmentMetadataUnionQueryTest
        mmap ? 287928 : 291020,
        4836,
        null,
        null,
        null
    );
    SegmentMetadataQuery query = new Druids.SegmentMetadataQueryBuilder()

View File

@@ -1765,12 +1765,12 @@ public class IndexMergerTest
  private IncrementalIndex getIndexWithDims(List<String> dims)
  {
    IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
        .withMinTimestamp(0L)
        .withQueryGranularity(QueryGranularities.NONE)
        .withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null))
        .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
        .build();
    return new OnheapIncrementalIndex(schema, true, 1000);
  }

View File

@@ -21,6 +21,7 @@ package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
@@ -78,11 +79,13 @@ public class MetadataTest
    Metadata m1 = new Metadata();
    m1.put("k", "v");
    m1.setAggregators(aggs);
    m1.setTimestampSpec(new TimestampSpec("ds", "auto", null));
    m1.setQueryGranularity(QueryGranularities.ALL);

    Metadata m2 = new Metadata();
    m2.put("k", "v");
    m2.setAggregators(aggs);
    m2.setTimestampSpec(new TimestampSpec("ds", "auto", null));
    m2.setQueryGranularity(QueryGranularities.ALL);

    Metadata merged = new Metadata();
@@ -92,6 +95,7 @@ public class MetadataTest
            new LongMaxAggregatorFactory("n", "n")
        }
    );
    merged.setTimestampSpec(new TimestampSpec("ds", "auto", null));
    merged.setQueryGranularity(QueryGranularities.ALL);
    Assert.assertEquals(merged, Metadata.merge(ImmutableList.of(m1, m2), null));
@@ -102,6 +106,7 @@ public class MetadataTest
    metadataToBeMerged.add(null);

    merged.setAggregators(null);
    merged.setTimestampSpec(null);
    merged.setQueryGranularity(null);
    Assert.assertEquals(merged, Metadata.merge(metadataToBeMerged, null));
@@ -116,6 +121,7 @@ public class MetadataTest
        Metadata.merge(metadataToBeMerged, explicitAggs)
    );

    merged.setTimestampSpec(new TimestampSpec("ds", "auto", null));
    merged.setQueryGranularity(QueryGranularities.ALL);
    Assert.assertEquals(
        merged,

View File

@@ -178,6 +178,7 @@ public class TestIndex
  {
    final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
        .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
        .withTimestampSpec(new TimestampSpec("ds", "auto", null))
        .withQueryGranularity(QueryGranularities.NONE)
        .withMetrics(METRIC_AGGS)
        .build();

View File

@@ -87,12 +87,12 @@ public class IncrementalIndexTest
            new SelectorDimFilter("billy", "A", null)
        )
    };

    final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
        .withMinTimestamp(0)
        .withQueryGranularity(QueryGranularities.MINUTE)
        .withDimensionsSpec(dimensions)
        .withMetrics(metrics)
        .build();

    final List<Object[]> constructors = Lists.newArrayList();
    for (final Boolean sortFacts : ImmutableList.of(false, true)) {

View File

@@ -232,6 +232,7 @@ public class Sink implements Iterable<FireHydrant>
  {
    final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
        .withMinTimestamp(minTimestamp)
        .withTimestampSpec(schema.getParser())
        .withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
        .withDimensionsSpec(schema.getParser())
        .withMetrics(schema.getAggregators())