From 491f8cca81349662a89444f00a61d393b6f8cd9f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 11 Jan 2018 12:39:33 -0800 Subject: [PATCH] fix timewarp query results when using timezones and crossing DST transitions (#5157) * timewarp and timezones changes: * `TimewarpOperator` will now compensate for daylight savings time shifts between date translation ranges for queries using a `PeriodGranularity` with a timezone defined * introduces a new abstract query type `TimeBucketedQuery` for all queries which have a `Granularity` (100% not attached to this name). `GroupByQuery`, `SearchQuery`, `SelectQuery`, `TimeseriesQuery`, and `TopNQuery` all extend `TimeBucke tedQuery`, cutting down on some duplicate code and providing a mechanism for `TimewarpOperator` (and anything else) that needs to be aware of granularity * move precondition check to TimeBucketedQuery, add Granularities.nullToAll, add getTimezone to TimeBucketQuery * formatting * more formatting * unused import * changes: * add 'getGranularity' and 'getTimezone' to 'Query' interface * merge 'TimeBucketedQuery' into 'BaseQuery' * fixup tests from resulting serialization changes * dedupe * fix after merge * suppress warning --- .../common/granularity/Granularities.java | 4 + .../main/java/io/druid/query/BaseQuery.java | 71 +++++---- .../src/main/java/io/druid/query/Query.java | 8 ++ .../java/io/druid/query/TimewarpOperator.java | 8 +- .../io/druid/query/groupby/GroupByQuery.java | 22 +-- .../io/druid/query/search/SearchQuery.java | 30 ++-- .../io/druid/query/select/SelectQuery.java | 34 ++--- .../query/timeseries/TimeseriesQuery.java | 15 +- .../java/io/druid/query/topn/TopNQuery.java | 14 +- .../io/druid/query/TimewarpOperatorTest.java | 136 +++++++++++++++++- .../druid/query/scan/ScanQuerySpecTest.java | 3 +- .../druid/sql/calcite/CalciteQueryTest.java | 8 +- 12 files changed, 231 insertions(+), 122 deletions(-) diff --git a/java-util/src/main/java/io/druid/java/util/common/granularity/Granularities.java b/java-util/src/main/java/io/druid/java/util/common/granularity/Granularities.java index 599110550f9..de8b6f688d2 100644 --- a/java-util/src/main/java/io/druid/java/util/common/granularity/Granularities.java +++ b/java-util/src/main/java/io/druid/java/util/common/granularity/Granularities.java @@ -41,4 +41,8 @@ public class Granularities public static final Granularity ALL = GranularityType.ALL.getDefaultGranularity(); public static final Granularity NONE = GranularityType.NONE.getDefaultGranularity(); + public static Granularity nullToAll(Granularity granularity) + { + return granularity == null ? Granularities.ALL : granularity; + } } diff --git a/processing/src/main/java/io/druid/query/BaseQuery.java b/processing/src/main/java/io/druid/query/BaseQuery.java index 7b511097a83..86587d47ae8 100644 --- a/processing/src/main/java/io/druid/query/BaseQuery.java +++ b/processing/src/main/java/io/druid/query/BaseQuery.java @@ -25,12 +25,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import io.druid.guice.annotations.ExtensionPoint; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.granularity.Granularity; +import io.druid.java.util.common.granularity.PeriodGranularity; import io.druid.query.spec.QuerySegmentSpec; +import org.joda.time.DateTimeZone; import org.joda.time.Duration; import org.joda.time.Interval; import java.util.List; import java.util.Map; +import java.util.Objects; /** */ @@ -50,6 +55,7 @@ public abstract class BaseQuery> implements Query private final Map context; private final QuerySegmentSpec querySegmentSpec; private volatile Duration duration; + private final Granularity granularity; public BaseQuery( DataSource dataSource, @@ -57,14 +63,27 @@ public abstract class BaseQuery> implements Query boolean descending, Map context ) + { + this(dataSource, querySegmentSpec, descending, context, Granularities.ALL); + } + + public BaseQuery( + DataSource dataSource, + QuerySegmentSpec querySegmentSpec, + boolean descending, + Map context, + Granularity granularity + ) { Preconditions.checkNotNull(dataSource, "dataSource can't be null"); Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec can't be null"); + Preconditions.checkNotNull(granularity, "Must specify a granularity"); this.dataSource = dataSource; this.context = context; this.querySegmentSpec = querySegmentSpec; this.descending = descending; + this.granularity = granularity; } @JsonProperty @@ -115,6 +134,21 @@ public abstract class BaseQuery> implements Query return duration; } + @Override + @JsonProperty + public Granularity getGranularity() + { + return granularity; + } + + @Override + public DateTimeZone getTimezone() + { + return granularity instanceof PeriodGranularity + ? ((PeriodGranularity) granularity).getTimeZone() + : DateTimeZone.UTC; + } + @Override @JsonProperty public Map getContext() @@ -193,38 +227,19 @@ public abstract class BaseQuery> implements Query if (o == null || getClass() != o.getClass()) { return false; } - - BaseQuery baseQuery = (BaseQuery) o; - - if (descending != baseQuery.descending) { - return false; - } - if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) { - return false; - } - if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) { - return false; - } - if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) { - return false; - } - if (querySegmentSpec != null - ? !querySegmentSpec.equals(baseQuery.querySegmentSpec) - : baseQuery.querySegmentSpec != null) { - return false; - } - - return true; + BaseQuery baseQuery = (BaseQuery) o; + return descending == baseQuery.descending && + Objects.equals(dataSource, baseQuery.dataSource) && + Objects.equals(context, baseQuery.context) && + Objects.equals(querySegmentSpec, baseQuery.querySegmentSpec) && + Objects.equals(duration, baseQuery.duration) && + Objects.equals(granularity, baseQuery.granularity); } @Override public int hashCode() { - int result = dataSource != null ? dataSource.hashCode() : 0; - result = 31 * result + (descending ? 1 : 0); - result = 31 * result + (context != null ? context.hashCode() : 0); - result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0); - result = 31 * result + (duration != null ? duration.hashCode() : 0); - return result; + + return Objects.hash(dataSource, descending, context, querySegmentSpec, duration, granularity); } } diff --git a/processing/src/main/java/io/druid/query/Query.java b/processing/src/main/java/io/druid/query/Query.java index efb9fd500e0..06ff069b205 100644 --- a/processing/src/main/java/io/druid/query/Query.java +++ b/processing/src/main/java/io/druid/query/Query.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.collect.Ordering; import io.druid.guice.annotations.ExtensionPoint; +import io.druid.java.util.common.granularity.Granularity; import io.druid.query.datasourcemetadata.DataSourceMetadataQuery; import io.druid.query.filter.DimFilter; import io.druid.query.groupby.GroupByQuery; @@ -34,6 +35,7 @@ import io.druid.query.spec.QuerySegmentSpec; import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.topn.TopNQuery; +import org.joda.time.DateTimeZone; import org.joda.time.Duration; import org.joda.time.Interval; @@ -80,6 +82,12 @@ public interface Query Duration getDuration(); + // currently unused, but helping enforce the idea that all queries have a Granularity + @SuppressWarnings("unused") + Granularity getGranularity(); + + DateTimeZone getTimezone(); + Map getContext(); ContextType getContextValue(String key); diff --git a/processing/src/main/java/io/druid/query/TimewarpOperator.java b/processing/src/main/java/io/druid/query/TimewarpOperator.java index 64ed0cdd9af..cee9047e5bf 100644 --- a/processing/src/main/java/io/druid/query/TimewarpOperator.java +++ b/processing/src/main/java/io/druid/query/TimewarpOperator.java @@ -30,6 +30,7 @@ import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.timeboundary.TimeBoundaryQuery; import io.druid.query.timeboundary.TimeBoundaryResultValue; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; import org.joda.time.Period; @@ -80,7 +81,8 @@ public class TimewarpOperator implements PostProcessingOperator @Override public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final long offset = computeOffset(now); + final DateTimeZone tz = queryPlus.getQuery().getTimezone(); + final long offset = computeOffset(now, tz); final Interval interval = queryPlus.getQuery().getIntervals().get(0); final Interval modifiedInterval = new Interval( @@ -142,7 +144,7 @@ public class TimewarpOperator implements PostProcessingOperator * * @return the offset between the mapped time and time t */ - protected long computeOffset(final long t) + protected long computeOffset(final long t, final DateTimeZone tz) { // start is the beginning of the last period ending within dataInterval long start = dataInterval.getEndMillis() - periodMillis; @@ -159,6 +161,6 @@ public class TimewarpOperator implements PostProcessingOperator tOffset += periodMillis; } tOffset += start; - return tOffset - t; + return tOffset - t - (tz.getOffset(tOffset) - tz.getOffset(t)); } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 2b8bd7f5de8..9ca527ea17a 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -96,7 +96,6 @@ public class GroupByQuery extends BaseQuery private final LimitSpec limitSpec; private final HavingSpec havingSpec; private final DimFilter dimFilter; - private final Granularity granularity; private final List dimensions; private final List aggregatorSpecs; private final List postAggregatorSpecs; @@ -171,15 +170,15 @@ public class GroupByQuery extends BaseQuery final Map context ) { - super(dataSource, querySegmentSpec, false, context); + super(dataSource, querySegmentSpec, false, context, granularity); this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); this.dimFilter = dimFilter; - this.granularity = granularity; this.dimensions = dimensions == null ? ImmutableList.of() : dimensions; for (DimensionSpec spec : this.dimensions) { Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec"); } + this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs; this.postAggregatorSpecs = Queries.prepareAggregations( this.dimensions.stream().map(DimensionSpec::getOutputName).collect(Collectors.toList()), @@ -189,7 +188,6 @@ public class GroupByQuery extends BaseQuery this.havingSpec = havingSpec; this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec); - Preconditions.checkNotNull(this.granularity, "Must specify a granularity"); // Verify no duplicate names between dimensions, aggregators, and postAggregators. // They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other. @@ -214,12 +212,6 @@ public class GroupByQuery extends BaseQuery return dimFilter; } - @JsonProperty - public Granularity getGranularity() - { - return granularity; - } - @JsonProperty public List getDimensions() { @@ -518,12 +510,12 @@ public class GroupByQuery extends BaseQuery private Comparator getTimeComparator(boolean granular) { - if (Granularities.ALL.equals(granularity)) { + if (Granularities.ALL.equals(getGranularity())) { return null; } else if (granular) { return (lhs, rhs) -> Longs.compare( - granularity.bucketStart(lhs.getTimestamp()).getMillis(), - granularity.bucketStart(rhs.getTimestamp()).getMillis() + getGranularity().bucketStart(lhs.getTimestamp()).getMillis(), + getGranularity().bucketStart(rhs.getTimestamp()).getMillis() ); } else { return NON_GRANULAR_TIME_COMP; @@ -990,7 +982,7 @@ public class GroupByQuery extends BaseQuery ", virtualColumns=" + virtualColumns + ", limitSpec=" + limitSpec + ", dimFilter=" + dimFilter + - ", granularity=" + granularity + + ", granularity=" + getGranularity() + ", dimensions=" + dimensions + ", aggregatorSpecs=" + aggregatorSpecs + ", postAggregatorSpecs=" + postAggregatorSpecs + @@ -1015,7 +1007,6 @@ public class GroupByQuery extends BaseQuery Objects.equals(limitSpec, that.limitSpec) && Objects.equals(havingSpec, that.havingSpec) && Objects.equals(dimFilter, that.dimFilter) && - Objects.equals(granularity, that.granularity) && Objects.equals(dimensions, that.dimensions) && Objects.equals(aggregatorSpecs, that.aggregatorSpecs) && Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs); @@ -1030,7 +1021,6 @@ public class GroupByQuery extends BaseQuery limitSpec, havingSpec, dimFilter, - granularity, dimensions, aggregatorSpecs, postAggregatorSpecs diff --git a/processing/src/main/java/io/druid/query/search/SearchQuery.java b/processing/src/main/java/io/druid/query/search/SearchQuery.java index f9cd866e1f7..df298d55c5b 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQuery.java +++ b/processing/src/main/java/io/druid/query/search/SearchQuery.java @@ -45,7 +45,6 @@ public class SearchQuery extends BaseQuery> private final DimFilter dimFilter; private final SearchSortSpec sortSpec; - private final Granularity granularity; private final List dimensions; private final SearchQuerySpec querySpec; private final int limit; @@ -63,12 +62,11 @@ public class SearchQuery extends BaseQuery> @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, false, context); + super(dataSource, querySegmentSpec, false, context, Granularities.nullToAll(granularity)); Preconditions.checkNotNull(querySegmentSpec, "Must specify an interval"); this.dimFilter = dimFilter; this.sortSpec = sortSpec == null ? DEFAULT_SORT_SPEC : sortSpec; - this.granularity = granularity == null ? Granularities.ALL : granularity; this.limit = (limit == 0) ? 1000 : limit; this.dimensions = dimensions; this.querySpec = querySpec == null ? new AllSearchQuerySpec() : querySpec; @@ -122,12 +120,6 @@ public class SearchQuery extends BaseQuery> return dimFilter; } - @JsonProperty - public Granularity getGranularity() - { - return granularity; - } - @JsonProperty public int getLimit() { @@ -161,14 +153,14 @@ public class SearchQuery extends BaseQuery> public String toString() { return "SearchQuery{" + - "dataSource='" + getDataSource() + '\'' + - ", dimFilter=" + dimFilter + - ", granularity='" + granularity + '\'' + - ", dimensions=" + dimensions + - ", querySpec=" + querySpec + - ", querySegmentSpec=" + getQuerySegmentSpec() + - ", limit=" + limit + - '}'; + "dataSource='" + getDataSource() + '\'' + + ", dimFilter=" + dimFilter + + ", granularity='" + getGranularity() + '\'' + + ", dimensions=" + dimensions + + ", querySpec=" + querySpec + + ", querySegmentSpec=" + getQuerySegmentSpec() + + ", limit=" + limit + + '}'; } @Override @@ -195,9 +187,6 @@ public class SearchQuery extends BaseQuery> if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { return false; } - if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) { - return false; - } if (querySpec != null ? !querySpec.equals(that.querySpec) : that.querySpec != null) { return false; } @@ -214,7 +203,6 @@ public class SearchQuery extends BaseQuery> int result = super.hashCode(); result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); result = 31 * result + (sortSpec != null ? sortSpec.hashCode() : 0); - result = 31 * result + (granularity != null ? granularity.hashCode() : 0); result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); result = 31 * result + (querySpec != null ? querySpec.hashCode() : 0); result = 31 * result + limit; diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java index 6676777ba3d..2bc972b9fac 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQuery.java +++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java @@ -45,7 +45,6 @@ import java.util.Objects; public class SelectQuery extends BaseQuery> { private final DimFilter dimFilter; - private final Granularity granularity; private final List dimensions; private final List metrics; private final VirtualColumns virtualColumns; @@ -65,9 +64,8 @@ public class SelectQuery extends BaseQuery> @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, descending, context); + super(dataSource, querySegmentSpec, descending, context, Granularities.nullToAll(granularity)); this.dimFilter = dimFilter; - this.granularity = granularity == null ? Granularities.ALL : granularity; this.dimensions = dimensions; this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); this.metrics = metrics; @@ -111,12 +109,6 @@ public class SelectQuery extends BaseQuery> return dimFilter; } - @JsonProperty - public Granularity getGranularity() - { - return granularity; - } - @JsonProperty public List getDimensions() { @@ -179,16 +171,16 @@ public class SelectQuery extends BaseQuery> public String toString() { return "SelectQuery{" + - "dataSource='" + getDataSource() + '\'' + - ", querySegmentSpec=" + getQuerySegmentSpec() + - ", descending=" + isDescending() + - ", dimFilter=" + dimFilter + - ", granularity=" + granularity + - ", dimensions=" + dimensions + - ", metrics=" + metrics + - ", virtualColumns=" + virtualColumns + - ", pagingSpec=" + pagingSpec + - '}'; + "dataSource='" + getDataSource() + '\'' + + ", querySegmentSpec=" + getQuerySegmentSpec() + + ", descending=" + isDescending() + + ", dimFilter=" + dimFilter + + ", granularity=" + getGranularity() + + ", dimensions=" + dimensions + + ", metrics=" + metrics + + ", virtualColumns=" + virtualColumns + + ", pagingSpec=" + pagingSpec + + '}'; } @Override @@ -209,9 +201,6 @@ public class SelectQuery extends BaseQuery> if (!Objects.equals(dimFilter, that.dimFilter)) { return false; } - if (!Objects.equals(granularity, that.granularity)) { - return false; - } if (!Objects.equals(dimensions, that.dimensions)) { return false; } @@ -233,7 +222,6 @@ public class SelectQuery extends BaseQuery> { int result = super.hashCode(); result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); - result = 31 * result + (granularity != null ? granularity.hashCode() : 0); result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); result = 31 * result + (metrics != null ? metrics.hashCode() : 0); result = 31 * result + (virtualColumns != null ? virtualColumns.hashCode() : 0); diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java index 2cfa00b6235..d1ac95cdadc 100644 --- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java +++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java @@ -47,7 +47,6 @@ public class TimeseriesQuery extends BaseQuery> { private final VirtualColumns virtualColumns; private final DimFilter dimFilter; - private final Granularity granularity; private final List aggregatorSpecs; private final List postAggregatorSpecs; @@ -64,11 +63,10 @@ public class TimeseriesQuery extends BaseQuery> @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, descending, context); + super(dataSource, querySegmentSpec, descending, context, granularity); this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); this.dimFilter = dimFilter; - this.granularity = granularity; this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs; this.postAggregatorSpecs = Queries.prepareAggregations( ImmutableList.of(), @@ -107,12 +105,6 @@ public class TimeseriesQuery extends BaseQuery> return dimFilter; } - @JsonProperty - public Granularity getGranularity() - { - return granularity; - } - @JsonProperty("aggregations") public List getAggregatorSpecs() { @@ -168,7 +160,7 @@ public class TimeseriesQuery extends BaseQuery> ", descending=" + isDescending() + ", virtualColumns=" + virtualColumns + ", dimFilter=" + dimFilter + - ", granularity='" + granularity + '\'' + + ", granularity='" + getGranularity() + '\'' + ", aggregatorSpecs=" + aggregatorSpecs + ", postAggregatorSpecs=" + postAggregatorSpecs + ", context=" + getContext() + @@ -190,7 +182,6 @@ public class TimeseriesQuery extends BaseQuery> final TimeseriesQuery that = (TimeseriesQuery) o; return Objects.equals(virtualColumns, that.virtualColumns) && Objects.equals(dimFilter, that.dimFilter) && - Objects.equals(granularity, that.granularity) && Objects.equals(aggregatorSpecs, that.aggregatorSpecs) && Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs); } @@ -198,6 +189,6 @@ public class TimeseriesQuery extends BaseQuery> @Override public int hashCode() { - return Objects.hash(super.hashCode(), virtualColumns, dimFilter, granularity, aggregatorSpecs, postAggregatorSpecs); + return Objects.hash(super.hashCode(), virtualColumns, dimFilter, aggregatorSpecs, postAggregatorSpecs); } } diff --git a/processing/src/main/java/io/druid/query/topn/TopNQuery.java b/processing/src/main/java/io/druid/query/topn/TopNQuery.java index 764990b063e..844142d01c5 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQuery.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQuery.java @@ -51,7 +51,6 @@ public class TopNQuery extends BaseQuery> private final TopNMetricSpec topNMetricSpec; private final int threshold; private final DimFilter dimFilter; - private final Granularity granularity; private final List aggregatorSpecs; private final List postAggregatorSpecs; @@ -70,7 +69,7 @@ public class TopNQuery extends BaseQuery> @JsonProperty("context") Map context ) { - super(dataSource, querySegmentSpec, false, context); + super(dataSource, querySegmentSpec, false, context, granularity); this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); this.dimensionSpec = dimensionSpec; @@ -78,7 +77,6 @@ public class TopNQuery extends BaseQuery> this.threshold = threshold; this.dimFilter = dimFilter; - this.granularity = granularity; this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs; this.postAggregatorSpecs = Queries.prepareAggregations( ImmutableList.of(dimensionSpec.getOutputName()), @@ -143,12 +141,6 @@ public class TopNQuery extends BaseQuery> return dimFilter; } - @JsonProperty - public Granularity getGranularity() - { - return granularity; - } - @JsonProperty("aggregations") public List getAggregatorSpecs() { @@ -218,7 +210,7 @@ public class TopNQuery extends BaseQuery> ", querySegmentSpec=" + getQuerySegmentSpec() + ", virtualColumns=" + virtualColumns + ", dimFilter=" + dimFilter + - ", granularity='" + granularity + '\'' + + ", granularity='" + getGranularity() + '\'' + ", aggregatorSpecs=" + aggregatorSpecs + ", postAggregatorSpecs=" + postAggregatorSpecs + '}'; @@ -242,7 +234,6 @@ public class TopNQuery extends BaseQuery> Objects.equals(dimensionSpec, topNQuery.dimensionSpec) && Objects.equals(topNMetricSpec, topNQuery.topNMetricSpec) && Objects.equals(dimFilter, topNQuery.dimFilter) && - Objects.equals(granularity, topNQuery.granularity) && Objects.equals(aggregatorSpecs, topNQuery.aggregatorSpecs) && Objects.equals(postAggregatorSpecs, topNQuery.postAggregatorSpecs); } @@ -257,7 +248,6 @@ public class TopNQuery extends BaseQuery> topNMetricSpec, threshold, dimFilter, - granularity, aggregatorSpecs, postAggregatorSpecs ); diff --git a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java index d0288684c0b..d4f9bc2f8b4 100644 --- a/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java +++ b/processing/src/test/java/io/druid/query/TimewarpOperatorTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.granularity.PeriodGranularity; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.aggregation.AggregatorFactory; @@ -31,6 +32,7 @@ import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.timeboundary.TimeBoundaryResultValue; import io.druid.query.timeseries.TimeseriesResultValue; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; @@ -57,14 +59,24 @@ public class TimewarpOperatorTest final DateTime t = DateTimes.of("2014-01-23"); final DateTime tOffset = DateTimes.of("2014-01-09"); - Assert.assertEquals(tOffset, t.plus(testOperator.computeOffset(t.getMillis()))); + Assert.assertEquals(tOffset, t.plus(testOperator.computeOffset(t.getMillis(), DateTimeZone.UTC))); } { final DateTime t = DateTimes.of("2014-08-02"); final DateTime tOffset = DateTimes.of("2014-01-11"); - Assert.assertEquals(tOffset, t.plus(testOperator.computeOffset(t.getMillis()))); + Assert.assertEquals(tOffset, t.plus(testOperator.computeOffset(t.getMillis(), DateTimeZone.UTC))); + } + + { + final DateTime t = DateTimes.of("2014-08-02T-07"); + final DateTime tOffset = DateTimes.of("2014-01-11T-08"); + + Assert.assertEquals( + tOffset, + t.plus(testOperator.computeOffset(t.getMillis(), DateTimeZone.forID("America/Los_Angeles"))) + ); } } @@ -177,6 +189,126 @@ public class TimewarpOperatorTest } + @Test + public void testPostProcessWithTimezonesAndDstShift() throws Exception + { + QueryRunner> queryRunner = testOperator.postProcess( + new QueryRunner>() + { + @Override + public Sequence> run( + QueryPlus> queryPlus, + Map responseContext + ) + { + return Sequences.simple( + ImmutableList.of( + new Result<>( + DateTimes.of("2014-01-09T-08"), + new TimeseriesResultValue(ImmutableMap.of("metric", 2)) + ), + new Result<>( + DateTimes.of("2014-01-11T-08"), + new TimeseriesResultValue(ImmutableMap.of("metric", 3)) + ), + new Result<>( + queryPlus.getQuery().getIntervals().get(0).getEnd(), + new TimeseriesResultValue(ImmutableMap.of("metric", 5)) + ) + ) + ); + } + }, + DateTimes.of("2014-08-02T-07").getMillis() + ); + + final Query> query = + Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2014-07-31T-07/2014-08-05T-07") + .granularity(new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Los_Angeles"))) + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .build(); + + Assert.assertEquals( + Lists.newArrayList( + new Result<>( + DateTimes.of("2014-07-31T-07"), + new TimeseriesResultValue(ImmutableMap.of("metric", 2)) + ), + new Result<>( + DateTimes.of("2014-08-02T-07"), + new TimeseriesResultValue(ImmutableMap.of("metric", 3)) + ), + new Result<>( + DateTimes.of("2014-08-02T-07"), + new TimeseriesResultValue(ImmutableMap.of("metric", 5)) + ) + ), + queryRunner.run(QueryPlus.wrap(query), CONTEXT).toList() + ); + } + + @Test + public void testPostProcessWithTimezonesAndNoDstShift() throws Exception + { + QueryRunner> queryRunner = testOperator.postProcess( + new QueryRunner>() + { + @Override + public Sequence> run( + QueryPlus> queryPlus, + Map responseContext + ) + { + return Sequences.simple( + ImmutableList.of( + new Result<>( + DateTimes.of("2014-01-09T-07"), + new TimeseriesResultValue(ImmutableMap.of("metric", 2)) + ), + new Result<>( + DateTimes.of("2014-01-11T-07"), + new TimeseriesResultValue(ImmutableMap.of("metric", 3)) + ), + new Result<>( + queryPlus.getQuery().getIntervals().get(0).getEnd(), + new TimeseriesResultValue(ImmutableMap.of("metric", 5)) + ) + ) + ); + } + }, + DateTimes.of("2014-08-02T-07").getMillis() + ); + + final Query> query = + Druids.newTimeseriesQueryBuilder() + .dataSource("dummy") + .intervals("2014-07-31T-07/2014-08-05T-07") + .granularity(new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Phoenix"))) + .aggregators(Arrays.asList(new CountAggregatorFactory("count"))) + .build(); + + Assert.assertEquals( + Lists.newArrayList( + new Result<>( + DateTimes.of("2014-07-31T-07"), + new TimeseriesResultValue(ImmutableMap.of("metric", 2)) + ), + new Result<>( + DateTimes.of("2014-08-02T-07"), + new TimeseriesResultValue(ImmutableMap.of("metric", 3)) + ), + new Result<>( + DateTimes.of("2014-08-02T-07"), + new TimeseriesResultValue(ImmutableMap.of("metric", 5)) + ) + ), + queryRunner.run(QueryPlus.wrap(query), CONTEXT).toList() + ); + } + @Test public void testEmptyFutureInterval() throws Exception { diff --git a/processing/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java b/processing/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java index 774a3b798a6..8b903f7b72f 100644 --- a/processing/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java +++ b/processing/src/test/java/io/druid/query/scan/ScanQuerySpecTest.java @@ -57,7 +57,8 @@ public class ScanQuerySpecTest + "\"columns\":[\"market\",\"quality\",\"index\"]," + "\"legacy\":null," + "\"context\":null," - + "\"descending\":false}"; + + "\"descending\":false," + + "\"granularity\":{\"type\":\"all\"}}"; ScanQuery query = new ScanQuery( new TableDataSource(QueryRunnerTestHelper.dataSource), diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 10a45668605..163760cc8d8 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -553,7 +553,7 @@ public class CalciteQueryTest ImmutableList.of(), ImmutableList.of( new Object[]{ - "DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n" + "DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n" } ) ); @@ -799,8 +799,8 @@ public class CalciteQueryTest { final String explanation = "BindableJoin(condition=[=($0, $2)], joinType=[inner])\n" - + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"\",\"extractionFn\":null}},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{dim1:STRING}])\n" - + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{dim1:STRING, dim2:STRING}])\n"; + + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"\",\"extractionFn\":null}},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING}])\n" + + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING, dim2:STRING}])\n"; testQuery( PLANNER_CONFIG_FALLBACK, @@ -6173,7 +6173,7 @@ public class CalciteQueryTest + " BindableFilter(condition=[OR(=($0, 'xxx'), CAST(AND(IS NOT NULL($4), <>($2, 0))):BOOLEAN)])\n" + " BindableJoin(condition=[=($1, $3)], joinType=[left])\n" + " BindableJoin(condition=[true], joinType=[inner])\n" - + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{dim1:STRING, dim2:STRING}])\n" + + " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING, dim2:STRING}])\n" + " DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"virtualColumns\":[],\"filter\":{\"type\":\"like\",\"dimension\":\"dim1\",\"pattern\":\"%bc\",\"escape\":null,\"extractionFn\":null},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"postAggregations\":[],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"skipEmptyBuckets\":true,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"}}], signature=[{a0:LONG}])\n" + " DruidQueryRel(query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"d1:v\",\"expression\":\"1\",\"outputType\":\"LONG\"}],\"filter\":{\"type\":\"like\",\"dimension\":\"dim1\",\"pattern\":\"%bc\",\"escape\":null,\"extractionFn\":null},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim1\",\"outputName\":\"d0\",\"outputType\":\"STRING\"},{\"type\":\"default\",\"dimension\":\"d1:v\",\"outputName\":\"d1\",\"outputType\":\"LONG\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{d0:STRING, d1:LONG}])\n";