diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java index 8bea3f4a0a3..9d302a80179 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java @@ -113,7 +113,7 @@ public class ScanBenchmark @Param({"1000", "99999"}) private int limit; - @Param({"none", "descending", "ascending"}) + @Param({"NONE", "DESCENDING", "ASCENDING"}) private static ScanQuery.TimeOrder timeOrdering; private static final Logger log = new Logger(ScanBenchmark.class); diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md index d7866abbae7..a025ea5c4c6 100644 --- a/docs/content/querying/scan-query.md +++ b/docs/content/querying/scan-query.md @@ -27,8 +27,11 @@ title: "Scan query" The Scan query returns raw Druid rows in streaming mode. The biggest difference between the Select query and the Scan query is that the Scan query does not retain all the returned rows in memory before they are returned to the client (except when time-ordering is used). The Select query _will_ retain the rows in memory, causing memory pressure if too -many rows are returned. The Scan query can return all the rows without issuing another pagination query, which is -extremely useful when directly querying against historical or realtime nodes. +many rows are returned. The Scan query can return all the rows without issuing another pagination query. + +In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued +directly to Historical processes or streaming ingestion tasks. This can be useful if you want to retrieve large +amounts of data in parallel. An example Scan query object is shown below: diff --git a/processing/src/main/java/org/apache/druid/query/Druids.java b/processing/src/main/java/org/apache/druid/query/Druids.java index ba4f6c13f60..a0d85cbb5f9 100644 --- a/processing/src/main/java/org/apache/druid/query/Druids.java +++ b/processing/src/main/java/org/apache/druid/query/Druids.java @@ -918,7 +918,7 @@ public class Druids private QuerySegmentSpec querySegmentSpec; private VirtualColumns virtualColumns; private Map context; - private String resultFormat; + private ScanQuery.ResultFormat resultFormat; private int batchSize; private long limit; private DimFilter dimFilter; @@ -1009,7 +1009,7 @@ public class Druids return this; } - public ScanQueryBuilder resultFormat(String r) + public ScanQueryBuilder resultFormat(ScanQuery.ResultFormat r) { resultFormat = r; return this; diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 071a01b7062..57ad220afcb 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -21,7 +21,9 @@ package org.apache.druid.query.scan; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; @@ -36,18 +38,76 @@ import java.util.Objects; public class ScanQuery extends BaseQuery { - public static final String RESULT_FORMAT_LIST = "list"; - public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList"; - public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector"; + public enum ResultFormat + { + RESULT_FORMAT_LIST, + RESULT_FORMAT_COMPACTED_LIST, + RESULT_FORMAT_VALUE_VECTOR; - public enum TimeOrder { - @JsonProperty("ascending") ASCENDING, - @JsonProperty("descending") DESCENDING, - @JsonProperty("none") NONE + @JsonValue + @Override + public String toString() + { + switch (this) { + case RESULT_FORMAT_LIST: + return "list"; + case RESULT_FORMAT_COMPACTED_LIST: + return "compactedList"; + case RESULT_FORMAT_VALUE_VECTOR: + return "valueVector"; + default: + return ""; + } + } + + @JsonCreator + public static ResultFormat fromString(String name) + { + switch (name) { + case "compactedList": + return RESULT_FORMAT_COMPACTED_LIST; + case "valueVector": + return RESULT_FORMAT_VALUE_VECTOR; + case "list": + return RESULT_FORMAT_LIST; + default: + return RESULT_FORMAT_LIST; + } + } + + public byte[] getCacheKey() + { + return new byte[]{(byte) this.ordinal()}; + } + } + + public enum TimeOrder + { + ASCENDING, + DESCENDING, + NONE; + + @JsonValue + @Override + public String toString() + { + return StringUtils.toLowerCase(this.name()); + } + + @JsonCreator + public static TimeOrder fromString(String name) + { + return valueOf(StringUtils.toUpperCase(name)); + } + + public byte[] getCacheKey() + { + return new byte[]{(byte) this.ordinal()}; + } } private final VirtualColumns virtualColumns; - private final String resultFormat; + private final ResultFormat resultFormat; private final int batchSize; private final long limit; private final DimFilter dimFilter; @@ -60,7 +120,7 @@ public class ScanQuery extends BaseQuery @JsonProperty("dataSource") DataSource dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("virtualColumns") VirtualColumns virtualColumns, - @JsonProperty("resultFormat") String resultFormat, + @JsonProperty("resultFormat") ResultFormat resultFormat, @JsonProperty("batchSize") int batchSize, @JsonProperty("limit") long limit, @JsonProperty("timeOrder") TimeOrder timeOrder, @@ -72,7 +132,7 @@ public class ScanQuery extends BaseQuery { super(dataSource, querySegmentSpec, false, context); this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns); - this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat; + this.resultFormat = resultFormat; this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize; this.limit = (limit == 0) ? Long.MAX_VALUE : limit; Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0"); @@ -90,7 +150,7 @@ public class ScanQuery extends BaseQuery } @JsonProperty - public String getResultFormat() + public ResultFormat getResultFormat() { return resultFormat; } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index cb6b7871e5b..12c38f0d48e 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -178,13 +178,13 @@ public class ScanQueryEngine } final long lastOffset = offset; final Object events; - final String resultFormat = query.getResultFormat(); - if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) { + final ScanQuery.ResultFormat resultFormat = query.getResultFormat(); + if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) { events = rowsToCompactedList(); - } else if (ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) { + } else if (ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) { events = rowsToList(); } else { - throw new UOE("resultFormat[%s] is not supported", resultFormat); + throw new UOE("resultFormat[%s] is not supported", resultFormat.toString()); } responseContext.put( ScanQueryRunnerFactory.CTX_COUNT, diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java index 3f56054b5f4..6fc0685e04f 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java @@ -33,7 +33,7 @@ import java.util.Map; public class ScanQueryLimitRowIterator implements CloseableIterator { private Yielder yielder; - private String resultFormat; + private ScanQuery.ResultFormat resultFormat; private long limit; private long count = 0; @@ -71,8 +71,8 @@ public class ScanQueryLimitRowIterator implements CloseableIterator +{ + private Yielder yielder; + private ScanQuery.ResultFormat resultFormat; + + public ScanQueryNoLimitRowIterator( + QueryRunner baseRunner, + QueryPlus queryPlus, + Map responseContext + ) + { + ScanQuery query = Druids.ScanQueryBuilder.copy((ScanQuery) queryPlus.getQuery()).limit(Long.MAX_VALUE).timeOrder( + ScanQuery.TimeOrder.NONE).build(); + resultFormat = query.getResultFormat(); + queryPlus = queryPlus.withQuery(query); + Sequence baseSequence = baseRunner.run(queryPlus, responseContext); + yielder = baseSequence.toYielder( + null, + new YieldingAccumulator() + { + @Override + public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in) + { + yield(); + return in; + } + } + ); + } + + @Override + public boolean hasNext() + { + return !yielder.isDone(); + } + + @Override + public ScanResultValue next() + { + ScanResultValue batch = yielder.get(); + if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) || + ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) { + yielder = yielder.next(null); + return batch; + } + throw new UnsupportedOperationException(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet"); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException + { + yielder.close(); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 8d9ae742a85..5e72f547ae8 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -39,9 +39,11 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.aggregation.MetricManipulationFn; import java.io.IOException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -74,29 +76,42 @@ public class ScanQueryQueryToolChest extends QueryToolChest queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery); - final BaseSequence.IteratorMaker scanQueryLimitRowIteratorMaker = - new BaseSequence.IteratorMaker() - { - @Override - public ScanQueryLimitRowIterator make() - { - return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); - } - - @Override - public void cleanup(ScanQueryLimitRowIterator iterFromMake) - { - CloseQuietly.close(iterFromMake); - } - }; - if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) { if (scanQuery.getLimit() == Long.MAX_VALUE) { return runner.run(queryPlusWithNonNullLegacy, responseContext); + } else { + return new BaseSequence<>( + new BaseSequence.IteratorMaker() + { + @Override + public ScanQueryLimitRowIterator make() + { + return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); + } + + @Override + public void cleanup(ScanQueryLimitRowIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + }); } - return new BaseSequence(scanQueryLimitRowIteratorMaker); } else if (scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) { - Iterator scanResultIterator = scanQueryLimitRowIteratorMaker.make(); + ScanQueryNoLimitRowIterator scanResultIterator = + new BaseSequence.IteratorMaker() + { + @Override + public ScanQueryNoLimitRowIterator make() + { + return new ScanQueryNoLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext); + } + + @Override + public void cleanup(ScanQueryNoLimitRowIterator iterFromMake) + { + CloseQuietly.close(iterFromMake); + } + }.make(); return new BaseSequence( new BaseSequence.IteratorMaker() @@ -105,7 +120,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest sortScanResultValues(Iterator inputIterator, ScanQuery scanQuery) + Iterator sortAndLimitScanResultValues(Iterator inputIterator, ScanQuery scanQuery) { Comparator priorityQComparator = new ScanResultValueTimestampComparator(scanQuery); // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE) - - PriorityQueue q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator); + int limit = Math.toIntExact(scanQuery.getLimit()); + PriorityQueue q = new PriorityQueue<>(limit, priorityQComparator); while (inputIterator.hasNext()) { ScanResultValue next = inputIterator.next(); @@ -183,14 +198,19 @@ public class ScanQueryQueryToolChest extends QueryToolChest limit) { + q.poll(); + } } } // Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order // will be maintained - List sortedElements = new ArrayList<>(q.size()); + final Deque sortedElements = new ArrayDeque<>(q.size()); while (q.size() != 0) { - sortedElements.add(q.poll()); + // We add at the front of the list because poll removes the tail of the queue. + sortedElements.addFirst(q.poll()); } + return sortedElements.iterator(); } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java index a9a10fa366e..b8a790cc41c 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java @@ -73,13 +73,14 @@ public class ScanResultValue implements Comparable return events; } - public long getFirstEventTimestamp(ScanQuery query) { - if (query.getResultFormat().equals(ScanQuery.RESULT_FORMAT_LIST)) { + public long getFirstEventTimestamp(ScanQuery query) + { + if (query.getResultFormat().equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) { return (Long) ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME); - } else if (query.getResultFormat().equals(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)) { + } else if (query.getResultFormat().equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) { int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME); List firstEvent = (List) ((List) this.getEvents()).get(0); - return (Long)firstEvent.get(timeColumnIndex); + return (Long) firstEvent.get(timeColumnIndex); } throw new UOE("Unable to get first event timestamp using result format of [%s]", query.getResultFormat()); } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java index 66abbe612b9..5ee1672b570 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java @@ -47,8 +47,8 @@ public class ScanResultValueTimestampComparator implements Comparator sorted = chest.sortScanResultValues(inputs.iterator(), scanQuery); + Iterator sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery); int count = 0; Long previousTime = Long.MAX_VALUE; @@ -128,13 +128,13 @@ public class ScanQueryQueryToolChestTest ); } ScanQuery scanQuery = new Druids.ScanQueryBuilder() - .resultFormat("list") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) .timeOrder(ScanQuery.TimeOrder.ASCENDING) .dataSource("some data source") .intervals(emptySegmentSpec) .limit(99999) .build(); - Iterator sorted = chest.sortScanResultValues(inputs.iterator(), scanQuery); + Iterator sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery); int count = 0; Long previousTime = -1L; @@ -163,13 +163,13 @@ public class ScanQueryQueryToolChestTest ); } ScanQuery scanQuery = new Druids.ScanQueryBuilder() - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .timeOrder(ScanQuery.TimeOrder.DESCENDING) .dataSource("some data source") .intervals(emptySegmentSpec) .limit(99999) .build(); - Iterator sorted = chest.sortScanResultValues(inputs.iterator(), scanQuery); + Iterator sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery); Long previousTime = Long.MAX_VALUE; int count = 0; @@ -198,13 +198,13 @@ public class ScanQueryQueryToolChestTest ); } ScanQuery scanQuery = new Druids.ScanQueryBuilder() - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .timeOrder(ScanQuery.TimeOrder.ASCENDING) .dataSource("some data source") .intervals(emptySegmentSpec) .limit(99999) .build(); - Iterator sorted = chest.sortScanResultValues(inputs.iterator(), scanQuery); + Iterator sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery); Long previousTime = -1L; int count = 0; diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index 750b76ea06c..ae3c2e84227 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -216,7 +216,7 @@ public class ScanQueryRunnerTest ScanQuery query = newTestQuery() .intervals(I_0112_0114) .virtualColumns(EXPR_COLUMN) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .build(); HashMap context = new HashMap(); @@ -322,7 +322,7 @@ public class ScanQueryRunnerTest ScanQuery query = newTestQuery() .intervals(I_0112_0114) .columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .build(); HashMap context = new HashMap(); @@ -524,7 +524,7 @@ public class ScanQueryRunnerTest .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) .limit(limit) - .timeOrder("ascending") + .timeOrder(ScanQuery.TimeOrder.ASCENDING) .build(); HashMap context = new HashMap<>(); @@ -582,7 +582,7 @@ public class ScanQueryRunnerTest .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) .limit(limit) - .timeOrder("descending") + .timeOrder(ScanQuery.TimeOrder.DESCENDING) .build(); HashMap context = new HashMap<>(); @@ -664,8 +664,8 @@ public class ScanQueryRunnerTest .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) - .resultFormat("compactedList") - .timeOrder("ascending") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .timeOrder(ScanQuery.TimeOrder.ASCENDING) .limit(limit) .build(); @@ -725,8 +725,8 @@ public class ScanQueryRunnerTest .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) - .resultFormat("compactedList") - .timeOrder("descending") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .timeOrder(ScanQuery.TimeOrder.DESCENDING) .limit(limit) .build(); diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQuerySpecTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQuerySpecTest.java index 9cfbade4222..b4cf5631c48 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQuerySpecTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQuerySpecTest.java @@ -65,10 +65,10 @@ public class ScanQuerySpecTest new TableDataSource(QueryRunnerTestHelper.dataSource), new LegacySegmentSpec(Intervals.of("2011-01-12/2011-01-14")), VirtualColumns.EMPTY, - null, + ScanQuery.ResultFormat.RESULT_FORMAT_LIST, 0, 3, - "none", + ScanQuery.TimeOrder.NONE, null, Arrays.asList("market", "quality", "index"), null, diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java index 5e55e894858..293462217f2 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java @@ -53,7 +53,7 @@ public class ScanResultValueTimestampComparatorTest { ScanQuery query = Druids.newScanQueryBuilder() .timeOrder(ScanQuery.TimeOrder.DESCENDING) - .resultFormat(ScanQuery.RESULT_FORMAT_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) .dataSource("some src") .intervals(intervalSpec) .build(); @@ -82,7 +82,7 @@ public class ScanResultValueTimestampComparatorTest events2 ); - Assert.assertEquals(1, comparator.compare(s1, s2)); + Assert.assertEquals(-1, comparator.compare(s1, s2)); } @Test @@ -90,7 +90,7 @@ public class ScanResultValueTimestampComparatorTest { ScanQuery query = Druids.newScanQueryBuilder() .timeOrder(ScanQuery.TimeOrder.ASCENDING) - .resultFormat(ScanQuery.RESULT_FORMAT_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) .dataSource("some src") .intervals(intervalSpec) .build(); @@ -119,7 +119,7 @@ public class ScanResultValueTimestampComparatorTest events2 ); - Assert.assertEquals(-1, comparator.compare(s1, s2)); + Assert.assertEquals(1, comparator.compare(s1, s2)); } @Test @@ -127,7 +127,42 @@ public class ScanResultValueTimestampComparatorTest { ScanQuery query = Druids.newScanQueryBuilder() .timeOrder(ScanQuery.TimeOrder.DESCENDING) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .dataSource("some src") + .intervals(intervalSpec) + .build(); + + ScanResultValueTimestampComparator comparator = new ScanResultValueTimestampComparator(query); + + List> events1 = new ArrayList<>(); + List event1 = Collections.singletonList(new Long(42)); + events1.add(event1); + + ScanResultValue s1 = new ScanResultValue( + "segmentId", + Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME), + events1 + ); + + List> events2 = new ArrayList<>(); + List event2 = Collections.singletonList(new Long(43)); + events2.add(event2); + + ScanResultValue s2 = new ScanResultValue( + "segmentId", + Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME), + events2 + ); + + Assert.assertEquals(-1, comparator.compare(s1, s2)); + } + + @Test + public void comparisonAscendingCompactedListTest() + { + ScanQuery query = Druids.newScanQueryBuilder() + .timeOrder(ScanQuery.TimeOrder.ASCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .dataSource("some src") .intervals(intervalSpec) .build(); @@ -156,39 +191,4 @@ public class ScanResultValueTimestampComparatorTest Assert.assertEquals(1, comparator.compare(s1, s2)); } - - @Test - public void comparisonAscendingCompactedListTest() - { - ScanQuery query = Druids.newScanQueryBuilder() - .timeOrder(ScanQuery.TimeOrder.ASCENDING) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) - .dataSource("some src") - .intervals(intervalSpec) - .build(); - - ScanResultValueTimestampComparator comparator = new ScanResultValueTimestampComparator(query); - - List> events1 = new ArrayList<>(); - List event1 = Collections.singletonList(new Long(42)); - events1.add(event1); - - ScanResultValue s1 = new ScanResultValue( - "segmentId", - Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME), - events1 - ); - - List> events2 = new ArrayList<>(); - List event2 = Collections.singletonList(new Long(43)); - events2.add(event2); - - ScanResultValue s2 = new ScanResultValue( - "segmentId", - Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME), - events2 - ); - - Assert.assertEquals(-1, comparator.compare(s1, s2)); - } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 6cb4eafa412..778c44527a5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -961,7 +961,7 @@ public class DruidQuery dataSource, filtration.getQuerySegmentSpec(), selectProjection != null ? VirtualColumns.create(selectProjection.getVirtualColumns()) : VirtualColumns.EMPTY, - ScanQuery.RESULT_FORMAT_COMPACTED_LIST, + ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST, 0, scanLimit, null, // Will default to "none" diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index e963517ec1a..a40b424e699 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -370,7 +370,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase public static Druids.ScanQueryBuilder newScanQueryBuilder() { - return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .legacy(false); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 7ffb44965e3..d131cd361f3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -111,7 +111,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .intervals(QSS(Filtration.eternity())) .virtualColumns(EXPRESSION_VIRTUAL_COLUMN("v0", "2", ValueType.LONG)) .columns("dim1", "v0") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .limit(1) .context(QUERY_CONTEXT_DEFAULT) .build() @@ -431,7 +431,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .dataSource(CalciteTests.DATASOURCE1) .intervals(QSS(Filtration.eternity())) .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -465,7 +465,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .dataSource(CalciteTests.FORBIDDEN_DATASOURCE) .intervals(QSS(Filtration.eternity())) .columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -531,7 +531,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .intervals(QSS(Filtration.eternity())) .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") .limit(2) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -556,7 +556,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) .columns("v0") .limit(2) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -652,7 +652,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .intervals(QSS(Filtration.eternity())) .columns("dim2") .limit(2) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -744,14 +744,14 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .intervals(QSS(Filtration.eternity())) .columns("dim1") .filters(NOT(SELECTOR("dim1", "", null))) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build(), newScanQueryBuilder() .dataSource(CalciteTests.DATASOURCE1) .intervals(QSS(Filtration.eternity())) .columns("dim1", "dim2") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -1879,7 +1879,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) ) .columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -6715,7 +6715,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ) )) .columns("__time", "cnt", "dim1", "dim2") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -7270,7 +7270,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ValueType.STRING )) .columns("v0") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -7296,7 +7296,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ValueType.STRING )) .columns("v0") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -7322,7 +7322,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .intervals(QSS(Filtration.eternity())) .virtualColumns(EXPRESSION_VIRTUAL_COLUMN("v0", "concat(\"dim1\",\"dim1\")", ValueType.STRING)) .columns("v0") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -7348,7 +7348,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ValueType.STRING )) .columns("v0") - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .context(QUERY_CONTEXT_DEFAULT) .build() ), @@ -7544,7 +7544,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .intervals(QSS(Filtration.eternity())) .columns("dim1") .filters(SELECTOR("f1", "0.1", null)) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .limit(1) .context(QUERY_CONTEXT_DEFAULT) .build() @@ -7566,7 +7566,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .intervals(QSS(Filtration.eternity())) .columns("dim1") .filters(SELECTOR("d1", "1.7", null)) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .limit(1) .context(QUERY_CONTEXT_DEFAULT) .build() @@ -7588,7 +7588,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest .intervals(QSS(Filtration.eternity())) .columns("dim1") .filters(SELECTOR("l1", "7", null)) - .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .limit(1) .context(QUERY_CONTEXT_DEFAULT) .build()