Changes based on Gian's comments

Justin Borromeo 2019-02-19 17:52:06 -08:00
parent 35150fe1a6
commit 7baeade832
17 changed files with 307 additions and 131 deletions

View File

@@ -113,7 +113,7 @@ public class ScanBenchmark
@Param({"1000", "99999"})
private int limit;
@Param({"none", "descending", "ascending"})
@Param({"NONE", "DESCENDING", "ASCENDING"})
private static ScanQuery.TimeOrder timeOrdering;
private static final Logger log = new Logger(ScanBenchmark.class);

View File

@@ -27,8 +27,11 @@ title: "Scan query"
The Scan query returns raw Druid rows in streaming mode. The biggest difference between the Select query and the Scan
query is that the Scan query does not retain all the returned rows in memory before they are returned to the client
(except when time-ordering is used). The Select query _will_ retain the rows in memory, causing memory pressure if too
many rows are returned. The Scan query can return all the rows without issuing another pagination query, which is
extremely useful when directly querying against historical or realtime nodes.
many rows are returned. The Scan query can return all the rows without issuing another pagination query.
In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued
directly to Historical processes or streaming ingestion tasks. This can be useful if you want to retrieve large
amounts of data in parallel.
An example Scan query object is shown below:
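The full example lies outside this hunk; a minimal sketch consistent with the fields this commit defines (property names follow the `@JsonProperty` annotations on the `ScanQuery` constructor; the dataSource and interval are placeholders, and `resultFormat`/`timeOrder` use the lowercase forms produced by the new enum `toString()` methods):

```json
{
  "queryType": "scan",
  "dataSource": "wikipedia",
  "intervals": ["2013-01-01/2013-01-02"],
  "resultFormat": "list",
  "timeOrder": "none",
  "limit": 3
}
```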

View File

@@ -918,7 +918,7 @@ public class Druids
private QuerySegmentSpec querySegmentSpec;
private VirtualColumns virtualColumns;
private Map<String, Object> context;
private String resultFormat;
private ScanQuery.ResultFormat resultFormat;
private int batchSize;
private long limit;
private DimFilter dimFilter;
@@ -1009,7 +1009,7 @@ public class Druids
return this;
}
public ScanQueryBuilder resultFormat(String r)
public ScanQueryBuilder resultFormat(ScanQuery.ResultFormat r)
{
resultFormat = r;
return this;

View File

@@ -21,7 +21,9 @@ package org.apache.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
@@ -36,18 +38,76 @@ import java.util.Objects;
public class ScanQuery extends BaseQuery<ScanResultValue>
{
public static final String RESULT_FORMAT_LIST = "list";
public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList";
public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector";
public enum ResultFormat
{
RESULT_FORMAT_LIST,
RESULT_FORMAT_COMPACTED_LIST,
RESULT_FORMAT_VALUE_VECTOR;
public enum TimeOrder {
@JsonProperty("ascending") ASCENDING,
@JsonProperty("descending") DESCENDING,
@JsonProperty("none") NONE
@JsonValue
@Override
public String toString()
{
switch (this) {
case RESULT_FORMAT_LIST:
return "list";
case RESULT_FORMAT_COMPACTED_LIST:
return "compactedList";
case RESULT_FORMAT_VALUE_VECTOR:
return "valueVector";
default:
return "";
}
}
@JsonCreator
public static ResultFormat fromString(String name)
{
switch (name) {
case "compactedList":
return RESULT_FORMAT_COMPACTED_LIST;
case "valueVector":
return RESULT_FORMAT_VALUE_VECTOR;
case "list":
return RESULT_FORMAT_LIST;
default:
return RESULT_FORMAT_LIST;
}
}
public byte[] getCacheKey()
{
return new byte[]{(byte) this.ordinal()};
}
}
public enum TimeOrder
{
ASCENDING,
DESCENDING,
NONE;
@JsonValue
@Override
public String toString()
{
return StringUtils.toLowerCase(this.name());
}
@JsonCreator
public static TimeOrder fromString(String name)
{
return valueOf(StringUtils.toUpperCase(name));
}
public byte[] getCacheKey()
{
return new byte[]{(byte) this.ordinal()};
}
}
private final VirtualColumns virtualColumns;
private final String resultFormat;
private final ResultFormat resultFormat;
private final int batchSize;
private final long limit;
private final DimFilter dimFilter;
@@ -60,7 +120,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("virtualColumns") VirtualColumns virtualColumns,
@JsonProperty("resultFormat") String resultFormat,
@JsonProperty("resultFormat") ResultFormat resultFormat,
@JsonProperty("batchSize") int batchSize,
@JsonProperty("limit") long limit,
@JsonProperty("timeOrder") TimeOrder timeOrder,
@@ -72,7 +132,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
{
super(dataSource, querySegmentSpec, false, context);
this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat;
this.resultFormat = resultFormat;
this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
this.limit = (limit == 0) ? Long.MAX_VALUE : limit;
Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
@@ -90,7 +150,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
}
@JsonProperty
public String getResultFormat()
public ResultFormat getResultFormat()
{
return resultFormat;
}
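Because ResultFormat and TimeOrder now serialize through @JsonValue/@JsonCreator methods rather than raw strings, the JSON wire format stays compatible with the old String-typed fields. A minimal round-trip sketch, assuming Jackson's ObjectMapper and the enums exactly as defined above:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.query.scan.ScanQuery;

public class ScanQueryEnumSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // @JsonValue routes serialization through toString(), so the wire format
    // keeps the lowercase strings the old String-typed fields used.
    System.out.println(mapper.writeValueAsString(ScanQuery.TimeOrder.DESCENDING)); // "descending"

    // @JsonCreator routes deserialization through fromString(), which upper-cases
    // its input, so "descending" and "DESCENDING" both parse.
    ScanQuery.TimeOrder order = mapper.readValue("\"DESCENDING\"", ScanQuery.TimeOrder.class);
    System.out.println(order); // descending

    // ResultFormat round-trips its camelCase names the same way.
    ScanQuery.ResultFormat format = mapper.readValue("\"compactedList\"", ScanQuery.ResultFormat.class);
    System.out.println(format); // compactedList
  }
}
```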

View File

@@ -178,13 +178,13 @@ public class ScanQueryEngine
}
final long lastOffset = offset;
final Object events;
final String resultFormat = query.getResultFormat();
if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) {
final ScanQuery.ResultFormat resultFormat = query.getResultFormat();
if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) {
events = rowsToCompactedList();
} else if (ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) {
} else if (ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
events = rowsToList();
} else {
throw new UOE("resultFormat[%s] is not supported", resultFormat);
throw new UOE("resultFormat[%s] is not supported", resultFormat.toString());
}
responseContext.put(
ScanQueryRunnerFactory.CTX_COUNT,

View File

@@ -33,7 +33,7 @@ import java.util.Map;
public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
{
private Yielder<ScanResultValue> yielder;
private String resultFormat;
private ScanQuery.ResultFormat resultFormat;
private long limit;
private long count = 0;
@@ -71,8 +71,8 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
public ScanResultValue next()
{
ScanResultValue batch = yielder.get();
if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) ||
ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) {
if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) ||
ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
List events = (List) batch.getEvents();
if (events.size() <= limit - count) {
count += events.size();
@@ -86,7 +86,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left));
}
}
throw new UnsupportedOperationException(ScanQuery.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
throw new UnsupportedOperationException(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
@Override

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import java.io.IOException;
import java.util.Map;
public class ScanQueryNoLimitRowIterator implements CloseableIterator<ScanResultValue>
{
private Yielder<ScanResultValue> yielder;
private ScanQuery.ResultFormat resultFormat;
public ScanQueryNoLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner,
QueryPlus<ScanResultValue> queryPlus,
Map<String, Object> responseContext
)
{
ScanQuery query = Druids.ScanQueryBuilder.copy((ScanQuery) queryPlus.getQuery()).limit(Long.MAX_VALUE).timeOrder(
ScanQuery.TimeOrder.NONE).build();
resultFormat = query.getResultFormat();
queryPlus = queryPlus.withQuery(query);
Sequence<ScanResultValue> baseSequence = baseRunner.run(queryPlus, responseContext);
yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
{
@Override
public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
{
yield();
return in;
}
}
);
}
@Override
public boolean hasNext()
{
return !yielder.isDone();
}
@Override
public ScanResultValue next()
{
ScanResultValue batch = yielder.get();
if (ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) ||
ScanQuery.ResultFormat.RESULT_FORMAT_LIST.equals(resultFormat)) {
yielder = yielder.next(null);
return batch;
}
throw new UnsupportedOperationException(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
yielder.close();
}
}
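A hedged usage sketch for the new iterator; runner, queryPlus, and responseContext are hypothetical names assumed to be supplied by the surrounding query pipeline. Since CloseableIterator extends Closeable, try-with-resources works:

```java
try (ScanQueryNoLimitRowIterator it =
         new ScanQueryNoLimitRowIterator(runner, queryPlus, responseContext)) {
  while (it.hasNext()) {
    ScanResultValue batch = it.next();
    // hand each batch to the in-memory time-ordering path in ScanQueryQueryToolChest
  }
}
catch (IOException e) {
  // close() can throw; the real caller closes via CloseQuietly.close() in cleanup()
  throw new RuntimeException(e);
}
```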

View File

@@ -39,9 +39,11 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -74,29 +76,42 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
// the same way, even if they have different default legacy values.
final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
final BaseSequence.IteratorMaker scanQueryLimitRowIteratorMaker =
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
{
@Override
public ScanQueryLimitRowIterator make()
{
return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
}
@Override
public void cleanup(ScanQueryLimitRowIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
};
if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(queryPlusWithNonNullLegacy, responseContext);
} else {
return new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
{
@Override
public ScanQueryLimitRowIterator make()
{
return new ScanQueryLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
}
@Override
public void cleanup(ScanQueryLimitRowIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
});
}
return new BaseSequence<ScanResultValue, ScanQueryLimitRowIterator>(scanQueryLimitRowIteratorMaker);
} else if (scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) {
Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
ScanQueryNoLimitRowIterator scanResultIterator =
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryNoLimitRowIterator>()
{
@Override
public ScanQueryNoLimitRowIterator make()
{
return new ScanQueryNoLimitRowIterator(runner, queryPlusWithNonNullLegacy, responseContext);
}
@Override
public void cleanup(ScanQueryNoLimitRowIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
}.make();
return new BaseSequence(
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
@@ -105,7 +120,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
public ScanBatchedIterator make()
{
return new ScanBatchedIterator(
sortScanResultValues(scanResultIterator, scanQuery),
sortAndLimitScanResultValues(scanResultIterator, scanQuery),
scanQuery.getBatchSize()
);
}
@@ -167,14 +182,14 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
}
@VisibleForTesting
Iterator<ScanResultValue> sortScanResultValues(Iterator<ScanResultValue> inputIterator, ScanQuery scanQuery)
Iterator<ScanResultValue> sortAndLimitScanResultValues(Iterator<ScanResultValue> inputIterator, ScanQuery scanQuery)
{
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
// Converting the limit from long to int could theoretically throw an ArithmeticException but this branch
// only runs if limit < MAX_LIMIT_FOR_IN_MEMORY_TIME_ORDERING (which should be < Integer.MAX_VALUE)
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator);
int limit = Math.toIntExact(scanQuery.getLimit());
PriorityQueue<ScanResultValue> q = new PriorityQueue<>(limit, priorityQComparator);
while (inputIterator.hasNext()) {
ScanResultValue next = inputIterator.next();
@@ -183,14 +198,19 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
// needs to be preserved for queries using the compactedList result format
q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
if (q.size() > limit) {
q.poll();
}
}
}
// Need to convert to a List because Priority Queue's iterator doesn't guarantee that the sorted order
// will be maintained
List<ScanResultValue> sortedElements = new ArrayList<>(q.size());
final Deque<ScanResultValue> sortedElements = new ArrayDeque<>(q.size());
while (q.size() != 0) {
sortedElements.add(q.poll());
// We add at the front because poll removes the queue's head (its least element), so draining least-first and prepending restores the intended order.
sortedElements.addFirst(q.poll());
}
return sortedElements.iterator();
}
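The loop above is the classic bounded-heap top-K: offer each row, evict the heap head once the size exceeds the limit, then drain with addFirst to undo poll's least-first order. A minimal self-contained sketch with plain longs standing in for ScanResultValue rows (hypothetical names, not Druid API):

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;

public class BoundedTopKSketch
{
  // Keeps the `limit` greatest elements under `comparator` and returns them greatest-first.
  static Deque<Long> topK(Iterable<Long> input, int limit, Comparator<Long> comparator)
  {
    PriorityQueue<Long> q = new PriorityQueue<>(limit + 1, comparator);
    for (Long value : input) {
      q.offer(value);
      if (q.size() > limit) {
        q.poll(); // the head is the least element under the comparator, i.e. the one to discard
      }
    }
    // poll() drains least-first, so adding at the front restores greatest-first order
    Deque<Long> sorted = new ArrayDeque<>(q.size());
    while (!q.isEmpty()) {
      sorted.addFirst(q.poll());
    }
    return sorted;
  }

  public static void main(String[] args)
  {
    System.out.println(topK(Arrays.asList(42L, 43L, 41L, 44L), 2, Long::compare)); // [44, 43]
  }
}
```

This is also why the comparator change further down orders rows opposite to the requested output direction: the heap head must always be the row to evict.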

View File

@@ -73,13 +73,14 @@ public class ScanResultValue implements Comparable<ScanResultValue>
return events;
}
public long getFirstEventTimestamp(ScanQuery query) {
if (query.getResultFormat().equals(ScanQuery.RESULT_FORMAT_LIST)) {
public long getFirstEventTimestamp(ScanQuery query)
{
if (query.getResultFormat().equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
return (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
} else if (query.getResultFormat().equals(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)) {
} else if (query.getResultFormat().equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
return (Long)firstEvent.get(timeColumnIndex);
return (Long) firstEvent.get(timeColumnIndex);
}
throw new UOE("Unable to get first event timestamp using result format of [%s]", query.getResultFormat());
}

View File

@@ -47,8 +47,8 @@ public class ScanResultValueTimestampComparator implements Comparator<ScanResultValue>
o1.getFirstEventTimestamp(scanQuery),
o2.getFirstEventTimestamp(scanQuery));
if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING)) {
return comparison * -1;
return comparison;
}
return comparison;
return comparison * -1;
}
}
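The sign swap above is deliberate: this comparator feeds the bounded heap in ScanQueryQueryToolChest, whose head must be the row to evict, so rows are ranked opposite to the requested output order. A hedged illustration using the 42/43 timestamps from the tests below:

```java
public class ComparatorSignSketch
{
  public static void main(String[] args)
  {
    // First-event timestamps matching the test fixtures below.
    long t1 = 42L;
    long t2 = 43L;

    // DESCENDING keeps the latest rows, so the earlier row must be the heap
    // head (evicted first): natural compare.
    System.out.println(Long.compare(t1, t2));      // -1, matches the updated descending test
    // ASCENDING keeps the earliest rows, so the later row must be the heap
    // head: negated compare.
    System.out.println(Long.compare(t1, t2) * -1); // 1, matches the updated ascending test
  }
}
```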

View File

@@ -91,13 +91,13 @@ public class ScanQueryQueryToolChestTest
);
}
ScanQuery scanQuery = new Druids.ScanQueryBuilder()
.resultFormat("list")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.timeOrder(ScanQuery.TimeOrder.DESCENDING)
.dataSource("some data source")
.intervals(emptySegmentSpec)
.limit(99999)
.build();
Iterator<ScanResultValue> sorted = chest.sortScanResultValues(inputs.iterator(), scanQuery);
Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
int count = 0;
Long previousTime = Long.MAX_VALUE;
@@ -128,13 +128,13 @@ public class ScanQueryQueryToolChestTest
);
}
ScanQuery scanQuery = new Druids.ScanQueryBuilder()
.resultFormat("list")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.timeOrder(ScanQuery.TimeOrder.ASCENDING)
.dataSource("some data source")
.intervals(emptySegmentSpec)
.limit(99999)
.build();
Iterator<ScanResultValue> sorted = chest.sortScanResultValues(inputs.iterator(), scanQuery);
Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
int count = 0;
Long previousTime = -1L;
@@ -163,13 +163,13 @@ public class ScanQueryQueryToolChestTest
);
}
ScanQuery scanQuery = new Druids.ScanQueryBuilder()
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.timeOrder(ScanQuery.TimeOrder.DESCENDING)
.dataSource("some data source")
.intervals(emptySegmentSpec)
.limit(99999)
.build();
Iterator<ScanResultValue> sorted = chest.sortScanResultValues(inputs.iterator(), scanQuery);
Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
Long previousTime = Long.MAX_VALUE;
int count = 0;
@@ -198,13 +198,13 @@ public class ScanQueryQueryToolChestTest
);
}
ScanQuery scanQuery = new Druids.ScanQueryBuilder()
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.timeOrder(ScanQuery.TimeOrder.ASCENDING)
.dataSource("some data source")
.intervals(emptySegmentSpec)
.limit(99999)
.build();
Iterator<ScanResultValue> sorted = chest.sortScanResultValues(inputs.iterator(), scanQuery);
Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
Long previousTime = -1L;
int count = 0;

View File

@@ -216,7 +216,7 @@ public class ScanQueryRunnerTest
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.virtualColumns(EXPR_COLUMN)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
@@ -322,7 +322,7 @@ public class ScanQueryRunnerTest
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
@@ -524,7 +524,7 @@ public class ScanQueryRunnerTest
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.limit(limit)
.timeOrder("ascending")
.timeOrder(ScanQuery.TimeOrder.ASCENDING)
.build();
HashMap<String, Object> context = new HashMap<>();
@@ -582,7 +582,7 @@ public class ScanQueryRunnerTest
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.limit(limit)
.timeOrder("descending")
.timeOrder(ScanQuery.TimeOrder.DESCENDING)
.build();
HashMap<String, Object> context = new HashMap<>();
@@ -664,8 +664,8 @@ public class ScanQueryRunnerTest
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat("compactedList")
.timeOrder("ascending")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.timeOrder(ScanQuery.TimeOrder.ASCENDING)
.limit(limit)
.build();
@@ -725,8 +725,8 @@ public class ScanQueryRunnerTest
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat("compactedList")
.timeOrder("descending")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.timeOrder(ScanQuery.TimeOrder.DESCENDING)
.limit(limit)
.build();

View File

@@ -65,10 +65,10 @@ public class ScanQuerySpecTest
new TableDataSource(QueryRunnerTestHelper.dataSource),
new LegacySegmentSpec(Intervals.of("2011-01-12/2011-01-14")),
VirtualColumns.EMPTY,
null,
ScanQuery.ResultFormat.RESULT_FORMAT_LIST,
0,
3,
"none",
ScanQuery.TimeOrder.NONE,
null,
Arrays.asList("market", "quality", "index"),
null,

View File

@@ -53,7 +53,7 @@ public class ScanResultValueTimestampComparatorTest
{
ScanQuery query = Druids.newScanQueryBuilder()
.timeOrder(ScanQuery.TimeOrder.DESCENDING)
.resultFormat(ScanQuery.RESULT_FORMAT_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
@@ -82,7 +82,7 @@ public class ScanResultValueTimestampComparatorTest
events2
);
Assert.assertEquals(1, comparator.compare(s1, s2));
Assert.assertEquals(-1, comparator.compare(s1, s2));
}
@Test
@@ -90,7 +90,7 @@ public class ScanResultValueTimestampComparatorTest
{
ScanQuery query = Druids.newScanQueryBuilder()
.timeOrder(ScanQuery.TimeOrder.ASCENDING)
.resultFormat(ScanQuery.RESULT_FORMAT_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
@@ -119,7 +119,7 @@ public class ScanResultValueTimestampComparatorTest
events2
);
Assert.assertEquals(-1, comparator.compare(s1, s2));
Assert.assertEquals(1, comparator.compare(s1, s2));
}
@Test
@@ -127,7 +127,42 @@ public class ScanResultValueTimestampComparatorTest
{
ScanQuery query = Druids.newScanQueryBuilder()
.timeOrder(ScanQuery.TimeOrder.DESCENDING)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
ScanResultValueTimestampComparator comparator = new ScanResultValueTimestampComparator(query);
List<List<Object>> events1 = new ArrayList<>();
List<Object> event1 = Collections.singletonList(new Long(42));
events1.add(event1);
ScanResultValue s1 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events1
);
List<List<Object>> events2 = new ArrayList<>();
List<Object> event2 = Collections.singletonList(new Long(43));
events2.add(event2);
ScanResultValue s2 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events2
);
Assert.assertEquals(-1, comparator.compare(s1, s2));
}
@Test
public void comparisonAscendingCompactedListTest()
{
ScanQuery query = Druids.newScanQueryBuilder()
.timeOrder(ScanQuery.TimeOrder.ASCENDING)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
@@ -156,39 +191,4 @@ public class ScanResultValueTimestampComparatorTest
Assert.assertEquals(1, comparator.compare(s1, s2));
}
@Test
public void comparisonAscendingCompactedListTest()
{
ScanQuery query = Druids.newScanQueryBuilder()
.timeOrder(ScanQuery.TimeOrder.ASCENDING)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.dataSource("some src")
.intervals(intervalSpec)
.build();
ScanResultValueTimestampComparator comparator = new ScanResultValueTimestampComparator(query);
List<List<Object>> events1 = new ArrayList<>();
List<Object> event1 = Collections.singletonList(new Long(42));
events1.add(event1);
ScanResultValue s1 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events1
);
List<List<Object>> events2 = new ArrayList<>();
List<Object> event2 = Collections.singletonList(new Long(43));
events2.add(event2);
ScanResultValue s2 = new ScanResultValue(
"segmentId",
Collections.singletonList(ColumnHolder.TIME_COLUMN_NAME),
events2
);
Assert.assertEquals(-1, comparator.compare(s1, s2));
}
}

View File

@@ -961,7 +961,7 @@ public class DruidQuery
dataSource,
filtration.getQuerySegmentSpec(),
selectProjection != null ? VirtualColumns.create(selectProjection.getVirtualColumns()) : VirtualColumns.EMPTY,
ScanQuery.RESULT_FORMAT_COMPACTED_LIST,
ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST,
0,
scanLimit,
null, // Will default to "none"

View File

@@ -370,7 +370,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
public static Druids.ScanQueryBuilder newScanQueryBuilder()
{
return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
return new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.legacy(false);
}

View File

@@ -111,7 +111,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(QSS(Filtration.eternity()))
.virtualColumns(EXPRESSION_VIRTUAL_COLUMN("v0", "2", ValueType.LONG))
.columns("dim1", "v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
@@ -431,7 +431,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -465,7 +465,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.dataSource(CalciteTests.FORBIDDEN_DATASOURCE)
.intervals(QSS(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -531,7 +531,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(QSS(Filtration.eternity()))
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.limit(2)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -556,7 +556,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
.columns("v0")
.limit(2)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -652,7 +652,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(QSS(Filtration.eternity()))
.columns("dim2")
.limit(2)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -744,14 +744,14 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(QSS(Filtration.eternity()))
.columns("dim1")
.filters(NOT(SELECTOR("dim1", "", null)))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build(),
newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.columns("dim1", "dim2")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -1879,7 +1879,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
)
.columns("__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -6715,7 +6715,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
)
))
.columns("__time", "cnt", "dim1", "dim2")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -7270,7 +7270,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ValueType.STRING
))
.columns("v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -7296,7 +7296,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ValueType.STRING
))
.columns("v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -7322,7 +7322,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(QSS(Filtration.eternity()))
.virtualColumns(EXPRESSION_VIRTUAL_COLUMN("v0", "concat(\"dim1\",\"dim1\")", ValueType.STRING))
.columns("v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -7348,7 +7348,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
ValueType.STRING
))
.columns("v0")
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(QUERY_CONTEXT_DEFAULT)
.build()
),
@@ -7544,7 +7544,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(QSS(Filtration.eternity()))
.columns("dim1")
.filters(SELECTOR("f1", "0.1", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
@@ -7566,7 +7566,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(QSS(Filtration.eternity()))
.columns("dim1")
.filters(SELECTOR("d1", "1.7", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()
@@ -7588,7 +7588,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(QSS(Filtration.eternity()))
.columns("dim1")
.filters(SELECTOR("l1", "7", null))
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.limit(1)
.context(QUERY_CONTEXT_DEFAULT)
.build()