diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java
index 9d302a80179..6b250af2120 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/ScanBenchmark.java
@@ -299,7 +299,8 @@ public class ScanBenchmark config, DefaultGenericQueryMetricsFactory.instance() ),
- new ScanQueryEngine()
+ new ScanQueryEngine(),
+ new ScanQueryConfig()
); }
diff --git a/docs/content/querying/scan-query.md b/docs/content/querying/scan-query.md
index a025ea5c4c6..7998f890eb6 100644
--- a/docs/content/querying/scan-query.md
+++ b/docs/content/querying/scan-query.md
@@ -25,9 +25,9 @@ title: "Scan query" # Scan query The Scan query returns raw Druid rows in streaming mode. The biggest difference between the Select query and the Scan
-query is that the Scan query does not retain all the returned rows in memory before they are returned to the client
-(except when time-ordering is used). The Select query _will_ retain the rows in memory, causing memory pressure if too
-many rows are returned. The Scan query can return all the rows without issuing another pagination query.
+query is that the Scan query does not retain all the returned rows in memory before they are returned to the client.
+The Select query _will_ retain the rows in memory, causing memory pressure if too many rows are returned.
+The Scan query can return all the rows without issuing another pagination query.
In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued directly to Historical processes or streaming ingestion tasks. This can be useful if you want to retrieve large
@@ -155,12 +155,31 @@ The format of the result when resultFormat equals `compactedList`: ## Time Ordering
-The Scan query currently supports ordering based on timestamp for non-legacy queries where the limit is less than
-`druid.query.scan.maxRowsTimeOrderedInMemory` rows. The default value of `druid.query.scan.maxRowsTimeOrderedInMemory`
-is 100000 rows. The reasoning behind this limit is that the current implementation of time ordering sorts all returned
-records in memory. Attempting to load too many rows into memory runs the risk of Broker nodes running out of memory.
-The limit can be configured based on server memory and number of dimensions being queried.
+The Scan query currently supports ordering based on timestamp for non-legacy queries. Note that using time ordering
+will yield results that do not indicate which segment rows are from. Furthermore, time ordering is only supported
+when the result set limit is less than `druid.query.scan.maxRowsQueuedForTimeOrdering` rows and fewer than
+`druid.query.scan.maxSegmentsTimeOrderedInMemory` segments are scanned per Historical. The reasoning behind these
+limitations is that the implementation of time ordering uses two strategies that can consume too much heap memory
+if left unbounded. These strategies (listed below; illustrative sketches follow this section) are chosen on a
+per-Historical basis depending on the query result set limit and the number of segments being scanned.
+1. Priority Queue: Each segment on a Historical is opened sequentially. Every row is added to a bounded priority
+queue which is ordered by timestamp. Once the queue grows beyond the result set limit, the row with the earliest
+(if descending) or latest (if ascending) timestamp is dequeued. After every row has been processed, the sorted
+contents of the priority queue are streamed back to the Broker(s) in batches. Attempting to load too many rows
+into memory runs the risk of Historical nodes running out of memory. The `druid.query.scan.maxRowsQueuedForTimeOrdering`
+property protects from this by limiting the number of rows in the query result set when time ordering is used.
+
+2. K-Way/N-Way Merge: Each segment on a Historical is opened in parallel. Since each segment's rows are already
+time-ordered, a k-way merge can be performed on the results from each segment. Unlike the Priority Queue strategy,
+this approach does not persist the entire result set in memory; it streams back batches as they are returned from
+the merge function. However, attempting to query too many segments could also result in high memory usage due to
+the need to open decompression and decoding buffers for each. The `druid.query.scan.maxSegmentsTimeOrderedInMemory`
+limit protects from this by capping the number of segments opened per Historical when time ordering is used.
+
+Both `druid.query.scan.maxRowsQueuedForTimeOrdering` and `druid.query.scan.maxSegmentsTimeOrderedInMemory` are
+configurable and can be tuned based on hardware specifications and the number of dimensions being queried.
+
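The two strategies above can be illustrated with short, self-contained Java sketches. These are editorial illustrations, not the code added by this patch; all class and method names in them are invented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of strategy 1 for an ascending query: retain only the `limit`
// earliest timestamps while scanning rows one segment at a time. A descending
// query would keep the latest rows by flipping the heap's comparator.
public class BoundedPriorityQueueSketch
{
  public static List<Long> limitEarliest(Iterable<Long> rowTimestamps, int limit)
  {
    // Max-heap on timestamp: the head is the latest row currently retained,
    // which is exactly the row to dequeue once the queue exceeds the limit.
    PriorityQueue<Long> queue = new PriorityQueue<>(Comparator.reverseOrder());
    for (Long timestamp : rowTimestamps) {
      queue.offer(timestamp);
      if (queue.size() > limit) {
        queue.poll();
      }
    }
    List<Long> result = new ArrayList<>(queue);
    Collections.sort(result); // stream back in ascending time order, in batches
    return result;
  }
}
```

Strategy 2 relies on each segment already being time-ordered, so a lazy k-way merge (here via Guava's `Iterators.mergeSorted`, which is already on Druid's classpath) can stream merged results without materializing the full result set:

```java
import com.google.common.collect.Iterators;

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Sketch of strategy 2: lazily merge per-segment iterators that are each
// already sorted by timestamp. Nothing is buffered beyond one element per
// segment, but every segment must be open at once.
public class KWayMergeSketch
{
  public static Iterator<Long> mergeSegments(List<Iterator<Long>> segmentTimestamps)
  {
    return Iterators.mergeSorted(segmentTimestamps, Comparator.<Long>naturalOrder());
  }
}
```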
## Legacy mode The Scan query supports a legacy mode designed for protocol compatibility with the former scan-query contrib extension.
@@ -180,5 +199,6 @@ is complete. |property|description|values|default| |--------|-----------|------|-------|
-|druid.query.scan.maxRowsTimeOrderedInMemory|An integer in the range [0, 2147483647]|100000|
-|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
+|druid.query.scan.maxRowsQueuedForTimeOrdering|The maximum number of rows returned when time ordering is used|An integer in [0, 2147483647]|100000|
+|druid.query.scan.maxSegmentsTimeOrderedInMemory|The maximum number of segments scanned per Historical when time ordering is used|An integer in [0, 2147483647]|50|
+|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
\ No newline at end of file
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 3d6308cb0a9..e8f8faaaccc 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2423,7 +2423,8 @@ public class KafkaIndexTaskTest new ScanQueryConfig(), new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper()) ),
- new ScanQueryEngine()
+ new ScanQueryEngine(),
+ new ScanQueryConfig()
) ) .build()
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
index 9ff77bfeadc..89b3ffe68b1 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQuery.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonValue; import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource;
@@ -207,6 +208,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue> return legacy; }
+ @Override
+ public Ordering<ScanResultValue> getResultOrdering()
+ {
+ return Ordering.from(new ScanResultValueTimestampComparator(this));
+ }
+
public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig) { return Druids.ScanQueryBuilder.copy(this).legacy(legacy != null ? legacy : scanQueryConfig.isLegacy()).build();
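As context for the `getResultOrdering` override above: Guava's `Ordering` wraps a plain `Comparator` with utilities such as `reverse()` and `sortedCopy()`, which is convenient when the same timestamp ordering must serve both ascending and descending queries. A hedged sketch with an invented `Batch` stand-in (the real comparator's internals are not shown in this diff):

```java
import com.google.common.collect.Ordering;

import java.util.Comparator;
import java.util.List;

// Invented stand-in for a result batch keyed by its first event's timestamp.
class Batch
{
  final long firstEventTimestamp;

  Batch(long firstEventTimestamp)
  {
    this.firstEventTimestamp = firstEventTimestamp;
  }
}

class ResultOrderingSketch
{
  static final Ordering<Batch> TIME_ASCENDING =
      Ordering.from(Comparator.comparingLong((Batch batch) -> batch.firstEventTimestamp));

  static List<Batch> sortDescending(List<Batch> batches)
  {
    // reverse() flips the comparator, mirroring a descending time-ordered query.
    return TIME_ASCENDING.reverse().sortedCopy(batches);
  }
}
```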
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index e59239e7ed0..39323cddfce 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -137,7 +137,6 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) { throw new UOE(
@@ -307,7 +306,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory inputYielder; int batchSize;
- public ScanResultValueBatchingSequence(Sequence<ScanResultValue> inputSequence, int batchSize) {
+ public ScanResultValueBatchingSequence(Sequence<ScanResultValue> inputSequence, int batchSize)
+ {
this.inputYielder = inputSequence.toYielder( null, new YieldingAccumulator()
@@ -324,9 +324,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory Yielder toYielder(
- OutType initValue, YieldingAccumulator accumulator
+ OutType initValue,
+ YieldingAccumulator accumulator
) { return makeYielder(initValue, accumulator);
@@ -354,7 +354,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory return events; }
- public long getFirstEventTimestamp(ScanQuery query)
+ public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
{
- if (query.getResultFormat().equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
+ if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
return (Long) ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
- } else if (query.getResultFormat().equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
+ } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME); List firstEvent = (List) ((List) this.getEvents()).get(0); return (Long) firstEvent.get(timeColumnIndex); }
- throw new UOE("Unable to get first event timestamp using result format of [%s]", query.getResultFormat());
+ throw new UOE("Unable to get first event timestamp using result format of [%s]", resultFormat.toString());
}
public List<ScanResultValue> toSingleEventScanResultValues()
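The `getFirstEventTimestamp` change above moves result-format handling onto the result value itself. A hedged sketch of the two extraction paths it implements, using plain collections (`"__time"` stands in for `ColumnHolder.TIME_COLUMN_NAME`; types are simplified relative to the real class):

```java
import java.util.List;
import java.util.Map;

// A "list" result holds one Map per event, while a "compactedList" result
// holds one List of column values per event, positionally aligned with the
// columns list.
class FirstEventTimestampSketch
{
  static long fromListFormat(List<Map<String, Object>> events)
  {
    return (Long) events.get(0).get("__time");
  }

  static long fromCompactedListFormat(List<List<Object>> events, List<String> columns)
  {
    int timeColumnIndex = columns.indexOf("__time");
    return (Long) events.get(0).get(timeColumnIndex);
  }
}
```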
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
index 5ee1672b570..f94020bf81b 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueTimestampComparator.java
@@ -44,8 +44,8 @@ public class ScanResultValueTimestampComparator implements Comparator CONFIG_MAP = ImmutableMap
+ .builder()
+ .put("maxSegmentsTimeOrderedInMemory", "1")
+ .put("maxRowsQueuedForTimeOrdering", "1")
+ .put("legacy", "true")
+ .build();
+
+ private final ImmutableMap<String, String> CONFIG_MAP2 = ImmutableMap
+ .builder()
+ .put("legacy", "false")
+ .put("maxSegmentsTimeOrderedInMemory", "42")
+ .build();
+
+ private final ImmutableMap<String, String> CONFIG_MAP_EMPTY = ImmutableMap
+ .builder()
+ .build();
+
+ @Test
+ public void testSerde()
+ {
+ final ScanQueryConfig config = MAPPER.convertValue(CONFIG_MAP, ScanQueryConfig.class);
+ Assert.assertEquals(1, config.getMaxRowsQueuedForTimeOrdering());
+ Assert.assertEquals(1, config.getMaxSegmentsTimeOrderedInMemory());
+ Assert.assertTrue(config.isLegacy());
+
+ final ScanQueryConfig config2 = MAPPER.convertValue(CONFIG_MAP2, ScanQueryConfig.class);
+ Assert.assertEquals(100000, config2.getMaxRowsQueuedForTimeOrdering());
+ Assert.assertEquals(42, config2.getMaxSegmentsTimeOrderedInMemory());
+ Assert.assertFalse(config2.isLegacy());
+
+ final ScanQueryConfig config3 = MAPPER.convertValue(CONFIG_MAP_EMPTY, ScanQueryConfig.class);
+ Assert.assertEquals(100000, config3.getMaxRowsQueuedForTimeOrdering());
+ Assert.assertEquals(50, config3.getMaxSegmentsTimeOrderedInMemory());
+ Assert.assertFalse(config3.isLegacy());
+ }
+}
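The test above pins down the config's defaults: 100000 rows queued for time ordering, 50 segments, legacy off. The `ScanQueryConfig` class itself does not appear in this diff; a hedged sketch of what a Jackson-bound config with those defaults typically looks like in Druid (field names and defaults inferred from the test and the documentation table, not copied from the real class):

```java
import com.fasterxml.jackson.annotation.JsonProperty;

// Hedged sketch only: an ObjectMapper.convertValue(...) call such as the ones
// in the test binds the map keys onto @JsonProperty fields, leaving the
// initializers as defaults for any keys that are absent.
public class ScanQueryConfigSketch
{
  @JsonProperty
  private boolean legacy = false;

  @JsonProperty
  private int maxRowsQueuedForTimeOrdering = 100000;

  @JsonProperty
  private int maxSegmentsTimeOrderedInMemory = 50;

  public boolean isLegacy()
  {
    return legacy;
  }

  public int getMaxRowsQueuedForTimeOrdering()
  {
    return maxRowsQueuedForTimeOrdering;
  }

  public int getMaxSegmentsTimeOrderedInMemory()
  {
    return maxSegmentsTimeOrderedInMemory;
  }
}
```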
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java
deleted file mode 100644
index 83b792c92c1..00000000000
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.scan;
-
-import org.apache.druid.query.Druids;
-import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryRunner;
-import org.apache.druid.query.QuerySegmentWalker;
-import org.apache.druid.query.spec.QuerySegmentSpec;
-import org.apache.druid.segment.column.ColumnHolder;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-
-public class ScanQueryQueryToolChestTest
-{
- /*
- private static ScanQueryQueryToolChest chest;
- private static ScanQueryConfig config;
- private static int numElements;
- private static QuerySegmentSpec emptySegmentSpec;
-
- @BeforeClass
- public static void setup()
- {
-
- config = createNiceMock(ScanQueryConfig.class);
- expect(config.getMaxRowsQueuedForTimeOrdering()).andReturn(100000);
- replay(config);
- chest = new ScanQueryQueryToolChest(config, null);
- numElements = 1000;
- emptySegmentSpec = new QuerySegmentSpec()
- {
- @Override
- public List<Interval> getIntervals()
- {
- return null;
- }
-
- @Override
- public QueryRunner lookup(
- Query query,
- QuerySegmentWalker walker
- )
- {
- return null;
- }
- };
- }
-
- @Test
- public void testDescendingHeapsortListScanResultValues()
- {
- List<ScanResultValue> inputs = new ArrayList<>();
- for (long i = 0; i < numElements; i++) {
- HashMap<String, Object> event = new HashMap<>();
- event.put("__time", i * 1000);
- inputs.add(
- new ScanResultValue(
- "some segment id",
- Collections.singletonList("__time"),
- Collections.singletonList(event)
- )
- );
- }
- ScanQuery scanQuery = new Druids.ScanQueryBuilder()
- .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
- .timeOrder(ScanQuery.TimeOrder.DESCENDING)
- .dataSource("some data source")
- .intervals(emptySegmentSpec)
- .limit(99999)
- .build();
- Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
-
- int count = 0;
- Long previousTime = Long.MAX_VALUE;
- while (sorted.hasNext()) {
- count++;
- ScanResultValue curr = sorted.next();
- Long currentTime = (Long)
- ((Map) (((List) curr.getEvents()).get(0))).get(ColumnHolder.TIME_COLUMN_NAME);
- Assert.assertTrue(currentTime < previousTime);
- previousTime = currentTime;
- }
- Assert.assertEquals(numElements, count);
- }
-
- @Test
- public void testAscendingHeapsortListScanResultValues()
- {
- List<ScanResultValue> inputs = new ArrayList<>();
- for (long i = numElements; i > 0; i--) {
- HashMap<String, Object> event = new HashMap<>();
- event.put("__time", i * 1000);
- inputs.add(
- new ScanResultValue(
- "some segment id",
- Collections.singletonList("__time"),
- Collections.singletonList(event)
- )
- );
- }
- ScanQuery scanQuery = new Druids.ScanQueryBuilder()
- .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
- .timeOrder(ScanQuery.TimeOrder.ASCENDING)
- .dataSource("some data source")
- .intervals(emptySegmentSpec)
- .limit(99999)
- .build();
- Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
-
- int count = 0;
- Long previousTime = -1L;
- while (sorted.hasNext()) {
- count++;
- ScanResultValue curr = sorted.next();
- Long currentTime = (Long)
- ((Map) (((List) curr.getEvents()).get(0))).get(ColumnHolder.TIME_COLUMN_NAME);
- Assert.assertTrue(currentTime > previousTime);
- previousTime = currentTime;
- }
- Assert.assertEquals(numElements, count);
- }
-
- @Test
- public void testDescendingHeapsortCompactedListScanResultValues()
- {
- List<ScanResultValue> inputs = new ArrayList<>();
- for (long i = 0; i < numElements; i++) {
- inputs.add(
- new ScanResultValue(
- "some segment id",
- Collections.singletonList("__time"),
- Collections.singletonList(Collections.singletonList(new Long(i * 1000)))
- )
- );
- }
- ScanQuery scanQuery = new Druids.ScanQueryBuilder()
- .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
- .timeOrder(ScanQuery.TimeOrder.DESCENDING)
- .dataSource("some data source")
- .intervals(emptySegmentSpec)
- .limit(99999)
- .build();
- Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
-
- Long previousTime = Long.MAX_VALUE;
- int count = 0;
- while (sorted.hasNext()) {
- count++;
- ScanResultValue curr = sorted.next();
- Long currentTime = (Long)
- ((List) (((List) curr.getEvents()).get(0))).get(0);
- Assert.assertTrue(currentTime < previousTime);
- previousTime = currentTime;
- }
- Assert.assertEquals(numElements, count);
- }
-
- @Test
- public void testAscendingHeapsortCompactedListScanResultValues()
- {
- List<ScanResultValue> inputs = new ArrayList<>();
- for (long i = numElements; i > 0; i--) {
- inputs.add(
- new ScanResultValue(
- "some segment id",
- Collections.singletonList("__time"),
- Collections.singletonList(Collections.singletonList(new Long(i * 1000)))
- )
- );
- }
- ScanQuery scanQuery = new Druids.ScanQueryBuilder()
- .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
- .timeOrder(ScanQuery.TimeOrder.ASCENDING)
- .dataSource("some data source")
- .intervals(emptySegmentSpec)
- .limit(99999)
- .build();
- Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
-
- Long previousTime = -1L;
- int count = 0;
- while (sorted.hasNext()) {
- count++;
- ScanResultValue curr = sorted.next();
- Long currentTime = (Long)
- ((List) (((List) curr.getEvents()).get(0))).get(0);
- Assert.assertTrue(currentTime > previousTime);
- previousTime = currentTime;
- }
- Assert.assertEquals(numElements, count);
- }
- */
-}
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueSerdeTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueSerdeTest.java
deleted file mode 100644
index 572f1d8615c..00000000000
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueSerdeTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.scan;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ScanResultValueSerdeTest
-{
- private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
-
- @Test
- public void testSerdeScanResultValueCompactedList() throws IOException
- {
- String segmentId = "some_segment_id";
- List<String> columns = new ArrayList<>(Arrays.asList("col1", "col2", "col3"));
- List<String> event = new ArrayList<>(Arrays.asList(
- "prop1",
- "prop2",
- "prop3"
- ));
- List<List<String>> events = new ArrayList<>(Collections.singletonList(event));
- ScanResultValue srv = new ScanResultValue(segmentId, columns, events);
- String serialized = jsonMapper.writeValueAsString(srv);
- ScanResultValue deserialized = jsonMapper.readValue(serialized, ScanResultValue.class);
- Assert.assertEquals(srv, deserialized);
- }
-
- @Test
- public void testSerdeScanResultValueNonCompactedList() throws IOException
- {
- String segmentId = "some_segment_id";
- List<String> columns = new ArrayList<>(Arrays.asList("col1", "col2", "col3"));
- Map<String, Object> event = new HashMap<>();
- event.put("key1", new Integer(4));
- event.put("key2", "some_string");
- event.put("key3", new Double(4.1));
- List<Map<String, Object>> events = new ArrayList<>(Collections.singletonList(event));
- ScanResultValue srv = new ScanResultValue(segmentId, columns, events);
- String serialized = jsonMapper.writeValueAsString(srv);
- ScanResultValue deserialized = jsonMapper.readValue(serialized, ScanResultValue.class);
- Assert.assertEquals(srv, deserialized);
- }
-}
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java
new file mode 100644
index 00000000000..47f82ad8ce5
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ScanResultValueTest
+{
+ private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+ private static final long TIME_1 = 1234567890000L;
+ private static final long TIME_2 = 9876543210000L;
+
+ private static ScanResultValue compactedListSRV;
+ private static ScanResultValue listSRV;
+
+ @BeforeClass
+ public static void setup()
+ {
+ String segmentId = "some_segment_id";
+ List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
+ List<Object> event = new ArrayList<>(Arrays.asList(
+ TIME_1,
+ "Feridun",
+ 4
+ ));
+ List<Object> event2 = new ArrayList<>(Arrays.asList(
+ TIME_2,
+ "Justin",
+ 6
+ ));
+
+ List<List<Object>> events = Arrays.asList(event, event2);
+ compactedListSRV = new ScanResultValue(segmentId, columns, events);
+
+ Map<String, Object> eventMap1 = new HashMap<>();
+ eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1);
+ eventMap1.put("name", "Feridun");
+ eventMap1.put("count", 4);
+ Map<String, Object> eventMap2 = new HashMap<>();
+ eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2);
+ eventMap2.put("name", "Justin");
+ eventMap2.put("count", 6);
+ List<Map<String, Object>> eventMaps = Arrays.asList(eventMap1, eventMap2);
+ listSRV = new ScanResultValue(segmentId, columns, eventMaps);
+ }
+
+ @Test
+ public void testSerdeScanResultValueCompactedList() throws IOException
+ {
+ String serialized = JSON_MAPPER.writeValueAsString(compactedListSRV);
+ ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
+ Assert.assertEquals(compactedListSRV, deserialized);
+ }
+
+ @Test
+ public void testSerdeScanResultValueNonCompactedList() throws IOException
+ {
+ String serialized = JSON_MAPPER.writeValueAsString(listSRV);
+ ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
+ Assert.assertEquals(listSRV, deserialized);
+ }
+
+ @Test
+ public void testGetFirstEventTimestampCompactedList()
+ {
+ long timestamp = compactedListSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
+ Assert.assertEquals(TIME_1, timestamp);
+ }
+
+ @Test
+ public void testGetFirstEventTimestampNonCompactedList()
+ {
+ long timestamp = listSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
+ Assert.assertEquals(TIME_1, timestamp);
+ }
+
+ @Test
+ public void testToSingleEventScanResultValues()
+ {
+ List<ScanResultValue> compactedListScanResultValues = compactedListSRV.toSingleEventScanResultValues();
+ for (ScanResultValue srv : compactedListScanResultValues) {
+ List events = (List) srv.getEvents();
+ Assert.assertEquals(1, events.size());
+ }
+ List<ScanResultValue> listScanResultValues = listSRV.toSingleEventScanResultValues();
+ for (ScanResultValue srv : listScanResultValues) {
+ List events = (List) srv.getEvents();
+ Assert.assertEquals(1, events.size());
+ }
+ }
+}
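A hedged sketch of the behavior `testToSingleEventScanResultValues` checks above: one batched result value becomes a list of single-event result values, which gives the timestamp comparator added in this patch a well-defined "first event" per value. Types are simplified; the real method lives on `ScanResultValue` itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Each input event (a Map for "list" results, a List for "compactedList"
// results) becomes its own one-element batch.
public class SingleEventSplitSketch
{
  public static List<List<Object>> toSingleEventBatches(List<Object> batchedEvents)
  {
    List<List<Object>> singleEventBatches = new ArrayList<>();
    for (Object event : batchedEvents) {
      singleEventBatches.add(Collections.singletonList(event));
    }
    return singleEventBatches;
  }
}
```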
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 778c44527a5..a5fa0d4a2a4 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -969,7 +969,6 @@ public class DruidQuery Ordering.natural().sortedCopy(ImmutableSet.copyOf(outputRowSignature.getRowOrder())), false, ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
-
); }