From 85f10ed0d0df0e040a5448ce8b99a2c972537381 Mon Sep 17 00:00:00 2001 From: Justin Borromeo Date: Fri, 12 Apr 2019 19:08:34 -0700 Subject: [PATCH] Support querying realtime segments using time-ordered scan queries and fix broken scan queries without time column (#7454) * Update scan query runner factory to accept SpecificSegmentSpec * nit * Sorry travis * Improve logging and fix doc * Bug fix * Friendlier error msgs and tests to cover bug * Address Gian's comments * Fix doc * Added tests for empty and null column list * Style * Fix checking wrong order (looking at query param when it should be looking at the null-handled order) * Add test case for null order * Fix ScanQueryRunnerTest * Forbidden APIs fixed --- docs/content/querying/scan-query.md | 2 +- .../apache/druid/query/scan/ScanQuery.java | 10 + .../query/scan/ScanQueryRunnerFactory.java | 37 +- .../druid/query/scan/ScanResultValue.java | 10 +- .../ScanResultValueTimestampComparator.java | 3 +- .../spec/MultipleSpecificSegmentSpec.java | 10 +- .../scan/ScanQueryRunnerFactoryTest.java | 424 ++++++++++-------- .../druid/query/scan/ScanQueryRunnerTest.java | 275 ++++++++---- .../druid/query/scan/ScanQueryTest.java | 272 +++++++++++ ...canResultValueTimestampComparatorTest.java | 8 +- 10 files changed, 761 insertions(+), 290 deletions(-) create mode 100644 processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md index 7ba5c6097d9..64210475a92 100644 --- a/docs/content/querying/scan-query.md +++ b/docs/content/querying/scan-query.md @@ -61,7 +61,7 @@ The following are the main parameters for Scan queries: |columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no| |batchSize|How many rows buffered before return to client. Default is `20480`|no| |limit|How many rows to return. If not specified, all rows will be returned.|no| -|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the limit is less than `druid.query.scan.maxRowsQueuedForOrdering`. Scan queries that are either legacy mode or have a limit greater than `druid.query.scan.maxRowsQueuedForOrdering` will not be time-ordered and default to a order of "none".|none| +|order|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries where the `__time` column is included in the `columns` field and the requirements outlined in the [time ordering](#time-ordering) section are met.|none| |legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. 
See [Legacy mode](#legacy-mode) for details.|no|
|context|An additional JSON Object which can be used to specify certain flags (see the Query Context Properties section below).|no|

diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
index 2b6fc82b71d..3f6d4074ea7 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
@@ -34,6 +34,7 @@ import org.apache.druid.query.Query;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -149,6 +150,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
     this.columns = columns;
     this.legacy = legacy;
     this.order = (order == null) ? Order.NONE : order;
+    if (this.order != Order.NONE) {
+      Preconditions.checkArgument(
+          columns == null || columns.size() == 0 || columns.contains(ColumnHolder.TIME_COLUMN_NAME),
+          "The __time column must be selected if the results are time-ordered."
+      );
+    }
     this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
     this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
   }
@@ -256,6 +263,9 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
   @Override
   public Ordering<ScanResultValue> getResultOrdering()
   {
+    if (order == Order.NONE) {
+      return Ordering.natural();
+    }
     return Ordering.from(new ScanResultValueTimestampComparator(this)).reverse();
   }
 
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 5f49a666944..0a9f3b9b52d 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -40,11 +40,14 @@ import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.apache.druid.segment.Segment;
 import org.joda.time.Interval;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.LinkedHashMap;
@@ -111,17 +114,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
-      List<SegmentDescriptor> descriptorsOrdered =
-          ((MultipleSpecificSegmentSpec) query.getQuerySegmentSpec()).getDescriptors();
+      List<SegmentDescriptor> descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec());
       List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
 
       if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
@@ -286,6 +279,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
+  List<SegmentDescriptor> getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec)
+  {
+    // Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
+    // segment descriptors need to be present for a 1:1 matching of intervals with query runners.
+    // The other types of segment spec condense the intervals (i.e.
+    // merge neighbouring intervals), eliminating the 1:1 relationship between
+    // intervals and query runners.
+    List<SegmentDescriptor> descriptorsOrdered;
+
+    if (spec instanceof MultipleSpecificSegmentSpec) {
+      // Ascending time order for both descriptors and query runners by default
+      descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors();
+    } else if (spec instanceof SpecificSegmentSpec) {
+      descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor());
+    } else {
+      throw new UOE(
+          "Time-ordering on scan queries is only supported for queries with segment specs "
+          + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec; a [%s] was received instead.",
+          spec.getClass().getSimpleName()
+      );
+    }
+    return descriptorsOrdered;
+  }
+
   @VisibleForTesting
   Sequence<ScanResultValue> nWayMergeAndLimit(
       List<List<QueryRunner<ScanResultValue>>> groupedRunners,
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
index 7bfcf025e1b..a05e7150766 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.scan;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.segment.column.ColumnHolder;
 
@@ -78,9 +79,16 @@ public class ScanResultValue implements Comparable<ScanResultValue>
   public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
   {
     if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
-      return (Long) ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
+      Long timestamp = (Long) ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
+      if (timestamp == null) {
+        throw new ISE("Unable to compare timestamp for rows without a time column");
+      }
+      return timestamp;
     } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
       int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
+      if (timeColumnIndex == -1) {
+        throw new ISE("Unable to compare timestamp for rows without a time column");
+      }
       List firstEvent = (List) ((List) this.getEvents()).get(0);
       return (Long) firstEvent.get(timeColumnIndex);
     }
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
index dcf3bade136..69f780fca70 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
@@ -42,8 +42,7 @@ public class ScanResultValueTimestampComparator implements Comparator<ScanResultValue>
diff --git a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java
--- a/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/spec/MultipleSpecificSegmentSpec.java
-            new Function<SegmentDescriptor, Interval>()
-            {
-              @Override
-              public Interval apply(SegmentDescriptor input)
-              {
-                return input.getInterval();
-              }
-            }
+            input -> input.getInterval()
         )
     );
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
index a7cf3c60e3f..6559c65b70f 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
@@ -31,9 +31,15 @@ import
org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.spec.LegacySegmentSpec; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.query.spec.SpecificSegmentSpec; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -43,12 +49,9 @@ import java.util.Collections; import java.util.List; -@RunWith(Parameterized.class) +@RunWith(Enclosed.class) public class ScanQueryRunnerFactoryTest { - private int numElements; - private ScanQuery query; - private ScanQuery.ResultFormat resultFormat; private static final ScanQueryRunnerFactory factory = new ScanQueryRunnerFactory( new ScanQueryQueryToolChest( @@ -59,200 +62,261 @@ public class ScanQueryRunnerFactoryTest new ScanQueryConfig() ); - public ScanQueryRunnerFactoryTest( - final int numElements, - final int batchSize, - final long limit, - final ScanQuery.ResultFormat resultFormat, - final ScanQuery.Order order - ) + @RunWith(Parameterized.class) + public static class ScanQueryRunnerFactoryParameterizedTest { - this.numElements = numElements; - this.query = Druids.newScanQueryBuilder() - .batchSize(batchSize) - .limit(limit) - .order(order) - .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) - .dataSource("some datasource") - .resultFormat(resultFormat) - .build(); - this.resultFormat = resultFormat; - } + private int numElements; + private ScanQuery query; + private ScanQuery.ResultFormat resultFormat; - @Parameterized.Parameters(name = "{0} {1} {2} {3} {4}") - public static Iterable constructorFeeder() - { - List numsElements = ImmutableList.of(0, 10, 100); - List batchSizes = ImmutableList.of(1, 100); - List limits = ImmutableList.of(3L, 1000L, Long.MAX_VALUE); - List resultFormats = ImmutableList.of( - ScanQuery.ResultFormat.RESULT_FORMAT_LIST, - ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST - ); - List order = ImmutableList.of( - ScanQuery.Order.ASCENDING, - ScanQuery.Order.DESCENDING - ); - - return QueryRunnerTestHelper.cartesian( - numsElements, - batchSizes, - limits, - resultFormats, - order - ); - } - - @Test - public void testSortAndLimitScanResultValues() - { - List srvs = new ArrayList<>(numElements); - List expectedEventTimestamps = new ArrayList<>(); - for (int i = 0; i < numElements; i++) { - long timestamp = DateTimes.of("2015-01-01").plusHours(i).getMillis(); - expectedEventTimestamps.add(timestamp); - srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + public ScanQueryRunnerFactoryParameterizedTest( + final int numElements, + final int batchSize, + final long limit, + final ScanQuery.ResultFormat resultFormat, + final ScanQuery.Order order + ) + { + this.numElements = numElements; + this.query = Druids.newScanQueryBuilder() + .batchSize(batchSize) + .limit(limit) + .order(order) + .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) + .dataSource("some datasource") + .resultFormat(resultFormat) + .build(); + this.resultFormat = resultFormat; } - expectedEventTimestamps.sort((o1, o2) -> { - int retVal = 0; - if (o1 > o2) { - retVal = 1; - } else if (o1 < o2) { - retVal = -1; + + @Parameterized.Parameters(name = "{0} {1} {2} {3} {4}") + 
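+    // The parameters below are the cartesian product of element counts, batch
+    // sizes, limits, result formats, and orders (via QueryRunnerTestHelper.cartesian).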
public static Iterable constructorFeeder() + { + List numsElements = ImmutableList.of(0, 10, 100); + List batchSizes = ImmutableList.of(1, 100); + List limits = ImmutableList.of(3L, 1000L, Long.MAX_VALUE); + List resultFormats = ImmutableList.of( + ScanQuery.ResultFormat.RESULT_FORMAT_LIST, + ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST + ); + List order = ImmutableList.of( + ScanQuery.Order.ASCENDING, + ScanQuery.Order.DESCENDING + ); + + return QueryRunnerTestHelper.cartesian( + numsElements, + batchSizes, + limits, + resultFormats, + order + ); + } + + @Test + public void testSortAndLimitScanResultValues() + { + List srvs = new ArrayList<>(numElements); + List expectedEventTimestamps = new ArrayList<>(); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-01").plusHours(i).getMillis(); + expectedEventTimestamps.add(timestamp); + srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); } - if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { - return retVal * -1; + expectedEventTimestamps.sort((o1, o2) -> { + int retVal = 0; + if (o1 > o2) { + retVal = 1; + } else if (o1 < o2) { + retVal = -1; + } + if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { + return retVal * -1; + } + return retVal; + }); + Sequence inputSequence = Sequences.simple(srvs); + try { + List output = factory.priorityQueueSortAndLimit( + inputSequence, + query, + ImmutableList.of(new SegmentDescriptor(new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ), "1", 0)) + ).toList(); + if (query.getLimit() > Integer.MAX_VALUE) { + Assert.fail("Unsupported exception should have been thrown due to high limit"); + } + validateSortedOutput(output, expectedEventTimestamps); } - return retVal; - }); - Sequence inputSequence = Sequences.simple(srvs); - try { - List output = factory.priorityQueueSortAndLimit( - inputSequence, - query, - ImmutableList.of(new SegmentDescriptor(new Interval( - DateTimes.of("2010-01-01"), - DateTimes.of("2019-01-01").plusHours(1) - ), "1", 0)) - ).toList(); - if (query.getLimit() > Integer.MAX_VALUE) { - Assert.fail("Unsupported exception should have been thrown due to high limit"); + catch (UOE e) { + if (query.getLimit() <= Integer.MAX_VALUE) { + Assert.fail("Unsupported operation exception should not have been thrown here"); + } } + } + + @Test + public void testNWayMerge() + { + List expectedEventTimestamps = new ArrayList<>(numElements * 3); + + List scanResultValues1 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + } + + List scanResultValues2 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + } + + List scanResultValues3 = new ArrayList<>(numElements); + for (int i = 0; i < numElements; i++) { + long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis(); + expectedEventTimestamps.add(timestamp); + scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + } + + if (query.getOrder() == ScanQuery.Order.DESCENDING) { + 
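+        // The three per-segment lists above were generated in ascending timestamp
+        // order; reversing them simulates how each runner would emit rows for a
+        // descending scan, so every input sequence reaches the n-way merge already
+        // sorted in query order.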
Collections.reverse(scanResultValues1); + Collections.reverse(scanResultValues2); + Collections.reverse(scanResultValues3); + } + + QueryRunner runnerSegment1Partition1 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues1); + + QueryRunner runnerSegment1Partition2 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues2); + + + QueryRunner runnerSegment2Partition1 = + (queryPlus, responseContext) -> Sequences.simple(scanResultValues3); + + QueryRunner runnerSegment2Partition2 = + (queryPlus, responseContext) -> Sequences.empty(); + + List>> groupedRunners = new ArrayList<>(2); + + if (query.getOrder() == ScanQuery.Order.DESCENDING) { + groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); + groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); + } else { + groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); + groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); + } + + expectedEventTimestamps.sort((o1, o2) -> { + int retVal = 0; + if (o1 > o2) { + retVal = 1; + } else if (o1 < o2) { + retVal = -1; + } + if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { + return retVal * -1; + } + return retVal; + }); + + List output = + factory.nWayMergeAndLimit( + groupedRunners, + QueryPlus.wrap(query), + ImmutableMap.of() + ).toList(); + validateSortedOutput(output, expectedEventTimestamps); } - catch (UOE e) { - if (query.getLimit() <= Integer.MAX_VALUE) { - Assert.fail("Unsupported operation exception should not have been thrown here"); + + private void validateSortedOutput(List output, List expectedEventTimestamps) + { + // check each scan result value has one event + for (ScanResultValue srv : output) { + if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) { + Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1); + } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) { + Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1); + } + } + + // check total # of rows <= limit + Assert.assertTrue(output.size() <= query.getLimit()); + + // check ordering is correct + for (int i = 1; i < output.size(); i++) { + if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { + Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) < + output.get(i - 1).getFirstEventTimestamp(resultFormat)); + } else { + Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) > + output.get(i - 1).getFirstEventTimestamp(resultFormat)); + } + } + + // check the values are correct + for (int i = 0; i < query.getLimit() && i < output.size(); i++) { + Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat)); } } } - @Test - public void testNWayMerge() + public static class ScanQueryRunnerFactoryNonParameterizedTest { - List expectedEventTimestamps = new ArrayList<>(numElements * 3); + private SegmentDescriptor descriptor = new SegmentDescriptor(new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ), "1", 0); - List scanResultValues1 = new ArrayList<>(numElements); - for (int i = 0; i < numElements; i++) { - long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2).getMillis(); - expectedEventTimestamps.add(timestamp); - scanResultValues1.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + @Test + public 
void testGetValidSegmentDescriptorsFromSpec() + { + QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec( + Collections.singletonList( + descriptor + ) + ); + QuerySegmentSpec singleSpecificSpec = new SpecificSegmentSpec(descriptor); + + List descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(multiSpecificSpec); + Assert.assertEquals(1, descriptors.size()); + Assert.assertEquals(descriptor, descriptors.get(0)); + + descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(singleSpecificSpec); + Assert.assertEquals(1, descriptors.size()); + Assert.assertEquals(descriptor, descriptors.get(0)); } - List scanResultValues2 = new ArrayList<>(numElements); - for (int i = 0; i < numElements; i++) { - long timestamp = DateTimes.of("2015-01-01").plusMinutes(i * 2 + 1).getMillis(); - expectedEventTimestamps.add(timestamp); - scanResultValues2.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); + @Test(expected = UOE.class) + public void testGetSegmentDescriptorsFromInvalidIntervalSpec() + { + QuerySegmentSpec multiIntervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList( + new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ) + ) + ); + factory.getSegmentDescriptorsFromSpecificQuerySpec(multiIntervalSpec); } - List scanResultValues3 = new ArrayList<>(numElements); - for (int i = 0; i < numElements; i++) { - long timestamp = DateTimes.of("2015-01-02").plusMinutes(i).getMillis(); - expectedEventTimestamps.add(timestamp); - scanResultValues3.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1)); - } - - if (query.getOrder() == ScanQuery.Order.DESCENDING) { - Collections.reverse(scanResultValues1); - Collections.reverse(scanResultValues2); - Collections.reverse(scanResultValues3); - } - - QueryRunner runnerSegment1Partition1 = - (queryPlus, responseContext) -> Sequences.simple(scanResultValues1); - - QueryRunner runnerSegment1Partition2 = - (queryPlus, responseContext) -> Sequences.simple(scanResultValues2); - - - QueryRunner runnerSegment2Partition1 = - (queryPlus, responseContext) -> Sequences.simple(scanResultValues3); - - QueryRunner runnerSegment2Partition2 = - (queryPlus, responseContext) -> Sequences.empty(); - - List>> groupedRunners = new ArrayList<>(2); - - if (query.getOrder() == ScanQuery.Order.DESCENDING) { - groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); - groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); - } else { - groupedRunners.add(Arrays.asList(runnerSegment1Partition1, runnerSegment1Partition2)); - groupedRunners.add(Arrays.asList(runnerSegment2Partition1, runnerSegment2Partition2)); - } - - expectedEventTimestamps.sort((o1, o2) -> { - int retVal = 0; - if (o1 > o2) { - retVal = 1; - } else if (o1 < o2) { - retVal = -1; - } - if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { - return retVal * -1; - } - return retVal; - }); - - List output = - factory.nWayMergeAndLimit( - groupedRunners, - QueryPlus.wrap(query), - ImmutableMap.of() - ).toList(); - - validateSortedOutput(output, expectedEventTimestamps); - } - - private void validateSortedOutput(List output, List expectedEventTimestamps) - { - // check each scan result value has one event - for (ScanResultValue srv : output) { - if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) { - Assert.assertTrue(ScanQueryTestHelper.getEventsCompactedListResultFormat(srv).size() == 1); - } else if 
(resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) { - Assert.assertTrue(ScanQueryTestHelper.getEventsListResultFormat(srv).size() == 1); - } - } - - // check total # of rows <= limit - Assert.assertTrue(output.size() <= query.getLimit()); - - // check ordering is correct - for (int i = 1; i < output.size(); i++) { - if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) { - Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) < - output.get(i - 1).getFirstEventTimestamp(resultFormat)); - } else { - Assert.assertTrue(output.get(i).getFirstEventTimestamp(resultFormat) > - output.get(i - 1).getFirstEventTimestamp(resultFormat)); - } - } - - // check the values are correct - for (int i = 0; i < query.getLimit() && i < output.size(); i++) { - Assert.assertEquals((long) expectedEventTimestamps.get(i), output.get(i).getFirstEventTimestamp(resultFormat)); + @Test(expected = UOE.class) + public void testGetSegmentDescriptorsFromInvalidLegacySpec() + { + QuerySegmentSpec legacySpec = new LegacySegmentSpec( + new Interval( + DateTimes.of("2010-01-01"), + DateTimes.of("2019-01-01").plusHours(1) + ) + ); + factory.getSegmentDescriptorsFromSpecificQuerySpec(legacySpec); } } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java index b3a0d0069ac..2928ca9460e 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java @@ -67,6 +67,7 @@ import java.util.Map; import java.util.Set; /** + * */ @RunWith(Parameterized.class) public class ScanQueryRunnerTest @@ -143,11 +144,11 @@ public class ScanQueryRunnerTest private Druids.ScanQueryBuilder newTestQuery() { return Druids.newScanQueryBuilder() - .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) - .columns(Collections.emptyList()) - .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) - .limit(3) - .legacy(legacy); + .dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource)) + .columns(Collections.emptyList()) + .intervals(QueryRunnerTestHelper.fullOnIntervalSpec) + .limit(3) + .legacy(legacy); } @Test @@ -524,7 +525,11 @@ public class ScanQueryRunnerTest ScanQuery query = newTestQuery() .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) - .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .columns( + QueryRunnerTestHelper.timeDimension, + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) .limit(limit) .order(ScanQuery.Order.ASCENDING) .context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false)) @@ -556,7 +561,7 @@ public class ScanQueryRunnerTest }; final List>> ascendingEvents = toEvents( new String[]{ - legacy ? getTimestampName() + ":TIME" : null, + legacy ? 
getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -565,9 +570,35 @@ public class ScanQueryRunnerTest }, (String[]) ArrayUtils.addAll(seg1Results, seg2Results) ); + + if (legacy) { + for (List> batch : ascendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } else { + for (List> batch : ascendingEvents) { + for (Map event : batch) { + event.put("__time", (DateTimes.of((String) event.get("__time"))).getMillis()); + } + } + } + List ascendingExpectedResults = toExpected( ascendingEvents, - legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), + legacy ? + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + getTimestampName(), + "quality", + "index" + ) : + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + "quality", + "index" + ), 0, limit ); @@ -583,7 +614,11 @@ public class ScanQueryRunnerTest ScanQuery query = newTestQuery() .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) - .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .columns( + QueryRunnerTestHelper.timeDimension, + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) .limit(limit) .order(ScanQuery.Order.DESCENDING) .build(); @@ -616,7 +651,7 @@ public class ScanQueryRunnerTest ArrayUtils.reverse(expectedRet); final List>> descendingEvents = toEvents( new String[]{ - legacy ? getTimestampName() + ":TIME" : null, + legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -625,9 +660,34 @@ public class ScanQueryRunnerTest }, expectedRet ); + if (legacy) { + for (List> batch : descendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } else { + for (List> batch : descendingEvents) { + for (Map event : batch) { + event.put("__time", (DateTimes.of((String) event.get("__time"))).getMillis()); + } + } + } List descendingExpectedResults = toExpected( descendingEvents, - legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), + legacy ? + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + getTimestampName(), + // getTimestampName() always returns the legacy timestamp when legacy is true + "quality", + "index" + ) : + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + "quality", + "index" + ), 0, limit ); @@ -666,7 +726,11 @@ public class ScanQueryRunnerTest ScanQuery query = newTestQuery() .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) - .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .columns( + QueryRunnerTestHelper.timeDimension, + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .order(ScanQuery.Order.ASCENDING) .limit(limit) @@ -676,7 +740,7 @@ public class ScanQueryRunnerTest Iterable results = runner.run(QueryPlus.wrap(query), context).toList(); final List>> ascendingEvents = toEvents( new String[]{ - legacy ? getTimestampName() + ":TIME" : null, + legacy ? 
getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -685,9 +749,34 @@ public class ScanQueryRunnerTest }, (String[]) ArrayUtils.addAll(seg1Results, seg2Results) ); + if (legacy) { + for (List> batch : ascendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } else { + for (List> batch : ascendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTimes.of((String) event.get("__time"))).getMillis())); + } + } + } List ascendingExpectedResults = toExpected( ascendingEvents, - legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), + legacy ? + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + getTimestampName(), + // getTimestampName() always returns the legacy timestamp when legacy is true + "quality", + "index" + ) : + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + "quality", + "index" + ), 0, limit ); @@ -727,7 +816,11 @@ public class ScanQueryRunnerTest ScanQuery query = newTestQuery() .intervals(I_0112_0114) .filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null)) - .columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric) + .columns( + QueryRunnerTestHelper.timeDimension, + QueryRunnerTestHelper.qualityDimension, + QueryRunnerTestHelper.indexMetric + ) .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) .order(ScanQuery.Order.DESCENDING) .context(ImmutableMap.of(ScanQuery.CTX_KEY_OUTERMOST, false)) @@ -740,7 +833,7 @@ public class ScanQueryRunnerTest ArrayUtils.reverse(expectedRet); final List>> descendingEvents = toEvents( new String[]{ - legacy ? getTimestampName() + ":TIME" : null, + legacy ? getTimestampName() + ":TIME" : ColumnHolder.TIME_COLUMN_NAME, null, QueryRunnerTestHelper.qualityDimension + ":STRING", null, @@ -749,9 +842,34 @@ public class ScanQueryRunnerTest }, expectedRet //segments in reverse order from above ); + if (legacy) { + for (List> batch : descendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTime) event.get("timestamp")).getMillis()); + } + } + } else { + for (List> batch : descendingEvents) { + for (Map event : batch) { + event.put("__time", ((DateTimes.of((String) event.get("__time"))).getMillis())); + } + } + } List descendingExpectedResults = toExpected( descendingEvents, - legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"), + legacy ? + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + getTimestampName(), + // getTimestampName() always returns the legacy timestamp when legacy is true + "quality", + "index" + ) : + Lists.newArrayList( + QueryRunnerTestHelper.timeDimension, + "quality", + "index" + ), 0, limit ); @@ -760,7 +878,6 @@ public class ScanQueryRunnerTest } } - private List>> toFullEvents(final String[]... valueSet) { return toEvents( @@ -799,71 +916,66 @@ public class ScanQueryRunnerTest Lists.newArrayList( Iterables.transform( values, - new Function>() - { - @Override - public Map apply(String input) - { - Map event = new HashMap<>(); - String[] values = input.split("\\t"); - for (int i = 0; i < dimSpecs.length; i++) { - if (dimSpecs[i] == null || i >= dimSpecs.length) { - continue; - } - - // For testing metrics and virtual columns we have some special handling here, since - // they don't appear in the source data. 
- if (dimSpecs[i].equals(EXPR_COLUMN.getOutputName())) { - event.put( - EXPR_COLUMN.getOutputName(), - (double) event.get(QueryRunnerTestHelper.indexMetric) * 2 - ); - continue; - } else if (dimSpecs[i].equals("indexMin")) { - event.put("indexMin", (double) event.get(QueryRunnerTestHelper.indexMetric)); - continue; - } else if (dimSpecs[i].equals("indexFloat")) { - event.put("indexFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); - continue; - } else if (dimSpecs[i].equals("indexMaxPlusTen")) { - event.put("indexMaxPlusTen", (double) event.get(QueryRunnerTestHelper.indexMetric) + 10); - continue; - } else if (dimSpecs[i].equals("indexMinFloat")) { - event.put("indexMinFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); - continue; - } else if (dimSpecs[i].equals("indexMaxFloat")) { - event.put("indexMaxFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); - continue; - } else if (dimSpecs[i].equals("quality_uniques")) { - final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); - collector.add( - Hashing.murmur3_128() - .hashBytes(StringUtils.toUtf8((String) event.get("quality"))) - .asBytes() - ); - event.put("quality_uniques", collector); - } - - if (i >= values.length) { - continue; - } - - String[] specs = dimSpecs[i].split(":"); - - event.put( - specs[0], - specs.length == 1 || specs[1].equals("STRING") ? values[i] : - specs[1].equals("TIME") ? toTimestamp(values[i]) : - specs[1].equals("FLOAT") ? Float.valueOf(values[i]) : - specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) : - specs[1].equals("LONG") ? Long.valueOf(values[i]) : - specs[1].equals("NULL") ? null : - specs[1].equals("STRINGS") ? Arrays.asList(values[i].split("\u0001")) : - values[i] - ); + input -> { + Map event = new HashMap<>(); + String[] values1 = input.split("\\t"); + for (int i = 0; i < dimSpecs.length; i++) { + if (dimSpecs[i] == null || i >= dimSpecs.length) { + continue; } - return event; + + // For testing metrics and virtual columns we have some special handling here, since + // they don't appear in the source data. + if (dimSpecs[i].equals(EXPR_COLUMN.getOutputName())) { + event.put( + EXPR_COLUMN.getOutputName(), + (double) event.get(QueryRunnerTestHelper.indexMetric) * 2 + ); + continue; + } else if (dimSpecs[i].equals("indexMin")) { + event.put("indexMin", (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexFloat")) { + event.put("indexFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexMaxPlusTen")) { + event.put("indexMaxPlusTen", (double) event.get(QueryRunnerTestHelper.indexMetric) + 10); + continue; + } else if (dimSpecs[i].equals("indexMinFloat")) { + event.put("indexMinFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("indexMaxFloat")) { + event.put("indexMaxFloat", (float) (double) event.get(QueryRunnerTestHelper.indexMetric)); + continue; + } else if (dimSpecs[i].equals("quality_uniques")) { + final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + collector.add( + Hashing.murmur3_128() + .hashBytes(StringUtils.toUtf8((String) event.get("quality"))) + .asBytes() + ); + event.put("quality_uniques", collector); + } + + if (i >= values1.length) { + continue; + } + + String[] specs = dimSpecs[i].split(":"); + + event.put( + specs[0], + specs.length == 1 || specs[1].equals("STRING") ? 
values1[i] : + specs[1].equals("TIME") ? toTimestamp(values1[i]) : + specs[1].equals("FLOAT") ? Float.valueOf(values1[i]) : + specs[1].equals("DOUBLE") ? Double.valueOf(values1[i]) : + specs[1].equals("LONG") ? Long.valueOf(values1[i]) : + specs[1].equals("NULL") ? null : + specs[1].equals("STRINGS") ? Arrays.asList(values1[i].split("\u0001")) : + values1[i] + ); } + return event; } ) ) @@ -969,7 +1081,6 @@ public class ScanQueryRunnerTest } else { Assert.assertEquals("invalid value for " + ac.getKey(), exVal, actVal); } - } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java new file mode 100644 index 00000000000..1854883ca5b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.scan; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.Druids; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.column.ColumnHolder; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + +public class ScanQueryTest +{ + private static QuerySegmentSpec intervalSpec; + private static ScanResultValue s1; + private static ScanResultValue s2; + private static ScanResultValue s3; + + @BeforeClass + public static void setup() + { + intervalSpec = new MultipleIntervalSegmentSpec( + Collections.singletonList( + new Interval(DateTimes.of("2012-01-01"), DateTimes.of("2012-01-01").plusHours(1)) + ) + ); + + ArrayList> events1 = new ArrayList<>(); + HashMap event1 = new HashMap<>(); + event1.put(ColumnHolder.TIME_COLUMN_NAME, new Long(42)); + events1.add(event1); + + s1 = new ScanResultValue( + "segmentId", + Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME), + events1 + ); + + ArrayList> events2 = new ArrayList<>(); + HashMap event2 = new HashMap<>(); + event2.put(ColumnHolder.TIME_COLUMN_NAME, new Long(43)); + events2.add(event2); + + s2 = new ScanResultValue( + "segmentId", + Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME), + events2 + ); + + // ScanResultValue s3 has no time column + ArrayList> 
events3 = new ArrayList<>(); + HashMap event3 = new HashMap<>(); + event3.put("yah", "yeet"); + events3.add(event3); + + s3 = new ScanResultValue( + "segmentId", + Collections.singletonList("yah"), + events3 + ); + } + + @Test(expected = IllegalArgumentException.class) + public void testAscendingScanQueryWithInvalidColumns() + { + Druids.newScanQueryBuilder() + .order(ScanQuery.Order.ASCENDING) + .columns(ImmutableList.of("not time", "also not time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testDescendingScanQueryWithInvalidColumns() + { + Druids.newScanQueryBuilder() + .order(ScanQuery.Order.DESCENDING) + .columns(ImmutableList.of("not time", "also not time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + } + + // No assertions because we're checking that no IllegalArgumentExceptions are thrown + @Test + public void testValidScanQueryInitialization() + { + List nonOrderedOrders = Arrays.asList(null, ScanQuery.Order.NONE); + + for (ScanQuery.Order order : nonOrderedOrders) { + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of("not time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of()) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of("__time")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + } + + Set orderedOrders = ImmutableSet.of(ScanQuery.Order.ASCENDING, ScanQuery.Order.DESCENDING); + + for (ScanQuery.Order order : orderedOrders) { + Druids.newScanQueryBuilder() + .order(order) + .columns((List) null) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of()) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .dataSource("source") + .intervals(intervalSpec) + .build(); + + Druids.newScanQueryBuilder() + .order(order) + .columns(ImmutableList.of("__time", "col2")) + .dataSource("source") + .intervals(intervalSpec) + .build(); + } + } + + // Validates that getResultOrdering will work for the broker n-way merge + @Test + public void testMergeSequenceForResults() + { + // Should be able to handle merging s1, s2, s3 + ScanQuery noOrderScan = Druids.newScanQueryBuilder() + .order(ScanQuery.Order.NONE) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .dataSource("some src") + .intervals(intervalSpec) + .build(); + + // Should only handle s1 and s2 + ScanQuery descendingOrderScan = Druids.newScanQueryBuilder() + .order(ScanQuery.Order.DESCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .dataSource("some src") + .intervals(intervalSpec) + .build(); + + // Should only handle s1 and s2 + ScanQuery ascendingOrderScan = Druids.newScanQueryBuilder() + .order(ScanQuery.Order.ASCENDING) + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST) + .dataSource("some src") + .intervals(intervalSpec) + .build(); + // No Order + Sequence noOrderSeq = + Sequences.simple( + ImmutableList.of( + Sequences.simple(ImmutableList.of(s1, s3)), + Sequences.simple(ImmutableList.of(s2)) + ) + ).flatMerge(seq -> seq, noOrderScan.getResultOrdering()); + + List noOrderList = noOrderSeq.toList(); + 
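+    // With Order.NONE, getResultOrdering() returns Ordering.natural(), so the merge
+    // never inspects __time and s3 (which has no time column) can be merged safely.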
Assert.assertEquals(3, noOrderList.size());
+
+    // Ascending
+    Sequence<ScanResultValue> ascendingOrderSeq = Sequences.simple(
+        ImmutableList.of(
+            Sequences.simple(ImmutableList.of(s1)),
+            Sequences.simple(ImmutableList.of(s2))
+        )
+    ).flatMerge(seq -> seq, ascendingOrderScan.getResultOrdering());
+
+    List<ScanResultValue> ascendingList = ascendingOrderSeq.toList();
+    Assert.assertEquals(2, ascendingList.size());
+    Assert.assertEquals(s1, ascendingList.get(0));
+    Assert.assertEquals(s2, ascendingList.get(1));
+
+    // Descending
+    Sequence<ScanResultValue> descendingOrderSeq = Sequences.simple(
+        ImmutableList.of(
+            Sequences.simple(ImmutableList.of(s1)),
+            Sequences.simple(ImmutableList.of(s2))
+        )
+    ).flatMerge(seq -> seq, descendingOrderScan.getResultOrdering());
+
+    List<ScanResultValue> descendingList = descendingOrderSeq.toList();
+    Assert.assertEquals(2, descendingList.size());
+    Assert.assertEquals(s2, descendingList.get(0));
+    Assert.assertEquals(s1, descendingList.get(1));
+  }
+
+  @Test(expected = ISE.class)
+  public void testTimeOrderingWithoutTimeColumn()
+  {
+    ScanQuery descendingOrderScan = Druids.newScanQueryBuilder()
+                                          .order(ScanQuery.Order.DESCENDING)
+                                          .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
+                                          .dataSource("some src")
+                                          .intervals(intervalSpec)
+                                          .build();
+    // This should fail because s3 doesn't have a timestamp
+    Sequence<ScanResultValue> borkedSequence = Sequences.simple(
+        ImmutableList.of(
+            Sequences.simple(ImmutableList.of(s1)),
+            Sequences.simple(ImmutableList.of(s2, s3))
+        )
+    ).flatMerge(seq -> seq, descendingOrderScan.getResultOrdering());
+
+    // This should throw an ISE
+    List<ScanResultValue> res = borkedSequence.toList();
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java
index 70f2e080b9f..465794a2831 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTimestampComparatorTest.java
@@ -49,7 +49,7 @@ public class ScanResultValueTimestampComparatorTest
   }
 
   @Test
-  public void comparisonDescendingListTest()
+  public void testComparisonDescendingList()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
                             .order(ScanQuery.Order.DESCENDING)
@@ -86,7 +86,7 @@
   }
 
   @Test
-  public void comparisonAscendingListTest()
+  public void testComparisonAscendingList()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
                             .order(ScanQuery.Order.ASCENDING)
@@ -123,7 +123,7 @@
   }
 
   @Test
-  public void comparisonDescendingCompactedListTest()
+  public void testComparisonDescendingCompactedList()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
                             .order(ScanQuery.Order.DESCENDING)
@@ -158,7 +158,7 @@
   }
 
   @Test
-  public void comparisonAscendingCompactedListTest()
+  public void testComparisonAscendingCompactedList()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
                             .order(ScanQuery.Order.ASCENDING)
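
For illustration, a minimal sketch (not part of the patch) of how the new __time validation in ScanQuery's constructor behaves. It reuses the Druids builder and test helpers that appear in the diffs above; the literal column name "quality" stands in for any non-time column.

    // Time-ordered scans must include __time in a non-empty columns list; a null or
    // empty columns list is also accepted, since it means "all columns".
    ScanQuery ok = Druids.newScanQueryBuilder()
                         .dataSource("some datasource")
                         .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
                         .columns(ImmutableList.of("__time", "quality"))
                         .order(ScanQuery.Order.ASCENDING)
                         .build();                              // builds fine

    ScanQuery bad = Druids.newScanQueryBuilder()
                          .dataSource("some datasource")
                          .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
                          .columns(ImmutableList.of("quality")) // no __time selected
                          .order(ScanQuery.Order.ASCENDING)
                          .build();                             // throws IllegalArgumentException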