diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java index dc8ecc58ea3..8bea3f4a0a3 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java @@ -114,7 +114,7 @@ public class ScanBenchmark private int limit; @Param({"none", "descending", "ascending"}) - private static String timeOrdering; + private static ScanQuery.TimeOrder timeOrdering; private static final Logger log = new Logger(ScanBenchmark.class); private static final ObjectMapper JSON_MAPPER; diff --git a/processing/src/main/java/org/apache/druid/query/Druids.java b/processing/src/main/java/org/apache/druid/query/Druids.java index b0fe8edfecc..ba4f6c13f60 100644 --- a/processing/src/main/java/org/apache/druid/query/Druids.java +++ b/processing/src/main/java/org/apache/druid/query/Druids.java @@ -924,7 +924,7 @@ public class Druids private DimFilter dimFilter; private List columns; private Boolean legacy; - private String timeOrder; + private ScanQuery.TimeOrder timeOrder; public ScanQueryBuilder() { @@ -1051,7 +1051,7 @@ public class Druids return this; } - public ScanQueryBuilder timeOrder(String timeOrder) + public ScanQueryBuilder timeOrder(ScanQuery.TimeOrder timeOrder) { this.timeOrder = timeOrder; return this; diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java index 323c0a3246a..071a01b7062 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java @@ -40,9 +40,11 @@ public class ScanQuery extends BaseQuery public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList"; public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector"; - public static final String TIME_ORDER_ASCENDING = "ascending"; - public static final String TIME_ORDER_DESCENDING = "descending"; - public static final String TIME_ORDER_NONE = "none"; + public enum TimeOrder { + @JsonProperty("ascending") ASCENDING, + @JsonProperty("descending") DESCENDING, + @JsonProperty("none") NONE + } private final VirtualColumns virtualColumns; private final String resultFormat; @@ -51,7 +53,7 @@ public class ScanQuery extends BaseQuery private final DimFilter dimFilter; private final List columns; private final Boolean legacy; - private final String timeOrder; + private final TimeOrder timeOrder; @JsonCreator public ScanQuery( @@ -61,7 +63,7 @@ public class ScanQuery extends BaseQuery @JsonProperty("resultFormat") String resultFormat, @JsonProperty("batchSize") int batchSize, @JsonProperty("limit") long limit, - @JsonProperty("timeOrder") String timeOrder, + @JsonProperty("timeOrder") TimeOrder timeOrder, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("columns") List columns, @JsonProperty("legacy") Boolean legacy, @@ -78,7 +80,7 @@ public class ScanQuery extends BaseQuery this.dimFilter = dimFilter; this.columns = columns; this.legacy = legacy; - this.timeOrder = timeOrder == null ? TIME_ORDER_NONE : timeOrder; + this.timeOrder = timeOrder == null ? TimeOrder.NONE : timeOrder; } @JsonProperty @@ -106,7 +108,7 @@ public class ScanQuery extends BaseQuery } @JsonProperty - public String getTimeOrder() + public TimeOrder getTimeOrder() { return timeOrder; } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java index b11e67a6f63..d2ec4ba5b29 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryConfig.java @@ -40,7 +40,6 @@ public class ScanQueryConfig return this; } - // int should suffice here because no one should be sorting greater than 2B rows in memory @JsonProperty private int maxRowsTimeOrderedInMemory = 100000; diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java index 069b2dfe8d5..cb6b7871e5b 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java @@ -131,8 +131,8 @@ public class ScanQueryEngine intervals.get(0), query.getVirtualColumns(), Granularities.ALL, - query.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING) || - (query.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE) && query.isDescending()), + query.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING) || + (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE) && query.isDescending()), null ) .map(cursor -> new BaseSequence<>( diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 575b9cc3f58..8d9ae742a85 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; +import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.BaseSequence; @@ -89,43 +90,39 @@ public class ScanQueryQueryToolChest extends QueryToolChest(scanQueryLimitRowIteratorMaker); - } else if ((scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) || - scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) - && scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) { + } else if (scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) { Iterator scanResultIterator = scanQueryLimitRowIteratorMaker.make(); return new BaseSequence( - new BaseSequence.IteratorMaker() + new BaseSequence.IteratorMaker() { @Override - public ScanBatchedTimeOrderedIterator make() + public ScanBatchedIterator make() { - return new ScanBatchedTimeOrderedIterator( + return new ScanBatchedIterator( sortScanResultValues(scanResultIterator, scanQuery), scanQuery.getBatchSize() ); } @Override - public void cleanup(ScanBatchedTimeOrderedIterator iterFromMake) + public void cleanup(ScanBatchedIterator iterFromMake) { CloseQuietly.close(iterFromMake); } }); - } else if (scanQuery.getLimit() > scanQueryConfig.getMaxRowsTimeOrderedInMemory()) { + } else { throw new UOE( - "Time ordering for result set limit of %s is not supported. Try lowering the " - + "result set size to less than or equal to the time ordering limit of %s.", + "Time ordering for result set limit of %,d is not supported. Try lowering the " + + "result set size to less than or equal to the time ordering limit of %,d.", scanQuery.getLimit(), scanQueryConfig.getMaxRowsTimeOrderedInMemory() ); - } else { - throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder()); } }; } @@ -198,16 +195,16 @@ public class ScanQueryQueryToolChest extends QueryToolChest + private static class ScanBatchedIterator implements CloseableIterator { private final Iterator itr; private final int batchSize; - public ScanBatchedTimeOrderedIterator(Iterator iterator, int batchSize) + public ScanBatchedIterator(Iterator iterator, int batchSize) { this.itr = iterator; this.batchSize = batchSize; @@ -234,7 +231,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest { @@ -40,7 +44,7 @@ public class ScanResultValue implements Comparable @JsonCreator public ScanResultValue( - @JsonProperty("segmentId") String segmentId, + @Nullable @JsonProperty("segmentId") String segmentId, @JsonProperty("columns") List columns, @JsonProperty("events") Object events ) @@ -50,6 +54,7 @@ public class ScanResultValue implements Comparable this.events = events; } + @Nullable @JsonProperty public String getSegmentId() { @@ -68,6 +73,17 @@ public class ScanResultValue implements Comparable return events; } + public long getFirstEventTimestamp(ScanQuery query) { + if (query.getResultFormat().equals(ScanQuery.RESULT_FORMAT_LIST)) { + return (Long) ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME); + } else if (query.getResultFormat().equals(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)) { + int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME); + List firstEvent = (List) ((List) this.getEvents()).get(0); + return (Long)firstEvent.get(timeColumnIndex); + } + throw new UOE("Unable to get first event timestamp using result format of [%s]", query.getResultFormat()); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java index c2edea61312..66abbe612b9 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java @@ -20,12 +20,8 @@ package org.apache.druid.query.scan; import com.google.common.primitives.Longs; -import org.apache.druid.java.util.common.UOE; -import org.apache.druid.segment.column.ColumnHolder; import java.util.Comparator; -import java.util.List; -import java.util.Map; /** * This comparator class supports comparisons of ScanResultValues based on the timestamp of their first event. Since @@ -47,39 +43,12 @@ public class ScanResultValueTimestampComparator implements Comparator) ((List) o1.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME), - (Long) ((Map) ((List) o2.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME) - ); - } else if (scanQuery.getResultFormat().equals(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)) { - int val1TimeColumnIndex = o1.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME); - int val2TimeColumnIndex = o2.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME); - List event1 = (List) ((List) o1.getEvents()).get(0); - List event2 = (List) ((List) o2.getEvents()).get(0); - comparison = Longs.compare( - (Long) event1.get(val1TimeColumnIndex), - (Long) event2.get(val2TimeColumnIndex) - ); - } else { - throw new UOE("Result format [%s] is not supported", scanQuery.getResultFormat()); - } - if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) { + comparison = Longs.compare( + o1.getFirstEventTimestamp(scanQuery), + o2.getFirstEventTimestamp(scanQuery)); + if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING)) { return comparison * -1; } return comparison; } - - @Override - public boolean equals(Object obj) - { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - ScanResultValueTimestampComparator comp = (ScanResultValueTimestampComparator) obj; - return this.scanQuery.equals(comp.scanQuery); - } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java index 8a057924d89..137855de509 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java @@ -92,7 +92,7 @@ public class ScanQueryQueryToolChestTest } ScanQuery scanQuery = new Druids.ScanQueryBuilder() .resultFormat("list") - .timeOrder(ScanQuery.TIME_ORDER_DESCENDING) + .timeOrder(ScanQuery.TimeOrder.DESCENDING) .dataSource("some data source") .intervals(emptySegmentSpec) .limit(99999) @@ -129,7 +129,7 @@ public class ScanQueryQueryToolChestTest } ScanQuery scanQuery = new Druids.ScanQueryBuilder() .resultFormat("list") - .timeOrder(ScanQuery.TIME_ORDER_ASCENDING) + .timeOrder(ScanQuery.TimeOrder.ASCENDING) .dataSource("some data source") .intervals(emptySegmentSpec) .limit(99999) @@ -164,7 +164,7 @@ public class ScanQueryQueryToolChestTest } ScanQuery scanQuery = new Druids.ScanQueryBuilder() .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) - .timeOrder(ScanQuery.TIME_ORDER_DESCENDING) + .timeOrder(ScanQuery.TimeOrder.DESCENDING) .dataSource("some data source") .intervals(emptySegmentSpec) .limit(99999) @@ -199,7 +199,7 @@ public class ScanQueryQueryToolChestTest } ScanQuery scanQuery = new Druids.ScanQueryBuilder() .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) - .timeOrder(ScanQuery.TIME_ORDER_ASCENDING) + .timeOrder(ScanQuery.TimeOrder.ASCENDING) .dataSource("some data source") .intervals(emptySegmentSpec) .limit(99999) diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java index 005a820ec75..5e55e894858 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java @@ -52,7 +52,7 @@ public class ScanResultValueTimestampComparatorTest public void comparisonDescendingListTest() { ScanQuery query = Druids.newScanQueryBuilder() - .timeOrder(ScanQuery.TIME_ORDER_DESCENDING) + .timeOrder(ScanQuery.TimeOrder.DESCENDING) .resultFormat(ScanQuery.RESULT_FORMAT_LIST) .dataSource("some src") .intervals(intervalSpec) @@ -89,7 +89,7 @@ public class ScanResultValueTimestampComparatorTest public void comparisonAscendingListTest() { ScanQuery query = Druids.newScanQueryBuilder() - .timeOrder(ScanQuery.TIME_ORDER_ASCENDING) + .timeOrder(ScanQuery.TimeOrder.ASCENDING) .resultFormat(ScanQuery.RESULT_FORMAT_LIST) .dataSource("some src") .intervals(intervalSpec) @@ -126,7 +126,7 @@ public class ScanResultValueTimestampComparatorTest public void comparisonDescendingCompactedListTest() { ScanQuery query = Druids.newScanQueryBuilder() - .timeOrder(ScanQuery.TIME_ORDER_DESCENDING) + .timeOrder(ScanQuery.TimeOrder.DESCENDING) .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) .dataSource("some src") .intervals(intervalSpec) @@ -161,7 +161,7 @@ public class ScanResultValueTimestampComparatorTest public void comparisonAscendingCompactedListTest() { ScanQuery query = Druids.newScanQueryBuilder() - .timeOrder(ScanQuery.TIME_ORDER_ASCENDING) + .timeOrder(ScanQuery.TimeOrder.ASCENDING) .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST) .dataSource("some src") .intervals(intervalSpec)