Stuff for time-ordered scan query

Justin Borromeo 2019-02-01 18:00:58 -08:00
parent 6430ef8e1b
commit 7a6080f636
10 changed files with 520 additions and 58 deletions


@ -24,7 +24,13 @@ title: "Scan query"
# Scan query
Scan query returns raw Druid rows in streaming mode.
The Scan query returns raw Druid rows in streaming mode. The biggest difference between the Select query and the Scan
query is that the Scan query does not retain all the returned rows in memory before they are returned to the client
(except when time-ordering is used). The Select query _will_ retain the rows in memory, causing memory pressure if too
many rows are returned. The Scan query can return all the rows without issuing another pagination query, which is
extremely useful when directly querying against historical or realtime nodes.
An example Scan query object is shown below:
```json
{
@ -36,28 +42,29 @@ Scan query returns raw Druid rows in streaming mode.
"2013-01-01/2013-01-02"
],
"batchSize":20480,
"limit":5
"limit":3
}
```
There are several main parts to a scan query:
The following are the main parameters for Scan queries:
|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be "scan"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|resultFormat|How result represented, list or compactedList or valueVector. Currently only `list` and `compactedList` are supported. Default is `list`|no|
|resultFormat|How the results are represented: list, compactedList or valueVector. Currently only `list` and `compactedList` are supported. Default is `list`|no|
|filter|See [Filters](../querying/filters.html)|no|
|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
|batchSize|The maximum number of rows buffered before being returned to the client. Default is `20480`|no|
|limit|How many rows to return. If not specified, all rows will be returned.|no|
|timeOrder|The ordering of returned rows based on timestamp. "ascending", "descending", and "none" (default) are supported. Currently, "ascending" and "descending" are only supported for queries whose limit is less than `druid.query.scan.maxRowsTimeOrderedInMemory`. Scan queries that either run in legacy mode or have a limit greater than `druid.query.scan.maxRowsTimeOrderedInMemory` are not time-ordered and fall back to a timeOrder of "none".|no|
|legacy|Return results consistent with the legacy "scan-query" contrib extension. Defaults to the value set by `druid.query.scan.legacy`, which in turn defaults to false. See [Legacy mode](#legacy-mode) for details.|no|
|context|An additional JSON Object which can be used to specify certain flags.|no|
## Example results
The format of the result when resultFormat equals to `list`:
The format of the result when resultFormat equals `list`:
```json
[{
@ -123,41 +130,11 @@ The format of the result when resultFormat equals to `list`:
"delta" : 77.0,
"variation" : 77.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._73",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 70.0,
"delta" : 70.0,
"variation" : 70.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._756",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 68.0,
"delta" : 68.0,
"variation" : 68.0,
"deleted" : 0.0
} ]
} ]
```
The format of the result when resultFormat equals to `compactedList`:
The format of the result when resultFormat equals `compactedList`:
```json
[{
@ -168,17 +145,18 @@ The format of the result when resultFormat equals to `compactedList`:
"events" : [
["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._73", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._756", "en", "1", "MZMcBride", 1.0, 68.0, 68.0, 68.0, 0.0]
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0]
]
} ]
```
The biggest difference between select query and scan query is that, scan query doesn't retain all rows in memory before rows can be returned to client.
It will cause memory pressure if too many rows required by select query.
Scan query doesn't have this issue.
Scan query can return all rows without issuing another pagination query, which is extremely useful when query against Historical or realtime node directly.
## Time Ordering
The Scan query currently supports ordering based on timestamp for non-legacy queries where the limit is less than
`druid.query.scan.maxRowsTimeOrderedInMemory` rows. The default value of `druid.query.scan.maxRowsTimeOrderedInMemory`
is 100000 rows. The reasoning behind this limit is that the current implementation of time ordering sorts all returned
records in memory. Attempting to load too many rows into memory runs the risk of Broker nodes running out of memory.
The limit can be configured based on server memory and the number of dimensions being queried.
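As a sketch of how the new parameter is used (the datasource name below is a placeholder; the interval and batch size mirror the example query at the top of this page), a time-ordered Scan query only needs the `timeOrder` property added:
```json
{
   "queryType": "scan",
   "dataSource": "wikipedia",
   "resultFormat": "list",
   "columns": [],
   "intervals": [
     "2013-01-01/2013-01-02"
   ],
   "batchSize": 20480,
   "limit": 3,
   "timeOrder": "ascending"
}
```
Because the limit (3) is well below the default `druid.query.scan.maxRowsTimeOrderedInMemory`, the returned rows are sorted by timestamp in ascending order before they are sent back to the client.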
## Legacy mode
@ -194,3 +172,10 @@ Legacy mode can be triggered either by passing `"legacy" : true` in your query J
`druid.query.scan.legacy = true` on your Druid nodes. If you were previously using the scan-query contrib extension,
the best way to migrate is to activate legacy mode during a rolling upgrade, then switch it off after the upgrade
is complete.
## Configuration Properties
|property|description|values|default|
|--------|-----------|------|-------|
|druid.query.scan.maxRowsTimeOrderedInMemory|The maximum number of rows returned by a time-ordered Scan query that can be sorted in memory|An integer in the range [0, 2147483647]|100000|
|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|


@ -43,6 +43,10 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList";
public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector";
public static final String TIME_ORDER_ASCENDING = "ascending";
public static final String TIME_ORDER_DESCENDING = "descending";
public static final String TIME_ORDER_NONE = "none";
private final VirtualColumns virtualColumns;
private final String resultFormat;
private final int batchSize;
@ -50,6 +54,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
private final DimFilter dimFilter;
private final List<String> columns;
private final Boolean legacy;
private final String timeOrder;
@JsonCreator
public ScanQuery(
@ -62,7 +67,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("columns") List<String> columns,
@JsonProperty("legacy") Boolean legacy,
@JsonProperty("context") Map<String, Object> context
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("timeOrder") String timeOrder
)
{
super(dataSource, querySegmentSpec, false, context);
@ -75,6 +81,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
this.dimFilter = dimFilter;
this.columns = columns;
this.legacy = legacy;
this.timeOrder = timeOrder == null ? TIME_ORDER_NONE : timeOrder;
}
@JsonProperty
@ -101,6 +108,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return limit;
}
@JsonProperty
public String getTimeOrder()
{
return timeOrder;
}
@Override
public boolean hasFilters()
{
@ -234,6 +247,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
private DimFilter dimFilter;
private List<String> columns;
private Boolean legacy;
private String timeOrder;
public ScanQueryBuilder()
{
@ -247,6 +261,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
dimFilter = null;
columns = new ArrayList<>();
legacy = null;
timeOrder = null;
}
public ScanQuery build()
@ -261,7 +276,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
dimFilter,
columns,
legacy,
context
context,
timeOrder
);
}
@ -277,7 +293,8 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
.filters(query.getFilter())
.columns(query.getColumns())
.legacy(query.isLegacy())
.context(query.getContext());
.context(query.getContext())
.timeOrder(query.getTimeOrder());
}
public ScanQueryBuilder dataSource(String ds)
@ -339,6 +356,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
return this;
}
public ScanQueryBuilder timeOrder(String t)
{
timeOrder = t;
return this;
}
public ScanQueryBuilder columns(List<String> c)
{
columns = c;


@ -40,6 +40,15 @@ public class ScanQueryConfig
return this;
}
// int should suffice here because no one should be sorting greater than 2B rows in memory
@JsonProperty
private int maxRowsTimeOrderedInMemory = 100000;
public int getMaxRowsTimeOrderedInMemory()
{
return maxRowsTimeOrderedInMemory;
}
@Override
public boolean equals(final Object o)
{


@ -131,7 +131,8 @@ public class ScanQueryEngine
intervals.get(0),
query.getVirtualColumns(),
Granularities.ALL,
query.isDescending(),
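// Use a descending cursor when descending time order is requested, or when no time order is set and the legacy "descending" flag is true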
query.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING) ||
(query.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE) && query.isDescending()),
null
)
.map(cursor -> new BaseSequence<>(


@ -22,10 +22,13 @@ package org.apache.druid.query.scan;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.GenericQueryMetricsFactory;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;
@ -33,8 +36,16 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.segment.column.ColumnHolder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
{
@ -44,6 +55,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
private final ScanQueryConfig scanQueryConfig;
private final GenericQueryMetricsFactory queryMetricsFactory;
private final long maxRowsForInMemoryTimeOrdering;
@Inject
public ScanQueryQueryToolChest(
@ -53,6 +65,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
{
this.scanQueryConfig = scanQueryConfig;
this.queryMetricsFactory = queryMetricsFactory;
this.maxRowsForInMemoryTimeOrdering = scanQueryConfig.getMaxRowsTimeOrderedInMemory();
}
@Override
@ -69,11 +82,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
// the same way, even if they have different default legacy values.
final ScanQuery scanQuery = ((ScanQuery) queryPlus.getQuery()).withNonNullLegacy(scanQueryConfig);
final QueryPlus<ScanResultValue> queryPlusWithNonNullLegacy = queryPlus.withQuery(scanQuery);
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(queryPlusWithNonNullLegacy, responseContext);
}
return new BaseSequence<>(
BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator> scanQueryLimitRowIteratorMaker =
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
{
@Override
@ -87,8 +96,77 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
{
CloseQuietly.close(iterFromMake);
}
};
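// Fall back to the plain limiting iterator when no time ordering was requested or the limit is too large to sort in memory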
if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE) ||
scanQuery.getLimit() > maxRowsForInMemoryTimeOrdering) {
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(queryPlusWithNonNullLegacy, responseContext);
}
return new BaseSequence<>(scanQueryLimitRowIteratorMaker);
} else if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) ||
scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) {
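// Orders single-event ScanResultValues by the value of their __time column; the comparison is inverted for descending order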
Comparator priorityQComparator = (val1, val2) -> {
int comparison;
ScanResultValue val1SRV = (ScanResultValue) val1,
val2SRV = (ScanResultValue) val2;
if (scanQuery.getResultFormat().equals(ScanQuery.RESULT_FORMAT_LIST)) {
comparison = Longs.compare(
(Long) ((Map<String, Object>) ((List) val1SRV.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME),
(Long) ((Map<String, Object>) ((List) val2SRV.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME)
);
} else if (scanQuery.getResultFormat().equals(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)) {
int val1TimeColumnIndex = val1SRV.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
int val2TimeColumnIndex = val2SRV.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
List<Object> event1 = (List<Object>) ((List<Object>) val1SRV.getEvents()).get(0);
List<Object> event2 = (List<Object>) ((List<Object>) val2SRV.getEvents()).get(0);
comparison = Longs.compare(
(Long) event1.get(val1TimeColumnIndex),
(Long) event2.get(val2TimeColumnIndex)
);
} else {
throw new UOE("Result format [%s] is not supported", scanQuery.getResultFormat());
}
if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) {
return comparison * -1;
}
return comparison;
};
// Converting the limit from long to int could theoretically throw an ArithmeticException, but this branch
// only runs when the limit is at most maxRowsForInMemoryTimeOrdering, an int, so the value fits
PriorityQueue<Object> q = new PriorityQueue<>(Math.toIntExact(scanQuery.getLimit()), priorityQComparator);
Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
while (scanResultIterator.hasNext()) {
ScanResultValue next = scanResultIterator.next();
List<Object> events = (List<Object>) next.getEvents();
for (Object event : events) {
// Using an intermediate unbatched ScanResultValue is not that great memory-wise, but the column list
// needs to be preserved for queries using the compactedList result format
q.offer(new ScanResultValue(null, next.getColumns(), Collections.singletonList(event)));
}
}
// PriorityQueue's iterator does not traverse in priority order, so drain the queue to produce rows in time order
List<ScanResultValue> orderedValues = new ArrayList<>(q.size());
while (!q.isEmpty()) {
orderedValues.add((ScanResultValue) q.poll());
}
Iterator<ScanResultValue> queueIterator = orderedValues.iterator();
return new BaseSequence(
new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedTimeOrderedQueueIterator>()
{
@Override
public ScanBatchedTimeOrderedQueueIterator make()
{
return new ScanBatchedTimeOrderedQueueIterator(queueIterator, scanQuery.getBatchSize());
}
@Override
public void cleanup(ScanBatchedTimeOrderedQueueIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
});
} else {
throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder());
}
}
};
}
@ -131,4 +209,42 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
}
};
}
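// Re-batches the single-event ScanResultValues drained from the ordered queue into ScanResultValues of up to batchSize events each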
private class ScanBatchedTimeOrderedQueueIterator implements CloseableIterator<ScanResultValue>
{
private final Iterator<ScanResultValue> itr;
private final int batchSize;
public ScanBatchedTimeOrderedQueueIterator(Iterator<ScanResultValue> iterator, int batchSize)
{
itr = iterator;
this.batchSize = batchSize;
}
@Override
public void close() throws IOException
{
}
@Override
public boolean hasNext()
{
return itr.hasNext();
}
@Override
public ScanResultValue next()
{
// Build one batched ScanResultValue from up to batchSize single-event values
List<Object> eventsToAdd = new ArrayList<>(batchSize);
List<String> columns = new ArrayList<>();
while (eventsToAdd.size() < batchSize && itr.hasNext()) {
ScanResultValue srv = itr.next();
// Only replace once using the columns from the first event
columns = columns.isEmpty() ? srv.getColumns() : columns;
eventsToAdd.add(((List) srv.getEvents()).get(0));
}
return new ScanResultValue(null, columns, eventsToAdd);
}
}
}


@ -26,6 +26,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import org.apache.commons.lang.ArrayUtils;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
@ -64,6 +65,7 @@ import java.util.Map;
import java.util.Set;
/**
*
*/
@RunWith(Parameterized.class)
public class ScanQueryRunnerTest
@ -234,7 +236,11 @@ public class ScanQueryRunnerTest
{
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.columns(ColumnHolder.TIME_COLUMN_NAME, QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric)
.columns(
ColumnHolder.TIME_COLUMN_NAME,
QueryRunnerTestHelper.marketDimension,
QueryRunnerTestHelper.indexMetric
)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
@ -508,6 +514,249 @@ public class ScanQueryRunnerTest
verify(expectedResults, results);
}
@Test
public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingListFormat()
{
// limits
for (int limit : new int[]{3, 1, 5, 7, 0}) {
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.limit(limit)
.timeOrder("ascending")
.build();
HashMap<String, Object> context = new HashMap<>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
String[] seg1Results = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
};
String[] seg2Results = new String[]{
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
"2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
"2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
"2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
"2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
"2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
"2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
"2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
"2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
};
final List<List<Map<String, Object>>> ascendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":DOUBLE"
},
(String[]) ArrayUtils.addAll(seg1Results, seg2Results)
);
List<ScanResultValue> ascendingExpectedResults = toExpected(
ascendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
0,
limit
);
verify(ascendingExpectedResults, results);
}
}
@Test
public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingListFormat()
{
// limits
for (int limit : new int[]{3, 1, 5, 7, 0}) {
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.limit(limit)
.timeOrder("descending")
.build();
HashMap<String, Object> context = new HashMap<>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
String[] seg1Results = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
};
String[] seg2Results = new String[]{
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
"2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
"2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
"2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
"2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
"2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
"2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
"2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
"2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
};
String[] expectedRet = (String[]) ArrayUtils.addAll(seg1Results, seg2Results);
ArrayUtils.reverse(expectedRet);
final List<List<Map<String, Object>>> descendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":DOUBLE"
},
expectedRet
);
List<ScanResultValue> descendingExpectedResults = toExpected(
descendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
0,
limit
);
verify(descendingExpectedResults, results);
}
}
@Test
public void testFullOnSelectWithFilterLimitAndAscendingTimeOrderingCompactedListFormat()
{
String[] seg1Results = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
};
String[] seg2Results = new String[]{
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
"2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
"2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
"2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
"2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
"2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
"2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
"2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
"2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
};
// limits
for (int limit : new int[]{3, 1, 5, 7, 0}) {
/* Ascending */
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat("compactedList")
.timeOrder("ascending")
.limit(limit)
.build();
HashMap<String, Object> context = new HashMap<>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
final List<List<Map<String, Object>>> ascendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":DOUBLE"
},
(String[]) ArrayUtils.addAll(seg1Results, seg2Results)
);
List<ScanResultValue> ascendingExpectedResults = toExpected(
ascendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
0,
limit
);
results = compactedListToRow(results);
verify(ascendingExpectedResults, results);
}
}
@Test
public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingCompactedListFormat()
{
String[] seg1Results = new String[]{
"2011-01-12T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t100.000000",
"2011-01-12T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t100.000000"
};
String[] seg2Results = new String[]{
"2011-01-13T00:00:00.000Z\tspot\tautomotive\tpreferred\tapreferred\t94.874713",
"2011-01-13T00:00:00.000Z\tspot\tbusiness\tpreferred\tbpreferred\t103.629399",
"2011-01-13T00:00:00.000Z\tspot\tentertainment\tpreferred\tepreferred\t110.087299",
"2011-01-13T00:00:00.000Z\tspot\thealth\tpreferred\thpreferred\t114.947403",
"2011-01-13T00:00:00.000Z\tspot\tmezzanine\tpreferred\tmpreferred\t104.465767",
"2011-01-13T00:00:00.000Z\tspot\tnews\tpreferred\tnpreferred\t102.851683",
"2011-01-13T00:00:00.000Z\tspot\tpremium\tpreferred\tppreferred\t108.863011",
"2011-01-13T00:00:00.000Z\tspot\ttechnology\tpreferred\ttpreferred\t111.356672",
"2011-01-13T00:00:00.000Z\tspot\ttravel\tpreferred\ttpreferred\t106.236928"
};
// limits
for (int limit : new int[]{3, 1, 5, 7, 0}) {
/* Descending */
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat("compactedList")
.timeOrder("descending")
.limit(limit)
.build();
HashMap<String, Object> context = new HashMap<>();
Iterable<ScanResultValue> results = runner.run(QueryPlus.wrap(query), context).toList();
String[] expectedRet = (String[]) ArrayUtils.addAll(seg1Results, seg2Results);
ArrayUtils.reverse(expectedRet);
final List<List<Map<String, Object>>> descendingEvents = toEvents(
new String[]{
legacy ? getTimestampName() + ":TIME" : null,
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":DOUBLE"
},
expectedRet //segments in reverse order from above
);
List<ScanResultValue> descendingExpectedResults = toExpected(
descendingEvents,
legacy ? Lists.newArrayList(getTimestampName(), "quality", "index") : Lists.newArrayList("quality", "index"),
0,
limit
);
results = compactedListToRow(results);
verify(descendingExpectedResults, results);
}
}
private List<List<Map<String, Object>>> toFullEvents(final String[]... valueSet)
{
return toEvents(
@ -686,7 +935,12 @@ public class ScanQueryRunnerTest
Object exValue = ex.getValue();
if (exValue instanceof Double || exValue instanceof Float) {
final double expectedDoubleValue = ((Number) exValue).doubleValue();
Assert.assertEquals("invalid value for " + ex.getKey(), expectedDoubleValue, ((Number) actVal).doubleValue(), expectedDoubleValue * 1e-6);
Assert.assertEquals(
"invalid value for " + ex.getKey(),
expectedDoubleValue,
((Number) actVal).doubleValue(),
expectedDoubleValue * 1e-6
);
} else {
Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
}


@ -53,6 +53,7 @@ public class ScanQuerySpecTest
+ "\"resultFormat\":\"list\","
+ "\"batchSize\":20480,"
+ "\"limit\":3,"
+ "\"timeOrder\":\"none\","
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\"],"
+ "\"legacy\":null,"
@ -70,7 +71,8 @@ public class ScanQuerySpecTest
null,
Arrays.asList("market", "quality", "index"),
null,
null
null,
"none"
);
String actual = jsonMapper.writeValueAsString(query);


@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ScanResultValueSerdeTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testSerdeScanResultValueCompactedList() throws IOException
{
String segmentId = "some_segment_id";
List<String> columns = new ArrayList<>(Arrays.asList("col1", "col2", "col3"));
List<String> event = new ArrayList<>(Arrays.asList(
"prop1",
"prop2",
"prop3"
));
List<List<String>> events = new ArrayList<>(Collections.singletonList(event));
ScanResultValue srv = new ScanResultValue(segmentId, columns, events);
String serialized = jsonMapper.writeValueAsString(srv);
ScanResultValue deserialized = jsonMapper.readValue(serialized, ScanResultValue.class);
Assert.assertEquals(srv, deserialized);
}
@Test
public void testSerdeScanResultValueNonCompactedList() throws IOException
{
String segmentId = "some_segment_id";
List<String> columns = new ArrayList<>(Arrays.asList("col1", "col2", "col3"));
Map<String, Object> event = new HashMap<>();
event.put("key1", new Integer(4));
event.put("key2", "some_string");
event.put("key3", new Double(4.1));
List<Map<String, Object>> events = new ArrayList<>(Collections.singletonList(event));
ScanResultValue srv = new ScanResultValue(segmentId, columns, events);
String serialized = jsonMapper.writeValueAsString(srv);
ScanResultValue deserialized = jsonMapper.readValue(serialized, ScanResultValue.class);
Assert.assertEquals(srv, deserialized);
}
}


@ -199,7 +199,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
}
/**
* This class essentially incapsulates the major part of the logic of {@link CachingClusteredClient}. It's state and
* This class essentially encapsulates the major part of the logic of {@link CachingClusteredClient}. Its state and
* methods couldn't belong to {@link CachingClusteredClient} itself, because they depend on the specific query object
* being run, but {@link QuerySegmentWalker} API is designed so that implementations should be able to accept
* arbitrary queries.


@ -967,7 +967,8 @@ public class DruidQuery
filtration.getDimFilter(),
Ordering.natural().sortedCopy(ImmutableSet.copyOf(outputRowSignature.getRowOrder())),
false,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
ImmutableSortedMap.copyOf(plannerContext.getQueryContext()),
null // Will default to "none"
);
}