mirror of https://github.com/apache/druid.git
Wrote docs
This commit is contained in:
parent 3b923dac9c
commit 06a5218917
@@ -299,7 +299,8 @@ public class ScanBenchmark
         config,
         DefaultGenericQueryMetricsFactory.instance()
       ),
-      new ScanQueryEngine()
+      new ScanQueryEngine(),
+      new ScanQueryConfig()
     );
   }

@@ -25,9 +25,9 @@ title: "Scan query"
 # Scan query
 
 The Scan query returns raw Druid rows in streaming mode. The biggest difference between the Select query and the Scan
-query is that the Scan query does not retain all the returned rows in memory before they are returned to the client
-(except when time-ordering is used). The Select query _will_ retain the rows in memory, causing memory pressure if too
-many rows are returned. The Scan query can return all the rows without issuing another pagination query.
+query is that the Scan query does not retain all the returned rows in memory before they are returned to the client.
+The Select query _will_ retain the rows in memory, causing memory pressure if too many rows are returned.
+The Scan query can return all the rows without issuing another pagination query.
 
 In addition to straightforward usage where a Scan query is issued to the Broker, the Scan query can also be issued
 directly to Historical processes or streaming ingestion tasks. This can be useful if you want to retrieve large
@@ -155,12 +155,31 @@ The format of the result when resultFormat equals `compactedList`:
 
 ## Time Ordering
 
-The Scan query currently supports ordering based on timestamp for non-legacy queries where the limit is less than
-`druid.query.scan.maxRowsTimeOrderedInMemory` rows. The default value of `druid.query.scan.maxRowsTimeOrderedInMemory`
-is 100000 rows. The reasoning behind this limit is that the current implementation of time ordering sorts all returned
-records in memory. Attempting to load too many rows into memory runs the risk of Broker nodes running out of memory.
-The limit can be configured based on server memory and number of dimensions being queried.
+The Scan query currently supports ordering based on timestamp for non-legacy queries. Note that using time ordering
+will yield results that do not indicate which segment rows are from. Furthermore, time ordering is only supported
+where the result set limit is less than `druid.query.scan.maxRowsQueuedForTimeOrdering` rows and less than
+`druid.query.scan.maxSegmentsTimeOrderedInMemory` segments are scanned per Historical. The reasoning behind these
+limitations is that the implementation of time ordering uses two strategies that can consume too much heap memory
+if left unbounded. These strategies (listed below) are chosen on a per-Historical basis depending on the query
+result set limit and the number of segments being scanned.
+
+1. Priority Queue: Each segment on a Historical is opened sequentially. Every row is added to a bounded priority
+queue which is ordered by timestamp. For every row above the result set limit, the row with the earliest (if descending)
+or latest (if ascending) timestamp will be dequeued. After every row has been processed, the sorted contents of the
+priority queue are streamed back to the Broker(s) in batches. Attempting to load too many rows into memory runs the
+risk of Historical nodes running out of memory. The `druid.query.scan.maxRowsQueuedForTimeOrdering` property protects
+from this by limiting the number of rows in the query result set when time ordering is used.
+
+2. K-Way/N-Way Merge: Each segment on a Historical is opened in parallel. Since each segment's rows are already
+time-ordered, a k-way merge can be performed on the results from each segment. This approach doesn't persist the entire
+result set in memory (like the Priority Queue does), as it streams back batches as they are returned from the merge function.
+However, attempting to query too many segments could also result in high memory usage due to the need to open
+decompression and decoding buffers for each. The `druid.query.scan.maxSegmentsTimeOrderedInMemory` limit protects
+from this by capping the number of segments opened per Historical when time ordering is used.
+
+Both `druid.query.scan.maxRowsQueuedForTimeOrdering` and `druid.query.scan.maxSegmentsTimeOrderedInMemory` are
+configurable and can be tuned based on hardware specs and the number of dimensions being queried.
 
 ## Legacy mode
 
 The Scan query supports a legacy mode designed for protocol compatibility with the former scan-query contrib extension.
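The time ordering described in the doc hunk above is driven entirely by the query's `timeOrder` setting. For reference, the `Druids.ScanQueryBuilder` calls exercised by this commit's tests can express such a query roughly as follows; this is a minimal sketch, and the datasource name, interval, and limit are illustrative values rather than anything taken from the commit:

```java
import java.util.Collections;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;

public class TimeOrderedScanExample
{
  public static ScanQuery build()
  {
    // Ascending time order; the limit must stay under
    // druid.query.scan.maxRowsQueuedForTimeOrdering for the query to be accepted.
    return new Druids.ScanQueryBuilder()
        .dataSource("example_datasource")   // illustrative datasource name
        .intervals(new MultipleIntervalSegmentSpec(
            Collections.singletonList(Intervals.of("2013-01-01/2013-01-02"))))  // illustrative interval
        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
        .timeOrder(ScanQuery.TimeOrder.ASCENDING)
        .limit(1000)
        .build();
  }
}
```

A query built this way is rejected with an unsupported-operation error when its limit exceeds `druid.query.scan.maxRowsQueuedForTimeOrdering`, matching the check added in `ScanQueryRunnerFactory` further down in this commit.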
@@ -180,5 +199,6 @@ is complete.
 
 |property|description|values|default|
 |--------|-----------|------|-------|
-|druid.query.scan.maxRowsTimeOrderedInMemory|An integer in the range [0, 2147483647]|100000|
-|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
+|druid.query.scan.maxRowsQueuedForTimeOrdering|The maximum number of rows returned when time ordering is used|An integer in [0, 2147483647]|100000|
+|druid.query.scan.maxSegmentsTimeOrderedInMemory|The maximum number of segments scanned per Historical when time ordering is used|An integer in [0, 2147483647]|50|
+|druid.query.scan.legacy|Whether legacy mode should be turned on for Scan queries|true or false|false|
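The properties in the table above are ordinary Druid runtime properties. A sketch of what tuning them might look like in a service's runtime properties file (the values here simply restate the documented defaults; where the file lives depends on the deployment):

```
druid.query.scan.maxRowsQueuedForTimeOrdering=100000
druid.query.scan.maxSegmentsTimeOrderedInMemory=50
druid.query.scan.legacy=false
```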
@@ -2423,7 +2423,8 @@ public class KafkaIndexTaskTest
               new ScanQueryConfig(),
               new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
             ),
-            new ScanQueryEngine()
+            new ScanQueryEngine(),
+            new ScanQueryConfig()
           )
         )
         .build()

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.BaseQuery;
 import org.apache.druid.query.DataSource;
@@ -207,6 +208,12 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
     return legacy;
   }
 
+  @Override
+  public Ordering<ScanResultValue> getResultOrdering()
+  {
+    return Ordering.from(new ScanResultValueTimestampComparator(this));
+  }
+
   public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig)
   {
     return Druids.ScanQueryBuilder.copy(this).legacy(legacy != null ? legacy : scanQueryConfig.isLegacy()).build();
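The new `getResultOrdering()` override wires `ScanResultValueTimestampComparator` into Guava's standard `Ordering` machinery, so already-materialized batches can be sorted with plain library calls. A hypothetical helper, not part of this commit, illustrating that usage:

```java
import com.google.common.collect.Ordering;
import java.util.List;

import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanResultValue;

// Hypothetical illustration only: sorts already-collected batches using the
// comparator-backed ordering that ScanQuery.getResultOrdering() now returns.
public class ScanResultSorter
{
  public static List<ScanResultValue> sortByTime(ScanQuery query, List<ScanResultValue> results)
  {
    Ordering<ScanResultValue> ordering = query.getResultOrdering();
    return ordering.sortedCopy(results);
  }
}
```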
@@ -137,7 +137,6 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
           Math.toIntExact(query.getLimit())
       );
 
-      // Batch the scan result values
       return new ScanResultValueBatchingSequence(unbatched, query.getBatchSize());
     } else if (query.getLimit() > scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
       throw new UOE(
@@ -307,7 +306,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
     Yielder<ScanResultValue> inputYielder;
     int batchSize;
 
-    public ScanResultValueBatchingSequence(Sequence<ScanResultValue> inputSequence, int batchSize) {
+    public ScanResultValueBatchingSequence(Sequence<ScanResultValue> inputSequence, int batchSize)
+    {
       this.inputYielder = inputSequence.toYielder(
           null,
           new YieldingAccumulator<ScanResultValue, ScanResultValue>()
@@ -324,9 +324,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
     }
 
     @Override
-    @Nullable
     public <OutType> Yielder<OutType> toYielder(
-        OutType initValue, YieldingAccumulator<OutType, ScanResultValue> accumulator
+        OutType initValue,
+        YieldingAccumulator<OutType, ScanResultValue> accumulator
     )
     {
       return makeYielder(initValue, accumulator);
@@ -354,7 +354,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       }
       try {
         return (OutType) new ScanResultValue(null, columns, eventsToAdd);
-      } catch (ClassCastException e) {
+      }
+      catch (ClassCastException e) {
         return initVal;
       }
     }

@@ -75,16 +75,16 @@ public class ScanResultValue implements Comparable<ScanResultValue>
     return events;
   }
 
-  public long getFirstEventTimestamp(ScanQuery query)
+  public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
   {
-    if (query.getResultFormat().equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
+    if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
       return (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
-    } else if (query.getResultFormat().equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
+    } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
       int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
       List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
       return (Long) firstEvent.get(timeColumnIndex);
     }
-    throw new UOE("Unable to get first event timestamp using result format of [%s]", query.getResultFormat());
+    throw new UOE("Unable to get first event timestamp using result format of [%s]", resultFormat.toString());
   }
 
   public List<ScanResultValue> toSingleEventScanResultValues()
@@ -44,8 +44,8 @@ public class ScanResultValueTimestampComparator implements Comparator<ScanResultValue>
   {
     int comparison;
     comparison = Longs.compare(
-        o1.getFirstEventTimestamp(scanQuery),
-        o2.getFirstEventTimestamp(scanQuery));
+        o1.getFirstEventTimestamp(scanQuery.getResultFormat()),
+        o2.getFirstEventTimestamp(scanQuery.getResultFormat()));
     if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING)) {
       return comparison;
     }

@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ScanQueryConfigTest
+{
+  private final ObjectMapper MAPPER = new DefaultObjectMapper();
+
+  private final ImmutableMap<String, String> CONFIG_MAP = ImmutableMap
+      .<String, String>builder()
+      .put("maxSegmentsTimeOrderedInMemory", "1")
+      .put("maxRowsQueuedForTimeOrdering", "1")
+      .put("legacy", "true")
+      .build();
+
+  private final ImmutableMap<String, String> CONFIG_MAP2 = ImmutableMap
+      .<String, String>builder()
+      .put("legacy", "false")
+      .put("maxSegmentsTimeOrderedInMemory", "42")
+      .build();
+
+  private final ImmutableMap<String, String> CONFIG_MAP_EMPTY = ImmutableMap
+      .<String, String>builder()
+      .build();
+
+  @Test
+  public void testSerde()
+  {
+    final ScanQueryConfig config = MAPPER.convertValue(CONFIG_MAP, ScanQueryConfig.class);
+    Assert.assertEquals(1, config.getMaxRowsQueuedForTimeOrdering());
+    Assert.assertEquals(1, config.getMaxSegmentsTimeOrderedInMemory());
+    Assert.assertTrue(config.isLegacy());
+
+    final ScanQueryConfig config2 = MAPPER.convertValue(CONFIG_MAP2, ScanQueryConfig.class);
+    Assert.assertEquals(100000, config2.getMaxRowsQueuedForTimeOrdering());
+    Assert.assertEquals(42, config2.getMaxSegmentsTimeOrderedInMemory());
+    Assert.assertFalse(config2.isLegacy());
+
+    final ScanQueryConfig config3 = MAPPER.convertValue(CONFIG_MAP_EMPTY, ScanQueryConfig.class);
+    Assert.assertEquals(100000, config3.getMaxRowsQueuedForTimeOrdering());
+    Assert.assertEquals(50, config3.getMaxSegmentsTimeOrderedInMemory());
+    Assert.assertFalse(config3.isLegacy());
+  }
+}
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.scan;
-
-import org.apache.druid.query.Druids;
-import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryRunner;
-import org.apache.druid.query.QuerySegmentWalker;
-import org.apache.druid.query.spec.QuerySegmentSpec;
-import org.apache.druid.segment.column.ColumnHolder;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-
-public class ScanQueryQueryToolChestTest
-{
-  /*
-  private static ScanQueryQueryToolChest chest;
-  private static ScanQueryConfig config;
-  private static int numElements;
-  private static QuerySegmentSpec emptySegmentSpec;
-
-  @BeforeClass
-  public static void setup()
-  {
-
-    config = createNiceMock(ScanQueryConfig.class);
-    expect(config.getMaxRowsQueuedForTimeOrdering()).andReturn(100000);
-    replay(config);
-    chest = new ScanQueryQueryToolChest(config, null);
-    numElements = 1000;
-    emptySegmentSpec = new QuerySegmentSpec()
-    {
-      @Override
-      public List<Interval> getIntervals()
-      {
-        return null;
-      }
-
-      @Override
-      public <T> QueryRunner<T> lookup(
-          Query<T> query,
-          QuerySegmentWalker walker
-      )
-      {
-        return null;
-      }
-    };
-  }
-
-  @Test
-  public void testDescendingHeapsortListScanResultValues()
-  {
-    List<ScanResultValue> inputs = new ArrayList<>();
-    for (long i = 0; i < numElements; i++) {
-      HashMap<String, Object> event = new HashMap<>();
-      event.put("__time", i * 1000);
-      inputs.add(
-          new ScanResultValue(
-              "some segment id",
-              Collections.singletonList("__time"),
-              Collections.singletonList(event)
-          )
-      );
-    }
-    ScanQuery scanQuery = new Druids.ScanQueryBuilder()
-        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
-        .timeOrder(ScanQuery.TimeOrder.DESCENDING)
-        .dataSource("some data source")
-        .intervals(emptySegmentSpec)
-        .limit(99999)
-        .build();
-    Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
-
-    int count = 0;
-    Long previousTime = Long.MAX_VALUE;
-    while (sorted.hasNext()) {
-      count++;
-      ScanResultValue curr = sorted.next();
-      Long currentTime = (Long)
-          ((Map<String, Object>) (((List<Object>) curr.getEvents()).get(0))).get(ColumnHolder.TIME_COLUMN_NAME);
-      Assert.assertTrue(currentTime < previousTime);
-      previousTime = currentTime;
-    }
-    Assert.assertEquals(numElements, count);
-  }
-
-  @Test
-  public void testAscendingHeapsortListScanResultValues()
-  {
-    List<ScanResultValue> inputs = new ArrayList<>();
-    for (long i = numElements; i > 0; i--) {
-      HashMap<String, Object> event = new HashMap<>();
-      event.put("__time", i * 1000);
-      inputs.add(
-          new ScanResultValue(
-              "some segment id",
-              Collections.singletonList("__time"),
-              Collections.singletonList(event)
-          )
-      );
-    }
-    ScanQuery scanQuery = new Druids.ScanQueryBuilder()
-        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)
-        .timeOrder(ScanQuery.TimeOrder.ASCENDING)
-        .dataSource("some data source")
-        .intervals(emptySegmentSpec)
-        .limit(99999)
-        .build();
-    Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
-
-    int count = 0;
-    Long previousTime = -1L;
-    while (sorted.hasNext()) {
-      count++;
-      ScanResultValue curr = sorted.next();
-      Long currentTime = (Long)
-          ((Map<String, Object>) (((List<Object>) curr.getEvents()).get(0))).get(ColumnHolder.TIME_COLUMN_NAME);
-      Assert.assertTrue(currentTime > previousTime);
-      previousTime = currentTime;
-    }
-    Assert.assertEquals(numElements, count);
-  }
-
-  @Test
-  public void testDescendingHeapsortCompactedListScanResultValues()
-  {
-    List<ScanResultValue> inputs = new ArrayList<>();
-    for (long i = 0; i < numElements; i++) {
-      inputs.add(
-          new ScanResultValue(
-              "some segment id",
-              Collections.singletonList("__time"),
-              Collections.singletonList(Collections.singletonList(new Long(i * 1000)))
-          )
-      );
-    }
-    ScanQuery scanQuery = new Druids.ScanQueryBuilder()
-        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-        .timeOrder(ScanQuery.TimeOrder.DESCENDING)
-        .dataSource("some data source")
-        .intervals(emptySegmentSpec)
-        .limit(99999)
-        .build();
-    Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
-
-    Long previousTime = Long.MAX_VALUE;
-    int count = 0;
-    while (sorted.hasNext()) {
-      count++;
-      ScanResultValue curr = sorted.next();
-      Long currentTime = (Long)
-          ((List<Object>) (((List<Object>) curr.getEvents()).get(0))).get(0);
-      Assert.assertTrue(currentTime < previousTime);
-      previousTime = currentTime;
-    }
-    Assert.assertEquals(numElements, count);
-  }
-
-  @Test
-  public void testAscendingHeapsortCompactedListScanResultValues()
-  {
-    List<ScanResultValue> inputs = new ArrayList<>();
-    for (long i = numElements; i > 0; i--) {
-      inputs.add(
-          new ScanResultValue(
-              "some segment id",
-              Collections.singletonList("__time"),
-              Collections.singletonList(Collections.singletonList(new Long(i * 1000)))
-          )
-      );
-    }
-    ScanQuery scanQuery = new Druids.ScanQueryBuilder()
-        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-        .timeOrder(ScanQuery.TimeOrder.ASCENDING)
-        .dataSource("some data source")
-        .intervals(emptySegmentSpec)
-        .limit(99999)
-        .build();
-    Iterator<ScanResultValue> sorted = chest.sortAndLimitScanResultValues(inputs.iterator(), scanQuery);
-
-    Long previousTime = -1L;
-    int count = 0;
-    while (sorted.hasNext()) {
-      count++;
-      ScanResultValue curr = sorted.next();
-      Long currentTime = (Long)
-          ((List<Object>) (((List<Object>) curr.getEvents()).get(0))).get(0);
-      Assert.assertTrue(currentTime > previousTime);
-      previousTime = currentTime;
-    }
-    Assert.assertEquals(numElements, count);
-  }
-  */
-}
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.query.scan;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class ScanResultValueSerdeTest
-{
-  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
-
-  @Test
-  public void testSerdeScanResultValueCompactedList() throws IOException
-  {
-    String segmentId = "some_segment_id";
-    List<String> columns = new ArrayList<>(Arrays.asList("col1", "col2", "col3"));
-    List<String> event = new ArrayList<>(Arrays.asList(
-        "prop1",
-        "prop2",
-        "prop3"
-    ));
-    List<List<String>> events = new ArrayList<>(Collections.singletonList(event));
-    ScanResultValue srv = new ScanResultValue(segmentId, columns, events);
-    String serialized = jsonMapper.writeValueAsString(srv);
-    ScanResultValue deserialized = jsonMapper.readValue(serialized, ScanResultValue.class);
-    Assert.assertEquals(srv, deserialized);
-  }
-
-  @Test
-  public void testSerdeScanResultValueNonCompactedList() throws IOException
-  {
-    String segmentId = "some_segment_id";
-    List<String> columns = new ArrayList<>(Arrays.asList("col1", "col2", "col3"));
-    Map<String, Object> event = new HashMap<>();
-    event.put("key1", new Integer(4));
-    event.put("key2", "some_string");
-    event.put("key3", new Double(4.1));
-    List<Map<String, Object>> events = new ArrayList<>(Collections.singletonList(event));
-    ScanResultValue srv = new ScanResultValue(segmentId, columns, events);
-    String serialized = jsonMapper.writeValueAsString(srv);
-    ScanResultValue deserialized = jsonMapper.readValue(serialized, ScanResultValue.class);
-    Assert.assertEquals(srv, deserialized);
-  }
-}
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ScanResultValueTest
+{
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+  private static final long TIME_1 = 1234567890000L;
+  private static final long TIME_2 = 9876543210000L;
+
+  private static ScanResultValue compactedListSRV;
+  private static ScanResultValue listSRV;
+
+  @BeforeClass
+  public static void setup()
+  {
+    String segmentId = "some_segment_id";
+    List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
+    List<Object> event = new ArrayList<>(Arrays.asList(
+        TIME_1,
+        "Feridun",
+        4
+    ));
+    List<Object> event2 = new ArrayList<>(Arrays.asList(
+        TIME_2,
+        "Justin",
+        6
+    ));
+
+    List<List<Object>> events = Arrays.asList(event, event2);
+    compactedListSRV = new ScanResultValue(segmentId, columns, events);
+
+    Map<String, Object> eventMap1 = new HashMap<>();
+    eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1);
+    eventMap1.put("name", "Feridun");
+    eventMap1.put("count", 4);
+    Map<String, Object> eventMap2 = new HashMap<>();
+    eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2);
+    eventMap2.put("name", "Justin");
+    eventMap2.put("count", 6);
+    List<Map<String, Object>> eventMaps = Arrays.asList(eventMap1, eventMap2);
+    listSRV = new ScanResultValue(segmentId, columns, eventMaps);
+  }
+
+  @Test
+  public void testSerdeScanResultValueCompactedList() throws IOException
+  {
+    String serialized = JSON_MAPPER.writeValueAsString(compactedListSRV);
+    ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
+    Assert.assertEquals(compactedListSRV, deserialized);
+  }
+
+  @Test
+  public void testSerdeScanResultValueNonCompactedList() throws IOException
+  {
+    String serialized = JSON_MAPPER.writeValueAsString(listSRV);
+    ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
+    Assert.assertEquals(listSRV, deserialized);
+  }
+
+  @Test
+  public void testGetFirstEventTimestampCompactedList()
+  {
+    long timestamp = compactedListSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
+    Assert.assertEquals(TIME_1, timestamp);
+  }
+
+  @Test
+  public void testGetFirstEventTimestampNonCompactedList()
+  {
+    long timestamp = listSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
+    Assert.assertEquals(TIME_1, timestamp);
+  }
+
+  @Test
+  public void testToSingleEventScanResultValues()
+  {
+    List<ScanResultValue> compactedListScanResultValues = compactedListSRV.toSingleEventScanResultValues();
+    for (ScanResultValue srv : compactedListScanResultValues) {
+      List<Object> events = (List<Object>) srv.getEvents();
+      Assert.assertEquals(1, events.size());
+    }
+    List<ScanResultValue> listScanResultValues = listSRV.toSingleEventScanResultValues();
+    for (ScanResultValue srv : listScanResultValues) {
+      List<Object> events = (List<Object>) srv.getEvents();
+      Assert.assertEquals(1, events.size());
+    }
+  }
+}
@@ -969,7 +969,6 @@ public class DruidQuery
         Ordering.natural().sortedCopy(ImmutableSet.copyOf(outputRowSignature.getRowOrder())),
         false,
         ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
-
     );
   }
