diff --git a/docs/content/development/select-query.md b/docs/content/development/select-query.md
index 222cdbb45a8..9bc5b8b2e06 100644
--- a/docs/content/development/select-query.md
+++ b/docs/content/development/select-query.md
@@ -8,6 +8,7 @@ Select queries return raw Druid rows and support pagination.
 {
    "queryType": "select",
    "dataSource": "wikipedia",
+   "descending": false,
    "dimensions":[],
    "metrics":[],
    "granularity": "all",
@@ -25,6 +26,7 @@ There are several main parts to a select query:
 |queryType|This String should always be "select"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
 |dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
 |intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
+|descending|Whether to return the results in descending order. Default is `false` (ascending). When `true`, paging identifiers and offsets are negative values.|no|
 |filter|See [Filters](../querying/filters.html)|no|
 |dimensions|A String array of dimensions to select. If left empty, all dimensions are returned.|no|
 |metrics|A String array of metrics to select. If left empty, all metrics are returned.|no|
@@ -140,7 +142,7 @@ The format of the result is:
   }
 ]
 ```
-The `threshold` determines how many hits are returned, with each hit indexed by an offset.
+The `threshold` determines how many hits are returned, with each hit indexed by an offset. When `descending` is true, the offsets are negative values.
 
 The results above include:
@@ -166,4 +168,4 @@ This can be used with the next query's pagingSpec:
     }
 
-Note that in the second query, an offset is specified and that it is 1 greater than the largest offset found in the initial results. To return the next "page", this offset must be incremented by 1 with each new query. When an empty results set is received, the very last page has been returned.
+Note that in the second query, an offset is specified and that it is 1 greater than the largest offset found in the initial results. To return the next "page", this offset must be incremented by 1 with each new query (or decremented by 1 for a descending query). When an empty result set is received, the very last page has been returned.
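In practice the client just moves each returned paging identifier one step further away from zero before re-issuing the query. A minimal sketch of that rule (the helper class and segment identifier are hypothetical, not part of Druid):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NextPageExample
{
  // Ascending pages advance each identifier by +1; descending pages, whose
  // offsets are negative, advance it by -1 (one step away from zero).
  static LinkedHashMap<String, Integer> nextPagingIdentifiers(
      Map<String, Integer> lastOffsets,
      boolean descending
  )
  {
    LinkedHashMap<String, Integer> next = new LinkedHashMap<String, Integer>();
    for (Map.Entry<String, Integer> entry : lastOffsets.entrySet()) {
      next.put(entry.getKey(), descending ? entry.getValue() - 1 : entry.getValue() + 1);
    }
    return next;
  }

  public static void main(String[] args)
  {
    LinkedHashMap<String, Integer> last = new LinkedHashMap<String, Integer>();
    last.put("wikipedia_2013-01-01_2013-01-02_v1", 4);       // hypothetical segment id
    System.out.println(nextPagingIdentifiers(last, false));  // {wikipedia_...=5}
    last.put("wikipedia_2013-01-01_2013-01-02_v1", -5);
    System.out.println(nextPagingIdentifiers(last, true));   // {wikipedia_...=-6}
  }
}
```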
diff --git a/docs/content/querying/timeseriesquery.md b/docs/content/querying/timeseriesquery.md
index 66d38bcb02f..05eb0425ca9 100644
--- a/docs/content/querying/timeseriesquery.md
+++ b/docs/content/querying/timeseriesquery.md
@@ -50,7 +50,7 @@ There are 7 main parts to a timeseries query:
 |--------|-----------|---------|
 |queryType|This String should always be "timeseries"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
 |dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
-|descending|Whether to make descending ordered result.|no|
+|descending|Whether to return the results in descending order. Default is `false` (ascending).|no|
 |intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
 |granularity|Defines the granularity to bucket query results. See [Granularities](../querying/granularities.html)|yes|
 |filter|See [Filters](../querying/filters.html)|no|
diff --git a/processing/src/main/java/io/druid/query/Druids.java b/processing/src/main/java/io/druid/query/Druids.java
index d20a6530f3e..a971a7cda52 100644
--- a/processing/src/main/java/io/druid/query/Druids.java
+++ b/processing/src/main/java/io/druid/query/Druids.java
@@ -1073,6 +1073,7 @@ public class Druids
   {
     private DataSource dataSource;
     private QuerySegmentSpec querySegmentSpec;
+    private boolean descending;
     private Map<String, Object> context;
     private DimFilter dimFilter;
     private QueryGranularity granularity;
@@ -1097,6 +1098,7 @@ public class Druids
       return new SelectQuery(
           dataSource,
           querySegmentSpec,
+          descending,
           dimFilter,
           granularity,
           dimensions,
@@ -1144,6 +1146,12 @@ public class Druids
       return this;
     }
 
+    public SelectQueryBuilder descending(boolean descending)
+    {
+      this.descending = descending;
+      return this;
+    }
+
    public SelectQueryBuilder context(Map<String, Object> c)
    {
      context = c;
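For Java callers the new `descending(boolean)` setter composes with the rest of the builder. A minimal sketch, assuming `Druids.newSelectQueryBuilder()` and the `dataSource`/`intervals`/`pagingSpec` setters exist elsewhere in the `Druids` class (this diff shows the builder only in part):

```java
import io.druid.query.Druids;
import io.druid.query.select.PagingSpec;
import io.druid.query.select.SelectQuery;

public class DescendingSelectExample
{
  public static void main(String[] args)
  {
    // descending(true) is the setter added by this patch; the other setters
    // are assumed from the unshown parts of Druids.SelectQueryBuilder.
    SelectQuery query = Druids.newSelectQueryBuilder()
                              .dataSource("wikipedia")
                              .intervals("2013-01-01/2013-01-02")
                              .descending(true)
                              .pagingSpec(new PagingSpec(null, 3))
                              .build();
    System.out.println(query);
  }
}
```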
diff --git a/processing/src/main/java/io/druid/query/select/PagingOffset.java b/processing/src/main/java/io/druid/query/select/PagingOffset.java
new file mode 100644
index 00000000000..c1d98f2bf5c
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/select/PagingOffset.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.select;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Offset iterator for select queries. Ascending offsets count up from the
+ * start offset; descending offsets count down from a negative start offset.
+ */
+public abstract class PagingOffset
+{
+  protected final int startOffset;
+  protected final int threshold;
+
+  protected int counter;
+
+  public PagingOffset(int startOffset, int threshold)
+  {
+    this.startOffset = startOffset;
+    this.threshold = threshold;
+  }
+
+  public abstract boolean isDescending();
+
+  public final int startOffset()
+  {
+    return startOffset;
+  }
+
+  // Number of rows to skip on the cursor before the first emitted row;
+  // always non-negative, regardless of direction.
+  public abstract int startDelta();
+
+  public final int threshold()
+  {
+    return threshold;
+  }
+
+  public final boolean hasNext()
+  {
+    return counter < threshold;
+  }
+
+  public final void next()
+  {
+    counter++;
+  }
+
+  public abstract int current();
+
+  private static class Ascending extends PagingOffset
+  {
+    public Ascending(int offset, int threshold)
+    {
+      super(offset, threshold);
+    }
+
+    public final boolean isDescending()
+    {
+      return false;
+    }
+
+    public final int startDelta()
+    {
+      return startOffset;
+    }
+
+    public final int current()
+    {
+      return startOffset + counter;
+    }
+  }
+
+  private static class Descending extends PagingOffset
+  {
+    public Descending(int offset, int threshold)
+    {
+      super(offset, threshold);
+    }
+
+    public final boolean isDescending()
+    {
+      return true;
+    }
+
+    public final int startDelta()
+    {
+      return -startOffset - 1;
+    }
+
+    public final int current()
+    {
+      return startOffset - counter;
+    }
+  }
+
+  public static PagingOffset of(int startOffset, int threshold)
+  {
+    return startOffset < 0 ? new Descending(startOffset, threshold) : new Ascending(startOffset, threshold);
+  }
+
+  // Encodes a non-negative row delta as a paging offset: ascending deltas are
+  // kept as-is, descending deltas are mapped to -(delta + 1), so delta 0
+  // becomes -1.
+  @VisibleForTesting
+  static int toOffset(int delta, boolean descending)
+  {
+    if (delta < 0) {
+      throw new IllegalArgumentException("Delta should not be negative");
+    }
+    return descending ? -delta - 1 : delta;
+  }
+}
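The encoding is easiest to see with concrete numbers, mirroring `PagingOffsetTest` further down: an ascending delta is stored as-is, while a descending delta `d` is stored as `-(d + 1)`. A small sketch, placed in `io.druid.query.select` because `toOffset` is package-private:

```java
package io.druid.query.select;

public class PagingOffsetDemo
{
  public static void main(String[] args)
  {
    // ascending: delta 3 stays 3; the iterator yields offsets 3, 4, 5
    PagingOffset asc = PagingOffset.of(PagingOffset.toOffset(3, false), 3);
    for (; asc.hasNext(); asc.next()) {
      System.out.print(asc.current() + " ");   // 3 4 5
    }

    // descending: delta 3 is encoded as -4; the iterator yields -4, -5, -6
    PagingOffset desc = PagingOffset.of(PagingOffset.toOffset(3, true), 3);
    for (; desc.hasNext(); desc.next()) {
      System.out.print(desc.current() + " ");  // -4 -5 -6
    }
  }
}
```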
diff --git a/processing/src/main/java/io/druid/query/select/PagingSpec.java b/processing/src/main/java/io/druid/query/select/PagingSpec.java
index be89c0f4ec8..86a61e8f5d8 100644
--- a/processing/src/main/java/io/druid/query/select/PagingSpec.java
+++ b/processing/src/main/java/io/druid/query/select/PagingSpec.java
@@ -98,4 +98,14 @@ public class PagingSpec
            ", threshold=" + threshold +
            '}';
   }
+
+  public PagingOffset getOffset(String identifier, boolean descending)
+  {
+    Integer offset = pagingIdentifiers.get(identifier);
+    if (offset == null) {
+      offset = PagingOffset.toOffset(0, descending);
+    }
+    return PagingOffset.of(offset, threshold);
+  }
 }
diff --git a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
index f5702e694c8..662d06db7e9 100644
--- a/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
+++ b/processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
@@ -62,7 +62,7 @@ public class SelectBinaryFn
                                ? arg1.getTimestamp()
                                : gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
 
-    SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec.getThreshold(), descending);
+    SelectResultValueBuilder builder = new SelectResultValueBuilder(timestamp, pagingSpec, descending);
 
     SelectResultValue arg1Val = arg1.getValue();
     SelectResultValue arg2Val = arg2.getValue();
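`PagingSpec.getOffset` above is the bridge from a client-supplied paging identifier to a `PagingOffset`; a segment with no stored identifier falls back to the zero delta for the requested direction (`0` ascending, `-1` descending). A sketch with hypothetical segment identifiers:

```java
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.druid.query.select.PagingOffset;
import io.druid.query.select.PagingSpec;

public class PagingSpecDemo
{
  public static void main(String[] args)
  {
    PagingSpec spec = new PagingSpec(
        Maps.newLinkedHashMap(ImmutableMap.of("some_segment_id", -3)), 5
    );
    // known identifier: resumes from the stored (negative, hence descending) offset
    PagingOffset known = spec.getOffset("some_segment_id", true);
    System.out.println(known.startOffset());   // -3
    // unknown identifier: falls back to toOffset(0, descending) == -1
    PagingOffset fresh = spec.getOffset("other_segment_id", true);
    System.out.println(fresh.startOffset());   // -1
  }
}
```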
diff --git a/processing/src/main/java/io/druid/query/select/SelectQuery.java b/processing/src/main/java/io/druid/query/select/SelectQuery.java
index 4db2d9d4839..05603c6afd7 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQuery.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQuery.java
@@ -49,6 +49,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
   public SelectQuery(
       @JsonProperty("dataSource") DataSource dataSource,
       @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
+      @JsonProperty("descending") boolean descending,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("granularity") QueryGranularity granularity,
       @JsonProperty("dimensions") List<String> dimensions,
@@ -57,7 +58,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
       @JsonProperty("context") Map<String, Object> context
   )
   {
-    super(dataSource, querySegmentSpec, false, context);
+    super(dataSource, querySegmentSpec, descending, context);
     this.dimFilter = dimFilter;
     this.granularity = granularity;
     this.dimensions = dimensions;
@@ -65,6 +66,17 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
     this.pagingSpec = pagingSpec;
 
     Preconditions.checkNotNull(pagingSpec, "must specify a pagingSpec");
+    Preconditions.checkArgument(checkPagingSpec(pagingSpec, descending), "invalid pagingSpec");
+  }
+
+  private boolean checkPagingSpec(PagingSpec pagingSpec, boolean descending)
+  {
+    for (Integer value : pagingSpec.getPagingIdentifiers().values()) {
+      if (descending ^ (value < 0)) {
+        return false;
+      }
+    }
+    return pagingSpec.getThreshold() >= 0;
   }
 
   @Override
@@ -109,11 +121,17 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
     return metrics;
   }
 
+  public PagingOffset getPagingOffset(String identifier)
+  {
+    return pagingSpec.getOffset(identifier, isDescending());
+  }
+
   public SelectQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
   {
     return new SelectQuery(
         getDataSource(),
         querySegmentSpec,
+        isDescending(),
         dimFilter,
         granularity,
         dimensions,
@@ -129,6 +147,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
     return new SelectQuery(
         dataSource,
         getQuerySegmentSpec(),
+        isDescending(),
         dimFilter,
         granularity,
         dimensions,
@@ -143,6 +162,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
     return new SelectQuery(
         getDataSource(),
         getQuerySegmentSpec(),
+        isDescending(),
         dimFilter,
         granularity,
         dimensions,
@@ -158,6 +178,7 @@ public class SelectQuery extends BaseQuery<Result<SelectResultValue>>
     return "SelectQuery{" +
            "dataSource='" + getDataSource() + '\'' +
            ", querySegmentSpec=" + getQuerySegmentSpec() +
+           ", descending=" + isDescending() +
            ", dimFilter=" + dimFilter +
            ", granularity=" + granularity +
            ", dimensions=" + dimensions +
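The XOR in `checkPagingSpec` above reads: a paging identifier is invalid when its sign disagrees with the query direction — descending queries may carry only negative identifiers, ascending queries only non-negative ones. A standalone sketch of the same predicate:

```java
public class CheckPagingSpecDemo
{
  // Mirrors the check in SelectQuery: true means the identifier is rejected.
  static boolean invalid(boolean descending, int identifierValue)
  {
    return descending ^ (identifierValue < 0);
  }

  public static void main(String[] args)
  {
    System.out.println(invalid(false, 3));   // false: ascending + non-negative is fine
    System.out.println(invalid(true, -4));   // false: descending + negative is fine
    System.out.println(invalid(true, 3));    // true: descending with a positive offset
    System.out.println(invalid(false, -4));  // true: ascending with a negative offset
  }
}
```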
diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
index 71da82de6e7..75e5ac944ad 100644
--- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
+++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java
@@ -82,8 +82,7 @@ public class SelectQueryEngine
           {
             final SelectResultValueBuilder builder = new SelectResultValueBuilder(
                 cursor.getTime(),
-                query.getPagingSpec()
-                     .getThreshold(),
+                query.getPagingSpec(),
                 query.isDescending()
             );
 
@@ -102,18 +101,11 @@ public class SelectQueryEngine
               metSelectors.put(metric, metricSelector);
             }
 
-            int startOffset;
-            if (query.getPagingSpec().getPagingIdentifiers() == null) {
-              startOffset = 0;
-            } else {
-              Integer offset = query.getPagingSpec().getPagingIdentifiers().get(segment.getIdentifier());
-              startOffset = (offset == null) ? 0 : offset;
-            }
+            final PagingOffset offset = query.getPagingOffset(segment.getIdentifier());
 
-            cursor.advanceTo(startOffset);
+            cursor.advanceTo(offset.startDelta());
 
-            int offset = 0;
-            while (!cursor.isDone() && offset < query.getPagingSpec().getThreshold()) {
+            for (; !cursor.isDone() && offset.hasNext(); cursor.advance(), offset.next()) {
               final Map<String, Object> theEvent = Maps.newLinkedHashMap();
               theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get()));
@@ -153,12 +145,10 @@ public class SelectQueryEngine
               builder.addEntry(
                   new EventHolder(
                       segment.getIdentifier(),
-                      startOffset + offset,
+                      offset.current(),
                       theEvent
                   )
               );
-              cursor.advance();
-              offset++;
             }
 
             return builder.build();
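Note that the engine itself never indexes with a negative value: the cursor for a descending query already iterates in reverse, and `startDelta()` converts the stored identifier back into a plain number of rows to skip. A small sketch of the arithmetic:

```java
public class StartDeltaDemo
{
  public static void main(String[] args)
  {
    // A stored descending identifier of -4 means offsets -1, -2, -3 were
    // already served, i.e. three rows of the reversed cursor.
    int storedOffset = -4;
    int startDelta = -storedOffset - 1;  // rows to skip before emitting
    System.out.println(startDelta);      // prints 3; the loop then emits -4, -5, -6, ...
  }
}
```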
diff --git a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java
index 8f4f511a004..9891c87df4d 100644
--- a/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java
+++ b/processing/src/main/java/io/druid/query/select/SelectResultValueBuilder.java
@@ -22,6 +22,7 @@ package io.druid.query.select;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Queues;
 import com.google.common.primitives.Longs;
 import com.metamx.common.guava.Comparators;
 import io.druid.query.Result;
@@ -30,6 +31,7 @@ import org.joda.time.DateTime;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 
 /**
  */
@@ -55,18 +57,20 @@ public class SelectResultValueBuilder
   };
 
   private final DateTime timestamp;
+  private final PagingSpec pagingSpec;
 
-  private MinMaxPriorityQueue<EventHolder> pQueue = null;
+  private Queue<EventHolder> pQueue = null;
 
   public SelectResultValueBuilder(
       DateTime timestamp,
-      int threshold,
+      PagingSpec pagingSpec,
       boolean descending
   )
   {
     this.timestamp = timestamp;
+    this.pagingSpec = pagingSpec;
 
-    instantiatePQueue(threshold, descending ? Comparators.inverse(comparator) : comparator);
+    instantiatePQueue(pagingSpec.getThreshold(), descending ? Comparators.inverse(comparator) : comparator);
   }
 
   public void addEntry(
@@ -87,6 +91,10 @@ public class SelectResultValueBuilder
       values.add(event);
     }
 
+    if (pagingIdentifiers.isEmpty()) {
+      pagingIdentifiers.putAll(pagingSpec.getPagingIdentifiers());
+    }
+
     return new Result<SelectResultValue>(
         timestamp,
         new SelectResultValue(pagingIdentifiers, values)
@@ -95,6 +103,8 @@ public class SelectResultValueBuilder
 
   private void instantiatePQueue(int threshold, final Comparator comparator)
   {
-    this.pQueue = MinMaxPriorityQueue.orderedBy(comparator).maximumSize(threshold).create();
+    this.pQueue = threshold > 0
+                  ? MinMaxPriorityQueue.orderedBy(comparator).maximumSize(threshold).create()
+                  : Queues.newArrayDeque();
   }
 }
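The bounded branch relies on Guava's `MinMaxPriorityQueue`, which evicts its greatest element once `maximumSize` is exceeded; with the inverted comparator for descending queries this keeps the `threshold` entries closest to the page start. A toy sketch with integer offsets standing in for `EventHolder`s:

```java
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;

public class BoundedQueueDemo
{
  public static void main(String[] args)
  {
    // maximumSize(3) makes the queue drop its greatest element on overflow,
    // which is how the builder keeps only `threshold` holders per timestamp.
    MinMaxPriorityQueue<Integer> queue = MinMaxPriorityQueue
        .orderedBy(Ordering.<Integer>natural())
        .maximumSize(3)
        .create();
    for (int offset : new int[]{0, 1, 2, 3, 4}) {
      queue.add(offset);  // 3 and 4 never survive: they are the greatest
    }
    while (!queue.isEmpty()) {
      System.out.print(queue.poll() + " ");  // prints: 0 1 2
    }
  }
}
```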
diff --git a/processing/src/test/java/io/druid/query/select/PagingOffsetTest.java b/processing/src/test/java/io/druid/query/select/PagingOffsetTest.java
new file mode 100644
index 00000000000..fe70abb7b8c
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/select/PagingOffsetTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.select;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class PagingOffsetTest
+{
+  @Test
+  public void testZeroThreshold() throws Exception
+  {
+    PagingOffset offset = PagingOffset.of(PagingOffset.toOffset(3, false), 0);
+    Assert.assertEquals(3, offset.startOffset());
+    Assert.assertEquals(3, offset.startDelta());
+    Assert.assertArrayEquals(new int[]{}, toArray(offset));
+
+    offset = PagingOffset.of(PagingOffset.toOffset(3, true), 0);
+    Assert.assertEquals(-4, offset.startOffset());
+    Assert.assertEquals(3, offset.startDelta());
+    Assert.assertArrayEquals(new int[]{}, toArray(offset));
+  }
+
+  @Test
+  public void testAscending() throws Exception
+  {
+    PagingOffset offset = PagingOffset.of(PagingOffset.toOffset(3, false), 3);
+    Assert.assertEquals(3, offset.startOffset());
+    Assert.assertEquals(3, offset.startDelta());
+    Assert.assertArrayEquals(new int[]{3, 4, 5}, toArray(offset));
+  }
+
+  @Test
+  public void testDescending() throws Exception
+  {
+    PagingOffset offset = PagingOffset.of(PagingOffset.toOffset(3, true), 3);
+    Assert.assertEquals(-4, offset.startOffset());
+    Assert.assertEquals(3, offset.startDelta());
+    Assert.assertArrayEquals(new int[]{-4, -5, -6}, toArray(offset));
+  }
+
+  private int[] toArray(PagingOffset offset)
+  {
+    List<Integer> ints = Lists.newArrayList();
+    for (; offset.hasNext(); offset.next()) {
+      ints.add(offset.current());
+    }
+    return Ints.toArray(ints);
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
index b6fd3f4965f..bbf3fa2b2c5 100644
--- a/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/select/SelectQueryRunnerTest.java
@@ -19,9 +19,12 @@
 package io.druid.query.select;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.ObjectArrays;
 import com.metamx.common.ISE;
 import com.metamx.common.guava.Sequences;
 import io.druid.jackson.DefaultObjectMapper;
@@ -33,6 +36,7 @@ import io.druid.query.filter.AndDimFilter;
 import io.druid.query.filter.DimFilter;
 import io.druid.query.filter.SelectorDimFilter;
 import io.druid.query.spec.LegacySegmentSpec;
+import io.druid.query.spec.QuerySegmentSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -44,6 +48,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -52,10 +57,47 @@ import java.util.Map;
 @RunWith(Parameterized.class)
 public class SelectQueryRunnerTest
 {
-  @Parameterized.Parameters
+  // copied from druid.sample.tsv
+  public static final String[] V_0112 = {
+      "2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000",
+      "2011-01-12T00:00:00.000Z\ttotal_market\tmezzanine\tpreferred\tmpreferred\t1000.000000",
+      "2011-01-12T00:00:00.000Z\ttotal_market\tpremium\tpreferred\tppreferred\t1000.000000",
+      "2011-01-12T00:00:00.000Z\tupfront\tmezzanine\tpreferred\tmpreferred\t800.000000\tvalue",
+      "2011-01-12T00:00:00.000Z\tupfront\tpremium\tpreferred\tppreferred\t800.000000\tvalue"
+  };
+  public static final String[] V_0113 = {
+      "2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
+      "2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
+      "2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
+      "2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
+      "2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
+      "2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
+      "2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
+      "2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
+      "2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928",
+      "2011-01-13T00:00:00.000Z\ttotal_market\tmezzanine\tpreferred\tmpreferred\t1040.945505",
+      "2011-01-13T00:00:00.000Z\ttotal_market\tpremium\tpreferred\tppreferred\t1689.012875",
+      "2011-01-13T00:00:00.000Z\tupfront\tmezzanine\tpreferred\tmpreferred\t826.060182\tvalue",
+      "2011-01-13T00:00:00.000Z\tupfront\tpremium\tpreferred\tppreferred\t1564.617729\tvalue"
+  };
+
+  public static final QuerySegmentSpec I_0112_0114 = new LegacySegmentSpec(
+      new Interval("2011-01-12/2011-01-14")
+  );
+  public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class);
+
+  @Parameterized.Parameters(name = "{0}:descending={1}")
   public static Iterable<Object[]> constructorFeeder() throws IOException
   {
-    return QueryRunnerTestHelper.transformToConstructionFeeder(
+    return QueryRunnerTestHelper.cartesian(
         QueryRunnerTestHelper.makeQueryRunners(
             new SelectQueryRunnerFactory(
                 new SelectQueryQueryToolChest(
@@ -65,19 +107,18 @@ public class SelectQueryRunnerTest
                 new SelectQueryEngine(),
                 QueryRunnerTestHelper.NOOP_QUERYWATCHER
             )
-        )
+        ),
+        Arrays.asList(false, true)
     );
   }
 
-  private static final String providerLowercase = "market";
-
   private final QueryRunner runner;
+  private final boolean descending;
 
-  public SelectQueryRunnerTest(
-      QueryRunner runner
-  )
+  public SelectQueryRunnerTest(QueryRunner runner, boolean descending)
   {
     this.runner = runner;
+    this.descending = descending;
   }
 
   @Test
@@ -85,67 +126,27 @@ public class SelectQueryRunnerTest
   public void testFullOnSelect()
   {
     SelectQuery query = new SelectQuery(
         new TableDataSource(QueryRunnerTestHelper.dataSource),
-        QueryRunnerTestHelper.fullOnInterval,
+        I_0112_0114,
+        descending,
         null,
         QueryRunnerTestHelper.allGran,
-        Lists.<String>newArrayList(),
-        Lists.<String>newArrayList(),
+        Arrays.<String>asList(),
+        Arrays.<String>asList(),
         new PagingSpec(null, 3),
         null
     );
 
-    HashMap context = new HashMap();
+    HashMap<String, Object> context = new HashMap<String, Object>();
     Iterable<Result<SelectResultValue>> results = Sequences.toList(
         runner.run(query, context),
         Lists.<Result<SelectResultValue>>newArrayList()
     );
 
-    List<Result<SelectResultValue>> expectedResults = Arrays.asList(
-        new Result<SelectResultValue>(
-            new DateTime("2011-01-12T00:00:00.000Z"),
-            new SelectResultValue(
-                ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
-                Arrays.asList(
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        0,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(providerLowercase, "spot")
-                            .put(QueryRunnerTestHelper.qualityDimension, "automotive")
-                            .put(QueryRunnerTestHelper.placementDimension, "preferred")
-                            .put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("a", "preferred"))
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        1,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(providerLowercase, "spot")
-                            .put(QueryRunnerTestHelper.qualityDimension, "business")
-                            .put(QueryRunnerTestHelper.placementDimension, "preferred")
-                            .put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("b", "preferred"))
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        2,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(providerLowercase, "spot")
-                            .put(QueryRunnerTestHelper.qualityDimension, "entertainment")
-                            .put(QueryRunnerTestHelper.placementDimension, "preferred")
-                            .put(QueryRunnerTestHelper.placementishDimension, Lists.newArrayList("e", "preferred"))
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    )
-                )
-            )
-        )
+    PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
+    List<Result<SelectResultValue>> expectedResults = toExpected(
+        toEvents(new String[]{EventHolder.timestampKey + ":TIME"}, V_0112_0114),
+        offset.startOffset(),
+        offset.threshold()
     );
-
     verify(expectedResults, results);
   }
 
@@ -154,58 +155,37 @@ public class SelectQueryRunnerTest
   {
     SelectQuery query = new SelectQuery(
         new TableDataSource(QueryRunnerTestHelper.dataSource),
-        QueryRunnerTestHelper.fullOnInterval,
+        I_0112_0114,
+        descending,
         null,
         QueryRunnerTestHelper.allGran,
-        Lists.newArrayList(providerLowercase),
-        Lists.newArrayList(QueryRunnerTestHelper.indexMetric),
+        Arrays.asList(QueryRunnerTestHelper.marketDimension),
+        Arrays.asList(QueryRunnerTestHelper.indexMetric),
         new PagingSpec(null, 3),
         null
     );
 
-    HashMap context = new HashMap();
+    HashMap<String, Object> context = new HashMap<String, Object>();
     Iterable<Result<SelectResultValue>> results = Sequences.toList(
         runner.run(query, context),
         Lists.<Result<SelectResultValue>>newArrayList()
     );
 
-    List<Result<SelectResultValue>> expectedResults = Arrays.asList(
-        new Result<SelectResultValue>(
-            new DateTime("2011-01-12T00:00:00.000Z"),
-            new SelectResultValue(
-                ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
-                Arrays.asList(
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        0,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(providerLowercase, "spot")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        1,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(providerLowercase, "spot")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        2,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(providerLowercase, "spot")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    )
-                )
-            )
-        )
+    PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
+    List<Result<SelectResultValue>> expectedResults = toExpected(
+        toEvents(
+            new String[]{
+                EventHolder.timestampKey + ":TIME",
+                QueryRunnerTestHelper.marketDimension + ":STRING",
+                null,
+                null,
+                null,
+                QueryRunnerTestHelper.indexMetric + ":FLOAT"
+            },
+            V_0112_0114
+        ),
+        offset.startOffset(),
+        offset.threshold()
     );
-
     verify(expectedResults, results);
   }
 
@@ -214,154 +194,101 @@ public class SelectQueryRunnerTest
   {
     SelectQuery query = new SelectQuery(
         new TableDataSource(QueryRunnerTestHelper.dataSource),
-        QueryRunnerTestHelper.fullOnInterval,
+        I_0112_0114,
+        descending,
         null,
         QueryRunnerTestHelper.allGran,
-        Lists.newArrayList(QueryRunnerTestHelper.qualityDimension),
-        Lists.newArrayList(QueryRunnerTestHelper.indexMetric),
-        new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3),
+        Arrays.asList(QueryRunnerTestHelper.qualityDimension),
+        Arrays.asList(QueryRunnerTestHelper.indexMetric),
+        new PagingSpec(toPagingIdentifier(3, descending), 3),
        null
    );
 
-    HashMap context = new HashMap();
+
     Iterable<Result<SelectResultValue>> results = Sequences.toList(
-        runner.run(query, context),
+        runner.run(query, Maps.newHashMap()),
         Lists.<Result<SelectResultValue>>newArrayList()
     );
 
-    List<Result<SelectResultValue>> expectedResults = Arrays.asList(
-        new Result<SelectResultValue>(
-            new DateTime("2011-01-12T00:00:00.000Z"),
-            new SelectResultValue(
-                ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
-                Arrays.asList(
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        3,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "health")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        4,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        5,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "news")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    )
-                )
-            )
-        )
+    PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
+    List<Result<SelectResultValue>> expectedResults = toExpected(
+        toEvents(
+            new String[]{
+                EventHolder.timestampKey + ":TIME",
+                null,
+                QueryRunnerTestHelper.qualityDimension + ":STRING",
+                null,
+                null,
+                QueryRunnerTestHelper.indexMetric + ":FLOAT"
+            },
+            V_0112_0114
+        ),
+        offset.startOffset(),
+        offset.threshold()
     );
-
     verify(expectedResults, results);
   }
 
   @Test
   public void testFullOnSelectWithFilter()
   {
-    SelectQuery query = new SelectQuery(
-        new TableDataSource(QueryRunnerTestHelper.dataSource),
-        new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
-        new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot"),
-        QueryRunnerTestHelper.dayGran,
-        Lists.newArrayList(QueryRunnerTestHelper.qualityDimension),
-        Lists.newArrayList(QueryRunnerTestHelper.indexMetric),
-        new PagingSpec(Maps.newLinkedHashMap(ImmutableMap.of(QueryRunnerTestHelper.segmentId, 3)), 3),
-        null
-    );
-    HashMap context = new HashMap();
-    Iterable<Result<SelectResultValue>> results = Sequences.toList(
-        runner.run(query, context),
-        Lists.<Result<SelectResultValue>>newArrayList()
-    );
+    // startDelta + threshold pairs
+    for (int[] param : new int[][]{{3, 3}, {0, 1}, {5, 5}, {2, 7}, {3, 0}}) {
+      SelectQuery query = new SelectQuery(
+          new TableDataSource(QueryRunnerTestHelper.dataSource),
+          I_0112_0114,
+          descending,
+          new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot"),
+          QueryRunnerTestHelper.dayGran,
+          Lists.newArrayList(QueryRunnerTestHelper.qualityDimension),
+          Lists.newArrayList(QueryRunnerTestHelper.indexMetric),
+          new PagingSpec(toPagingIdentifier(param[0], descending), param[1]),
+          null
+      );
+      HashMap<String, Object> context = new HashMap<String, Object>();
+      Iterable<Result<SelectResultValue>> results = Sequences.toList(
+          runner.run(query, context),
+          Lists.<Result<SelectResultValue>>newArrayList()
+      );
 
-    List<Result<SelectResultValue>> expectedResults = Arrays.asList(
-        new Result<SelectResultValue>(
-            new DateTime("2011-01-12T00:00:00.000Z"),
-            new SelectResultValue(
-                ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
-                Arrays.asList(
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        3,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "health")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        4,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        5,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-12T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "news")
-                            .put(QueryRunnerTestHelper.indexMetric, 100.000000F)
-                            .build()
-                    )
-                )
-            )
-        ),
-        new Result<SelectResultValue>(
-            new DateTime("2011-01-13T00:00:00.000Z"),
-            new SelectResultValue(
-                ImmutableMap.of(QueryRunnerTestHelper.segmentId, 5),
-                Arrays.asList(
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        3,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "health")
-                            .put(QueryRunnerTestHelper.indexMetric, 114.947403F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        4,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "mezzanine")
-                            .put(QueryRunnerTestHelper.indexMetric, 104.465767F)
-                            .build()
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        5,
-                        new ImmutableMap.Builder<String, Object>()
-                            .put(EventHolder.timestampKey, new DateTime("2011-01-13T00:00:00.000Z"))
-                            .put(QueryRunnerTestHelper.qualityDimension, "news")
-                            .put(QueryRunnerTestHelper.indexMetric, 102.851683F)
-                            .build()
-                    )
-                )
-            )
-        )
-    );
+      final List<List<Map<String, Object>>> events = toEvents(
+          new String[]{
+              EventHolder.timestampKey + ":TIME",
+              null,
+              QueryRunnerTestHelper.qualityDimension + ":STRING",
+              null,
+              null,
+              QueryRunnerTestHelper.indexMetric + ":FLOAT"
+          },
+          // filtered values with day granularity
+          new String[]{
+              "2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
+              "2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
+              "2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
+              "2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
+              "2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
+              "2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
+              "2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
+              "2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
+              "2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
+          },
+          new String[]{
+              "2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
+              "2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
+              "2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
+              "2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
+              "2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
+              "2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
+              "2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
+              "2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
+              "2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
+          }
+      );
 
-    verify(expectedResults, results);
+      PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
+      List<Result<SelectResultValue>> expectedResults = toExpected(
+          events,
+          offset.startOffset(),
+          offset.threshold()
+      );
+      verify(expectedResults, results);
+    }
   }
 
   @Test
@@ -369,7 +296,8 @@ public class SelectQueryRunnerTest
   {
     SelectQuery query = new SelectQuery(
         new TableDataSource(QueryRunnerTestHelper.dataSource),
-        new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
+        I_0112_0114,
+        descending,
         new AndDimFilter(
             Arrays.asList(
                 new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot"),
@@ -401,13 +329,13 @@ public class SelectQueryRunnerTest
     verify(expectedResults, results);
   }
 
-
   @Test
   public void testFullSelectNoDimensionAndMetric()
   {
     SelectQuery query = new SelectQuery(
         new TableDataSource(QueryRunnerTestHelper.dataSource),
-        new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
+        I_0112_0114,
+        descending,
         null,
         QueryRunnerTestHelper.allGran,
         Lists.newArrayList("foo"),
@@ -421,40 +349,109 @@ public class SelectQueryRunnerTest
         Lists.<Result<SelectResultValue>>newArrayList()
     );
 
-    Map<String, Object> res = Maps.newHashMap();
-    res.put("timestamp", new DateTime("2011-01-12T00:00:00.000Z"));
-    res.put("foo", null);
-    res.put("foo2", null);
-
-    List<Result<SelectResultValue>> expectedResults = Arrays.asList(
-        new Result<SelectResultValue>(
-            new DateTime("2011-01-12T00:00:00.000Z"),
-            new SelectResultValue(
-                ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
-                Arrays.asList(
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        0,
-                        res
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        1,
-                        res
-                    ),
-                    new EventHolder(
-                        QueryRunnerTestHelper.segmentId,
-                        2,
-                        res
-                    )
-                )
-            )
-        )
+    final List<List<Map<String, Object>>> events = toEvents(
+        new String[]{
+            EventHolder.timestampKey + ":TIME",
+            "foo:NULL",
+            "foo2:NULL"
+        },
+        V_0112_0114
     );
 
+    PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
+    List<Result<SelectResultValue>> expectedResults = toExpected(
+        events,
+        offset.startOffset(),
+        offset.threshold()
+    );
     verify(expectedResults, results);
   }
 
+  private LinkedHashMap<String, Integer> toPagingIdentifier(int startDelta, boolean descending)
+  {
+    return Maps.newLinkedHashMap(
+        ImmutableMap.of(
+            QueryRunnerTestHelper.segmentId,
+            PagingOffset.toOffset(startDelta, descending)
+        )
+    );
+  }
+
+  private List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, final String[]... valueSet)
+  {
+    List<List<Map<String, Object>>> events = Lists.newArrayList();
+    for (String[] values : valueSet) {
+      events.add(
+          Lists.newArrayList(
+              Iterables.transform(
+                  Arrays.asList(values), new Function<String, Map<String, Object>>()
+                  {
+                    @Override
+                    public Map<String, Object> apply(String input)
+                    {
+                      Map<String, Object> event = Maps.newHashMap();
+                      String[] values = input.split("\\t");
+                      for (int i = 0; i < dimSpecs.length; i++) {
+                        if (dimSpecs[i] == null || i >= values.length) {
+                          continue;
+                        }
+                        String[] specs = dimSpecs[i].split(":");
+                        event.put(
+                            specs[0],
+                            specs[1].equals("TIME") ? new DateTime(values[i]) :
+                            specs[1].equals("FLOAT") ? Float.valueOf(values[i]) :
+                            specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) :
+                            specs[1].equals("LONG") ? Long.valueOf(values[i]) :
+                            specs[1].equals("NULL") ? null :
+                            values[i]
+                        );
+                      }
+                      return event;
+                    }
+                  }
+              )
+          )
+      );
+    }
+    return events;
+  }
+
+  private List<Result<SelectResultValue>> toExpected(
+      List<List<Map<String, Object>>> targets,
+      final int offset,
+      final int threshold
+  )
+  {
+    if (offset < 0) {
+      targets = Lists.reverse(targets);
+    }
+    List<Result<SelectResultValue>> expected = Lists.newArrayListWithExpectedSize(targets.size());
+    for (List<Map<String, Object>> group : targets) {
+      List<EventHolder> holders = Lists.newArrayListWithExpectedSize(threshold);
+      int newOffset = offset;
+      if (offset < 0) {
+        int start = group.size() + offset;
+        int end = Math.max(-1, start - threshold);
+        for (int i = start; i > end; i--) {
+          holders.add(new EventHolder(QueryRunnerTestHelper.segmentId, newOffset--, group.get(i)));
+        }
+      } else {
+        int end = Math.min(group.size(), offset + threshold);
+        for (int i = offset; i < end; i++) {
+          holders.add(new EventHolder(QueryRunnerTestHelper.segmentId, newOffset++, group.get(i)));
+        }
+      }
+      int lastOffset = holders.isEmpty() ? offset : holders.get(holders.size() - 1).getOffset();
+      expected.add(
+          new Result<SelectResultValue>(
+              new DateTime(group.get(0).get(EventHolder.timestampKey)),
+              new SelectResultValue(ImmutableMap.of(QueryRunnerTestHelper.segmentId, lastOffset), holders)
+          )
+      );
+    }
+    return expected;
+  }
+
   private static void verify(
       Iterable<Result<SelectResultValue>> expectedResults,
       Iterable<Result<SelectResultValue>> actualResults