diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java
index 04d66f900cb..337b016e14f 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.scan;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
 import com.google.inject.Inject;
@@ -98,32 +99,18 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
         return new BaseSequence<>(scanQueryLimitRowIteratorMaker);
       } else if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) ||
                  scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) {
-        Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
-
-        // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
-        // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
-        PriorityQueue<ScanResultValue> q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator);
         Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
-        while (scanResultIterator.hasNext()) {
-          ScanResultValue next = scanResultIterator.next();
-          List<Object> events = (List<Object>) next.getEvents();
-          for (Object event : events) {
-            // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
-            // needs to be preserved for queries using the compactedList result format
-            q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
-          }
-        }
-
-        Iterator<ScanResultValue> queueIterator = q.iterator();
-
         return new BaseSequence(
             new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedTimeOrderedQueueIterator>()
             {
               @Override
               public ScanBatchedTimeOrderedQueueIterator make()
               {
-                return new ScanBatchedTimeOrderedQueueIterator(queueIterator, scanQuery.getBatchSize());
+                return new ScanBatchedTimeOrderedQueueIterator(
+                    heapsortScanResultValues(scanResultIterator, scanQuery),
+                    scanQuery.getBatchSize()
+                );
               }
 
               @Override
@@ -177,6 +164,35 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
+
+  @VisibleForTesting
+  Iterator<ScanResultValue> heapsortScanResultValues(Iterator<ScanResultValue> inputIterator, ScanQuery scanQuery)
+  {
+    Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
+
+    // Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
+    // only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
+    PriorityQueue<ScanResultValue> q =
+        new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator);
+
+    while (inputIterator.hasNext()) {
+      ScanResultValue next = inputIterator.next();
+      List<Object> events = (List<Object>) next.getEvents();
+      for (Object event : events) {
+        // Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
+        // needs to be preserved for queries using the compactedList result format
+        q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
+      }
+    }
+
+    // Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order
+    // will be maintained
+    List<ScanResultValue> sortedElements = new ArrayList<>(q.size());
+    while (q.size() != 0) {
+      sortedElements.add(q.poll());
+    }
+    return sortedElements.iterator();
+  }
+
   private class ScanBatchedTimeOrderedQueueIterator implements CloseableIterator<ScanResultValue>
   {
     private final Iterator<ScanResultValue> itr;
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java
new file mode 100644
index 00000000000..89d846c54aa
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryQueryToolChestTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+public class ScanQueryQueryToolChestTest
+{
+  private static ScanQueryQueryToolChest chest;
+  private static ScanQueryConfig config;
+
+  @Before
+  public void setup()
+  {
+    config = createNiceMock(ScanQueryConfig.class);
+    expect(config.getMaxRowsTimeOrderedInMemory()).andReturn(100000);
+    replay(config);
+    chest = new ScanQueryQueryToolChest(config, null);
+  }
+
+  @Test
+  public void testDescendingHeapsortListScanResultValues()
+  {
+    List<ScanResultValue> inputs = new ArrayList<>();
+    for (long i = 0; i < 1000; i++) {
+      HashMap<String, Object> event = new HashMap<>();
+      event.put("__time", i * 1000);
+      inputs.add(
+          new ScanResultValue(
+              "some segment id",
+              Collections.singletonList("__time"),
+              Collections.singletonList(event)
+          )
+      );
+    }
+    ScanQuery scanQuery = new Druids.ScanQueryBuilder()
+        .resultFormat("list")
+        .timeOrder(ScanQuery.TIME_ORDER_DESCENDING)
+        .dataSource("some data source")
+        .intervals(new QuerySegmentSpec()
+        {
+          @Override
+          public List<Interval> getIntervals()
+          {
+            return null;
+          }
+
+          @Override
+          public <T> QueryRunner<T> lookup(
+              Query<T> query, QuerySegmentWalker walker
+          )
+          {
+            return null;
+          }
+        })
+        .limit(99999)
+        .build();
+    Iterator<ScanResultValue> sorted = chest.heapsortScanResultValues(inputs.iterator(), scanQuery);
+
+    Long previousTime = Long.MAX_VALUE;
+    while (sorted.hasNext()) {
+      ScanResultValue curr = sorted.next();
+      Long currentTime = (Long)
+          ((Map) (((List) curr.getEvents()).get(0))).get(ColumnHolder.TIME_COLUMN_NAME);
+      Assert.assertTrue(
+          "Event timestamp is less than that of the previous event",
+          currentTime < previousTime
+      );
+      previousTime = currentTime;
+    }
+  }
+
+  @Test
+  public void testAscendingHeapsortListScanResultValues()
+  {
+    List<ScanResultValue> inputs = new ArrayList<>();
+    for (long i = 1000; i > 0; i--) {
+      HashMap<String, Object> event = new HashMap<>();
+      event.put("__time", i * 1000);
+      inputs.add(
+          new ScanResultValue(
+              "some segment id",
+              Collections.singletonList("__time"),
+              Collections.singletonList(event)
+          )
+      );
+    }
+    ScanQuery scanQuery = new Druids.ScanQueryBuilder()
+        .resultFormat("list")
+        .timeOrder(ScanQuery.TIME_ORDER_ASCENDING)
+        .dataSource("some data source")
+        .intervals(new QuerySegmentSpec()
+        {
+          @Override
+          public List<Interval> getIntervals()
+          {
+            return null;
+          }
+
+          @Override
+          public <T> QueryRunner<T> lookup(
+              Query<T> query, QuerySegmentWalker walker
+          )
+          {
+            return null;
+          }
+        })
+        .limit(99999)
+        .build();
+    Iterator<ScanResultValue> sorted = chest.heapsortScanResultValues(inputs.iterator(), scanQuery);
+
+    Long previousTime = -1L;
+    while (sorted.hasNext()) {
+      ScanResultValue curr = sorted.next();
+      Long currentTime = (Long)
+          ((Map) (((List) curr.getEvents()).get(0))).get(ColumnHolder.TIME_COLUMN_NAME);
+      Assert.assertTrue(
+          "Event timestamp is greater than that of the previous event",
+          currentTime > previousTime
+      );
+      previousTime = currentTime;
+    }
+  }
+}