mirror of https://github.com/apache/druid.git
fix timewarp query results when using timezones and crossing DST transitions (#5157)
* timewarp and timezones changes: * `TimewarpOperator` will now compensate for daylight savings time shifts between date translation ranges for queries using a `PeriodGranularity` with a timezone defined * introduces a new abstract query type `TimeBucketedQuery` for all queries which have a `Granularity` (100% not attached to this name). `GroupByQuery`, `SearchQuery`, `SelectQuery`, `TimeseriesQuery`, and `TopNQuery` all extend `TimeBucke tedQuery`, cutting down on some duplicate code and providing a mechanism for `TimewarpOperator` (and anything else) that needs to be aware of granularity * move precondition check to TimeBucketedQuery, add Granularities.nullToAll, add getTimezone to TimeBucketQuery * formatting * more formatting * unused import * changes: * add 'getGranularity' and 'getTimezone' to 'Query' interface * merge 'TimeBucketedQuery' into 'BaseQuery' * fixup tests from resulting serialization changes * dedupe * fix after merge * suppress warning
This commit is contained in:
parent
3c69717202
commit
491f8cca81
|
@ -41,4 +41,8 @@ public class Granularities
|
|||
public static final Granularity ALL = GranularityType.ALL.getDefaultGranularity();
|
||||
public static final Granularity NONE = GranularityType.NONE.getDefaultGranularity();
|
||||
|
||||
public static Granularity nullToAll(Granularity granularity)
|
||||
{
|
||||
return granularity == null ? Granularities.ALL : granularity;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,12 +25,17 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import io.druid.guice.annotations.ExtensionPoint;
|
||||
import io.druid.java.util.common.granularity.Granularities;
|
||||
import io.druid.java.util.common.granularity.Granularity;
|
||||
import io.druid.java.util.common.granularity.PeriodGranularity;
|
||||
import io.druid.query.spec.QuerySegmentSpec;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -50,6 +55,7 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
|
|||
private final Map<String, Object> context;
|
||||
private final QuerySegmentSpec querySegmentSpec;
|
||||
private volatile Duration duration;
|
||||
private final Granularity granularity;
|
||||
|
||||
public BaseQuery(
|
||||
DataSource dataSource,
|
||||
|
@ -57,14 +63,27 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
|
|||
boolean descending,
|
||||
Map<String, Object> context
|
||||
)
|
||||
{
|
||||
this(dataSource, querySegmentSpec, descending, context, Granularities.ALL);
|
||||
}
|
||||
|
||||
public BaseQuery(
|
||||
DataSource dataSource,
|
||||
QuerySegmentSpec querySegmentSpec,
|
||||
boolean descending,
|
||||
Map<String, Object> context,
|
||||
Granularity granularity
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(dataSource, "dataSource can't be null");
|
||||
Preconditions.checkNotNull(querySegmentSpec, "querySegmentSpec can't be null");
|
||||
Preconditions.checkNotNull(granularity, "Must specify a granularity");
|
||||
|
||||
this.dataSource = dataSource;
|
||||
this.context = context;
|
||||
this.querySegmentSpec = querySegmentSpec;
|
||||
this.descending = descending;
|
||||
this.granularity = granularity;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -115,6 +134,21 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
|
|||
return duration;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public Granularity getGranularity()
|
||||
{
|
||||
return granularity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DateTimeZone getTimezone()
|
||||
{
|
||||
return granularity instanceof PeriodGranularity
|
||||
? ((PeriodGranularity) granularity).getTimeZone()
|
||||
: DateTimeZone.UTC;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public Map<String, Object> getContext()
|
||||
|
@ -193,38 +227,19 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
|
|||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
BaseQuery baseQuery = (BaseQuery) o;
|
||||
|
||||
if (descending != baseQuery.descending) {
|
||||
return false;
|
||||
}
|
||||
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) {
|
||||
return false;
|
||||
}
|
||||
if (dataSource != null ? !dataSource.equals(baseQuery.dataSource) : baseQuery.dataSource != null) {
|
||||
return false;
|
||||
}
|
||||
if (duration != null ? !duration.equals(baseQuery.duration) : baseQuery.duration != null) {
|
||||
return false;
|
||||
}
|
||||
if (querySegmentSpec != null
|
||||
? !querySegmentSpec.equals(baseQuery.querySegmentSpec)
|
||||
: baseQuery.querySegmentSpec != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
BaseQuery<?> baseQuery = (BaseQuery<?>) o;
|
||||
return descending == baseQuery.descending &&
|
||||
Objects.equals(dataSource, baseQuery.dataSource) &&
|
||||
Objects.equals(context, baseQuery.context) &&
|
||||
Objects.equals(querySegmentSpec, baseQuery.querySegmentSpec) &&
|
||||
Objects.equals(duration, baseQuery.duration) &&
|
||||
Objects.equals(granularity, baseQuery.granularity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = dataSource != null ? dataSource.hashCode() : 0;
|
||||
result = 31 * result + (descending ? 1 : 0);
|
||||
result = 31 * result + (context != null ? context.hashCode() : 0);
|
||||
result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0);
|
||||
result = 31 * result + (duration != null ? duration.hashCode() : 0);
|
||||
return result;
|
||||
|
||||
return Objects.hash(dataSource, descending, context, querySegmentSpec, duration, granularity);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
|
|||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.google.common.collect.Ordering;
|
||||
import io.druid.guice.annotations.ExtensionPoint;
|
||||
import io.druid.java.util.common.granularity.Granularity;
|
||||
import io.druid.query.datasourcemetadata.DataSourceMetadataQuery;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
|
@ -34,6 +35,7 @@ import io.druid.query.spec.QuerySegmentSpec;
|
|||
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||
import io.druid.query.timeseries.TimeseriesQuery;
|
||||
import io.druid.query.topn.TopNQuery;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
|
@ -80,6 +82,12 @@ public interface Query<T>
|
|||
|
||||
Duration getDuration();
|
||||
|
||||
// currently unused, but helping enforce the idea that all queries have a Granularity
|
||||
@SuppressWarnings("unused")
|
||||
Granularity getGranularity();
|
||||
|
||||
DateTimeZone getTimezone();
|
||||
|
||||
Map<String, Object> getContext();
|
||||
|
||||
<ContextType> ContextType getContextValue(String key);
|
||||
|
|
|
@ -30,6 +30,7 @@ import io.druid.query.spec.MultipleIntervalSegmentSpec;
|
|||
import io.druid.query.timeboundary.TimeBoundaryQuery;
|
||||
import io.druid.query.timeboundary.TimeBoundaryResultValue;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.Period;
|
||||
|
||||
|
@ -80,7 +81,8 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
|
|||
@Override
|
||||
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
|
||||
{
|
||||
final long offset = computeOffset(now);
|
||||
final DateTimeZone tz = queryPlus.getQuery().getTimezone();
|
||||
final long offset = computeOffset(now, tz);
|
||||
|
||||
final Interval interval = queryPlus.getQuery().getIntervals().get(0);
|
||||
final Interval modifiedInterval = new Interval(
|
||||
|
@ -142,7 +144,7 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
|
|||
*
|
||||
* @return the offset between the mapped time and time t
|
||||
*/
|
||||
protected long computeOffset(final long t)
|
||||
protected long computeOffset(final long t, final DateTimeZone tz)
|
||||
{
|
||||
// start is the beginning of the last period ending within dataInterval
|
||||
long start = dataInterval.getEndMillis() - periodMillis;
|
||||
|
@ -159,6 +161,6 @@ public class TimewarpOperator<T> implements PostProcessingOperator<T>
|
|||
tOffset += periodMillis;
|
||||
}
|
||||
tOffset += start;
|
||||
return tOffset - t;
|
||||
return tOffset - t - (tz.getOffset(tOffset) - tz.getOffset(t));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,7 +96,6 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
private final LimitSpec limitSpec;
|
||||
private final HavingSpec havingSpec;
|
||||
private final DimFilter dimFilter;
|
||||
private final Granularity granularity;
|
||||
private final List<DimensionSpec> dimensions;
|
||||
private final List<AggregatorFactory> aggregatorSpecs;
|
||||
private final List<PostAggregator> postAggregatorSpecs;
|
||||
|
@ -171,15 +170,15 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
final Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, false, context);
|
||||
super(dataSource, querySegmentSpec, false, context, granularity);
|
||||
|
||||
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
|
||||
this.dimFilter = dimFilter;
|
||||
this.granularity = granularity;
|
||||
this.dimensions = dimensions == null ? ImmutableList.of() : dimensions;
|
||||
for (DimensionSpec spec : this.dimensions) {
|
||||
Preconditions.checkArgument(spec != null, "dimensions has null DimensionSpec");
|
||||
}
|
||||
|
||||
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.<AggregatorFactory>of() : aggregatorSpecs;
|
||||
this.postAggregatorSpecs = Queries.prepareAggregations(
|
||||
this.dimensions.stream().map(DimensionSpec::getOutputName).collect(Collectors.toList()),
|
||||
|
@ -189,7 +188,6 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
this.havingSpec = havingSpec;
|
||||
this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec);
|
||||
|
||||
Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
|
||||
|
||||
// Verify no duplicate names between dimensions, aggregators, and postAggregators.
|
||||
// They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other.
|
||||
|
@ -214,12 +212,6 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return dimFilter;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Granularity getGranularity()
|
||||
{
|
||||
return granularity;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public List<DimensionSpec> getDimensions()
|
||||
{
|
||||
|
@ -518,12 +510,12 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
|
||||
private Comparator<Row> getTimeComparator(boolean granular)
|
||||
{
|
||||
if (Granularities.ALL.equals(granularity)) {
|
||||
if (Granularities.ALL.equals(getGranularity())) {
|
||||
return null;
|
||||
} else if (granular) {
|
||||
return (lhs, rhs) -> Longs.compare(
|
||||
granularity.bucketStart(lhs.getTimestamp()).getMillis(),
|
||||
granularity.bucketStart(rhs.getTimestamp()).getMillis()
|
||||
getGranularity().bucketStart(lhs.getTimestamp()).getMillis(),
|
||||
getGranularity().bucketStart(rhs.getTimestamp()).getMillis()
|
||||
);
|
||||
} else {
|
||||
return NON_GRANULAR_TIME_COMP;
|
||||
|
@ -990,7 +982,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
", virtualColumns=" + virtualColumns +
|
||||
", limitSpec=" + limitSpec +
|
||||
", dimFilter=" + dimFilter +
|
||||
", granularity=" + granularity +
|
||||
", granularity=" + getGranularity() +
|
||||
", dimensions=" + dimensions +
|
||||
", aggregatorSpecs=" + aggregatorSpecs +
|
||||
", postAggregatorSpecs=" + postAggregatorSpecs +
|
||||
|
@ -1015,7 +1007,6 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
Objects.equals(limitSpec, that.limitSpec) &&
|
||||
Objects.equals(havingSpec, that.havingSpec) &&
|
||||
Objects.equals(dimFilter, that.dimFilter) &&
|
||||
Objects.equals(granularity, that.granularity) &&
|
||||
Objects.equals(dimensions, that.dimensions) &&
|
||||
Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
|
||||
Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
|
||||
|
@ -1030,7 +1021,6 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
limitSpec,
|
||||
havingSpec,
|
||||
dimFilter,
|
||||
granularity,
|
||||
dimensions,
|
||||
aggregatorSpecs,
|
||||
postAggregatorSpecs
|
||||
|
|
|
@ -45,7 +45,6 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
|||
|
||||
private final DimFilter dimFilter;
|
||||
private final SearchSortSpec sortSpec;
|
||||
private final Granularity granularity;
|
||||
private final List<DimensionSpec> dimensions;
|
||||
private final SearchQuerySpec querySpec;
|
||||
private final int limit;
|
||||
|
@ -63,12 +62,11 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
|||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, false, context);
|
||||
super(dataSource, querySegmentSpec, false, context, Granularities.nullToAll(granularity));
|
||||
Preconditions.checkNotNull(querySegmentSpec, "Must specify an interval");
|
||||
|
||||
this.dimFilter = dimFilter;
|
||||
this.sortSpec = sortSpec == null ? DEFAULT_SORT_SPEC : sortSpec;
|
||||
this.granularity = granularity == null ? Granularities.ALL : granularity;
|
||||
this.limit = (limit == 0) ? 1000 : limit;
|
||||
this.dimensions = dimensions;
|
||||
this.querySpec = querySpec == null ? new AllSearchQuerySpec() : querySpec;
|
||||
|
@ -122,12 +120,6 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
|||
return dimFilter;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Granularity getGranularity()
|
||||
{
|
||||
return granularity;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getLimit()
|
||||
{
|
||||
|
@ -163,7 +155,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
|||
return "SearchQuery{" +
|
||||
"dataSource='" + getDataSource() + '\'' +
|
||||
", dimFilter=" + dimFilter +
|
||||
", granularity='" + granularity + '\'' +
|
||||
", granularity='" + getGranularity() + '\'' +
|
||||
", dimensions=" + dimensions +
|
||||
", querySpec=" + querySpec +
|
||||
", querySegmentSpec=" + getQuerySegmentSpec() +
|
||||
|
@ -195,9 +187,6 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
|||
if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
|
||||
return false;
|
||||
}
|
||||
if (granularity != null ? !granularity.equals(that.granularity) : that.granularity != null) {
|
||||
return false;
|
||||
}
|
||||
if (querySpec != null ? !querySpec.equals(that.querySpec) : that.querySpec != null) {
|
||||
return false;
|
||||
}
|
||||
|
@ -214,7 +203,6 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
|
|||
int result = super.hashCode();
|
||||
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
|
||||
result = 31 * result + (sortSpec != null ? sortSpec.hashCode() : 0);
|
||||
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
|
||||
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
|
||||
result = 31 * result + (querySpec != null ? querySpec.hashCode() : 0);
|
||||
result = 31 * result + limit;
|
||||
|
|
|
@ -45,7 +45,6 @@ import java.util.Objects;
|
|||
public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
||||
{
|
||||
private final DimFilter dimFilter;
|
||||
private final Granularity granularity;
|
||||
private final List<DimensionSpec> dimensions;
|
||||
private final List<String> metrics;
|
||||
private final VirtualColumns virtualColumns;
|
||||
|
@ -65,9 +64,8 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
|||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, descending, context);
|
||||
super(dataSource, querySegmentSpec, descending, context, Granularities.nullToAll(granularity));
|
||||
this.dimFilter = dimFilter;
|
||||
this.granularity = granularity == null ? Granularities.ALL : granularity;
|
||||
this.dimensions = dimensions;
|
||||
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
|
||||
this.metrics = metrics;
|
||||
|
@ -111,12 +109,6 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
|||
return dimFilter;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Granularity getGranularity()
|
||||
{
|
||||
return granularity;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public List<DimensionSpec> getDimensions()
|
||||
{
|
||||
|
@ -183,7 +175,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
|||
", querySegmentSpec=" + getQuerySegmentSpec() +
|
||||
", descending=" + isDescending() +
|
||||
", dimFilter=" + dimFilter +
|
||||
", granularity=" + granularity +
|
||||
", granularity=" + getGranularity() +
|
||||
", dimensions=" + dimensions +
|
||||
", metrics=" + metrics +
|
||||
", virtualColumns=" + virtualColumns +
|
||||
|
@ -209,9 +201,6 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
|||
if (!Objects.equals(dimFilter, that.dimFilter)) {
|
||||
return false;
|
||||
}
|
||||
if (!Objects.equals(granularity, that.granularity)) {
|
||||
return false;
|
||||
}
|
||||
if (!Objects.equals(dimensions, that.dimensions)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -233,7 +222,6 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
|
|||
{
|
||||
int result = super.hashCode();
|
||||
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
|
||||
result = 31 * result + (granularity != null ? granularity.hashCode() : 0);
|
||||
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
|
||||
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
|
||||
result = 31 * result + (virtualColumns != null ? virtualColumns.hashCode() : 0);
|
||||
|
|
|
@ -47,7 +47,6 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
|||
{
|
||||
private final VirtualColumns virtualColumns;
|
||||
private final DimFilter dimFilter;
|
||||
private final Granularity granularity;
|
||||
private final List<AggregatorFactory> aggregatorSpecs;
|
||||
private final List<PostAggregator> postAggregatorSpecs;
|
||||
|
||||
|
@ -64,11 +63,10 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
|||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, descending, context);
|
||||
super(dataSource, querySegmentSpec, descending, context, granularity);
|
||||
|
||||
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
|
||||
this.dimFilter = dimFilter;
|
||||
this.granularity = granularity;
|
||||
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.of() : aggregatorSpecs;
|
||||
this.postAggregatorSpecs = Queries.prepareAggregations(
|
||||
ImmutableList.of(),
|
||||
|
@ -107,12 +105,6 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
|||
return dimFilter;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Granularity getGranularity()
|
||||
{
|
||||
return granularity;
|
||||
}
|
||||
|
||||
@JsonProperty("aggregations")
|
||||
public List<AggregatorFactory> getAggregatorSpecs()
|
||||
{
|
||||
|
@ -168,7 +160,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
|||
", descending=" + isDescending() +
|
||||
", virtualColumns=" + virtualColumns +
|
||||
", dimFilter=" + dimFilter +
|
||||
", granularity='" + granularity + '\'' +
|
||||
", granularity='" + getGranularity() + '\'' +
|
||||
", aggregatorSpecs=" + aggregatorSpecs +
|
||||
", postAggregatorSpecs=" + postAggregatorSpecs +
|
||||
", context=" + getContext() +
|
||||
|
@ -190,7 +182,6 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
|||
final TimeseriesQuery that = (TimeseriesQuery) o;
|
||||
return Objects.equals(virtualColumns, that.virtualColumns) &&
|
||||
Objects.equals(dimFilter, that.dimFilter) &&
|
||||
Objects.equals(granularity, that.granularity) &&
|
||||
Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
|
||||
Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
|
||||
}
|
||||
|
@ -198,6 +189,6 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
|
|||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(super.hashCode(), virtualColumns, dimFilter, granularity, aggregatorSpecs, postAggregatorSpecs);
|
||||
return Objects.hash(super.hashCode(), virtualColumns, dimFilter, aggregatorSpecs, postAggregatorSpecs);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,6 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
private final TopNMetricSpec topNMetricSpec;
|
||||
private final int threshold;
|
||||
private final DimFilter dimFilter;
|
||||
private final Granularity granularity;
|
||||
private final List<AggregatorFactory> aggregatorSpecs;
|
||||
private final List<PostAggregator> postAggregatorSpecs;
|
||||
|
||||
|
@ -70,7 +69,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(dataSource, querySegmentSpec, false, context);
|
||||
super(dataSource, querySegmentSpec, false, context, granularity);
|
||||
|
||||
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
|
||||
this.dimensionSpec = dimensionSpec;
|
||||
|
@ -78,7 +77,6 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
this.threshold = threshold;
|
||||
|
||||
this.dimFilter = dimFilter;
|
||||
this.granularity = granularity;
|
||||
this.aggregatorSpecs = aggregatorSpecs == null ? ImmutableList.<AggregatorFactory>of() : aggregatorSpecs;
|
||||
this.postAggregatorSpecs = Queries.prepareAggregations(
|
||||
ImmutableList.of(dimensionSpec.getOutputName()),
|
||||
|
@ -143,12 +141,6 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
return dimFilter;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public Granularity getGranularity()
|
||||
{
|
||||
return granularity;
|
||||
}
|
||||
|
||||
@JsonProperty("aggregations")
|
||||
public List<AggregatorFactory> getAggregatorSpecs()
|
||||
{
|
||||
|
@ -218,7 +210,7 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
", querySegmentSpec=" + getQuerySegmentSpec() +
|
||||
", virtualColumns=" + virtualColumns +
|
||||
", dimFilter=" + dimFilter +
|
||||
", granularity='" + granularity + '\'' +
|
||||
", granularity='" + getGranularity() + '\'' +
|
||||
", aggregatorSpecs=" + aggregatorSpecs +
|
||||
", postAggregatorSpecs=" + postAggregatorSpecs +
|
||||
'}';
|
||||
|
@ -242,7 +234,6 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
Objects.equals(dimensionSpec, topNQuery.dimensionSpec) &&
|
||||
Objects.equals(topNMetricSpec, topNQuery.topNMetricSpec) &&
|
||||
Objects.equals(dimFilter, topNQuery.dimFilter) &&
|
||||
Objects.equals(granularity, topNQuery.granularity) &&
|
||||
Objects.equals(aggregatorSpecs, topNQuery.aggregatorSpecs) &&
|
||||
Objects.equals(postAggregatorSpecs, topNQuery.postAggregatorSpecs);
|
||||
}
|
||||
|
@ -257,7 +248,6 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
|
|||
topNMetricSpec,
|
||||
threshold,
|
||||
dimFilter,
|
||||
granularity,
|
||||
aggregatorSpecs,
|
||||
postAggregatorSpecs
|
||||
);
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import io.druid.java.util.common.DateTimes;
|
||||
import io.druid.java.util.common.granularity.PeriodGranularity;
|
||||
import io.druid.java.util.common.guava.Sequence;
|
||||
import io.druid.java.util.common.guava.Sequences;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
|
@ -31,6 +32,7 @@ import io.druid.query.aggregation.CountAggregatorFactory;
|
|||
import io.druid.query.timeboundary.TimeBoundaryResultValue;
|
||||
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.Period;
|
||||
import org.junit.Assert;
|
||||
|
@ -57,14 +59,24 @@ public class TimewarpOperatorTest
|
|||
final DateTime t = DateTimes.of("2014-01-23");
|
||||
final DateTime tOffset = DateTimes.of("2014-01-09");
|
||||
|
||||
Assert.assertEquals(tOffset, t.plus(testOperator.computeOffset(t.getMillis())));
|
||||
Assert.assertEquals(tOffset, t.plus(testOperator.computeOffset(t.getMillis(), DateTimeZone.UTC)));
|
||||
}
|
||||
|
||||
{
|
||||
final DateTime t = DateTimes.of("2014-08-02");
|
||||
final DateTime tOffset = DateTimes.of("2014-01-11");
|
||||
|
||||
Assert.assertEquals(tOffset, t.plus(testOperator.computeOffset(t.getMillis())));
|
||||
Assert.assertEquals(tOffset, t.plus(testOperator.computeOffset(t.getMillis(), DateTimeZone.UTC)));
|
||||
}
|
||||
|
||||
{
|
||||
final DateTime t = DateTimes.of("2014-08-02T-07");
|
||||
final DateTime tOffset = DateTimes.of("2014-01-11T-08");
|
||||
|
||||
Assert.assertEquals(
|
||||
tOffset,
|
||||
t.plus(testOperator.computeOffset(t.getMillis(), DateTimeZone.forID("America/Los_Angeles")))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -177,6 +189,126 @@ public class TimewarpOperatorTest
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPostProcessWithTimezonesAndDstShift() throws Exception
|
||||
{
|
||||
QueryRunner<Result<TimeseriesResultValue>> queryRunner = testOperator.postProcess(
|
||||
new QueryRunner<Result<TimeseriesResultValue>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Result<TimeseriesResultValue>> run(
|
||||
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
|
||||
Map<String, Object> responseContext
|
||||
)
|
||||
{
|
||||
return Sequences.simple(
|
||||
ImmutableList.of(
|
||||
new Result<>(
|
||||
DateTimes.of("2014-01-09T-08"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 2))
|
||||
),
|
||||
new Result<>(
|
||||
DateTimes.of("2014-01-11T-08"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 3))
|
||||
),
|
||||
new Result<>(
|
||||
queryPlus.getQuery().getIntervals().get(0).getEnd(),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 5))
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
},
|
||||
DateTimes.of("2014-08-02T-07").getMillis()
|
||||
);
|
||||
|
||||
final Query<Result<TimeseriesResultValue>> query =
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("dummy")
|
||||
.intervals("2014-07-31T-07/2014-08-05T-07")
|
||||
.granularity(new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Los_Angeles")))
|
||||
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
|
||||
.build();
|
||||
|
||||
Assert.assertEquals(
|
||||
Lists.newArrayList(
|
||||
new Result<>(
|
||||
DateTimes.of("2014-07-31T-07"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 2))
|
||||
),
|
||||
new Result<>(
|
||||
DateTimes.of("2014-08-02T-07"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 3))
|
||||
),
|
||||
new Result<>(
|
||||
DateTimes.of("2014-08-02T-07"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 5))
|
||||
)
|
||||
),
|
||||
queryRunner.run(QueryPlus.wrap(query), CONTEXT).toList()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPostProcessWithTimezonesAndNoDstShift() throws Exception
|
||||
{
|
||||
QueryRunner<Result<TimeseriesResultValue>> queryRunner = testOperator.postProcess(
|
||||
new QueryRunner<Result<TimeseriesResultValue>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Result<TimeseriesResultValue>> run(
|
||||
QueryPlus<Result<TimeseriesResultValue>> queryPlus,
|
||||
Map<String, Object> responseContext
|
||||
)
|
||||
{
|
||||
return Sequences.simple(
|
||||
ImmutableList.of(
|
||||
new Result<>(
|
||||
DateTimes.of("2014-01-09T-07"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 2))
|
||||
),
|
||||
new Result<>(
|
||||
DateTimes.of("2014-01-11T-07"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 3))
|
||||
),
|
||||
new Result<>(
|
||||
queryPlus.getQuery().getIntervals().get(0).getEnd(),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 5))
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
},
|
||||
DateTimes.of("2014-08-02T-07").getMillis()
|
||||
);
|
||||
|
||||
final Query<Result<TimeseriesResultValue>> query =
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource("dummy")
|
||||
.intervals("2014-07-31T-07/2014-08-05T-07")
|
||||
.granularity(new PeriodGranularity(new Period("P1D"), null, DateTimeZone.forID("America/Phoenix")))
|
||||
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("count")))
|
||||
.build();
|
||||
|
||||
Assert.assertEquals(
|
||||
Lists.newArrayList(
|
||||
new Result<>(
|
||||
DateTimes.of("2014-07-31T-07"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 2))
|
||||
),
|
||||
new Result<>(
|
||||
DateTimes.of("2014-08-02T-07"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 3))
|
||||
),
|
||||
new Result<>(
|
||||
DateTimes.of("2014-08-02T-07"),
|
||||
new TimeseriesResultValue(ImmutableMap.<String, Object>of("metric", 5))
|
||||
)
|
||||
),
|
||||
queryRunner.run(QueryPlus.wrap(query), CONTEXT).toList()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyFutureInterval() throws Exception
|
||||
{
|
||||
|
|
|
@ -57,7 +57,8 @@ public class ScanQuerySpecTest
|
|||
+ "\"columns\":[\"market\",\"quality\",\"index\"],"
|
||||
+ "\"legacy\":null,"
|
||||
+ "\"context\":null,"
|
||||
+ "\"descending\":false}";
|
||||
+ "\"descending\":false,"
|
||||
+ "\"granularity\":{\"type\":\"all\"}}";
|
||||
|
||||
ScanQuery query = new ScanQuery(
|
||||
new TableDataSource(QueryRunnerTestHelper.dataSource),
|
||||
|
|
|
@ -553,7 +553,7 @@ public class CalciteQueryTest
|
|||
ImmutableList.of(),
|
||||
ImmutableList.of(
|
||||
new Object[]{
|
||||
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n"
|
||||
"DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"__time\",\"cnt\",\"dim1\",\"dim2\",\"m1\",\"m2\",\"unique_dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{__time:LONG, cnt:LONG, dim1:STRING, dim2:STRING, m1:FLOAT, m2:DOUBLE, unique_dim1:COMPLEX}])\n"
|
||||
}
|
||||
)
|
||||
);
|
||||
|
@ -799,8 +799,8 @@ public class CalciteQueryTest
|
|||
{
|
||||
final String explanation =
|
||||
"BindableJoin(condition=[=($0, $2)], joinType=[inner])\n"
|
||||
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"\",\"extractionFn\":null}},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{dim1:STRING}])\n"
|
||||
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{dim1:STRING, dim2:STRING}])\n";
|
||||
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"dim1\",\"value\":\"\",\"extractionFn\":null}},\"columns\":[\"dim1\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING}])\n"
|
||||
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING, dim2:STRING}])\n";
|
||||
|
||||
testQuery(
|
||||
PLANNER_CONFIG_FALLBACK,
|
||||
|
@ -6173,7 +6173,7 @@ public class CalciteQueryTest
|
|||
+ " BindableFilter(condition=[OR(=($0, 'xxx'), CAST(AND(IS NOT NULL($4), <>($2, 0))):BOOLEAN)])\n"
|
||||
+ " BindableJoin(condition=[=($1, $3)], joinType=[left])\n"
|
||||
+ " BindableJoin(condition=[true], joinType=[inner])\n"
|
||||
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{dim1:STRING, dim2:STRING}])\n"
|
||||
+ " DruidQueryRel(query=[{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[],\"resultFormat\":\"compactedList\",\"batchSize\":20480,\"limit\":9223372036854775807,\"filter\":null,\"columns\":[\"dim1\",\"dim2\"],\"legacy\":false,\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false,\"granularity\":{\"type\":\"all\"}}], signature=[{dim1:STRING, dim2:STRING}])\n"
|
||||
+ " DruidQueryRel(query=[{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"descending\":false,\"virtualColumns\":[],\"filter\":{\"type\":\"like\",\"dimension\":\"dim1\",\"pattern\":\"%bc\",\"escape\":null,\"extractionFn\":null},\"granularity\":{\"type\":\"all\"},\"aggregations\":[{\"type\":\"count\",\"name\":\"a0\"}],\"postAggregations\":[],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"skipEmptyBuckets\":true,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"}}], signature=[{a0:LONG}])\n"
|
||||
+ " DruidQueryRel(query=[{\"queryType\":\"groupBy\",\"dataSource\":{\"type\":\"table\",\"name\":\"foo\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"d1:v\",\"expression\":\"1\",\"outputType\":\"LONG\"}],\"filter\":{\"type\":\"like\",\"dimension\":\"dim1\",\"pattern\":\"%bc\",\"escape\":null,\"extractionFn\":null},\"granularity\":{\"type\":\"all\"},\"dimensions\":[{\"type\":\"default\",\"dimension\":\"dim1\",\"outputName\":\"d0\",\"outputType\":\"STRING\"},{\"type\":\"default\",\"dimension\":\"d1:v\",\"outputName\":\"d1\",\"outputType\":\"LONG\"}],\"aggregations\":[],\"postAggregations\":[],\"having\":null,\"limitSpec\":{\"type\":\"NoopLimitSpec\"},\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\"},\"descending\":false}], signature=[{d0:STRING, d1:LONG}])\n";
|
||||
|
||||
|
|
Loading…
Reference in New Issue