streaming version of select query (#3307)

* streaming version of select query

* use columns instead of dimensions and metrics; prepare for valueVector; remove granularity

* respect query limit within historical

* use constant

* fix corrupted thread name bug caused by using a jetty qtp thread rather than a processing thread when working with SpecificSegmentQueryRunner

* add some tests for scan query

* add scan query document

* fix merge conflicts

* add compactedList resultFormat; this format is better suited to JSON serialization/deserialization

* respect query timeout

* respect query limit on broker

* use static consts and remove unused code
kaijianding 2017-01-20 06:09:53 +08:00 committed by Himanshu
parent ae5a349a54
commit 33ae9dd485
18 changed files with 2353 additions and 48 deletions

View File

@ -104,6 +104,8 @@
<argument>io.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions:postgresql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions.contrib:scan-query</argument>
<argument>${druid.distribution.pulldeps.opts}</argument>
</arguments>
</configuration>

View File

@ -0,0 +1,157 @@
---
layout: doc_page
---
# Scan query
Scan query returns raw Druid rows in streaming mode.
```json
{
"queryType": "scan",
"dataSource": "wikipedia",
"resultFormat": "list",
"columns":[],
"intervals": [
"2013-01-01/2013-01-02"
],
"batchSize":20480,
"limit":5
}
```
There are several main parts to a scan query:
|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be "scan"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|resultFormat|How the results are represented: `list`, `compactedList`, or `valueVector`. Currently only `list` and `compactedList` are supported. Default is `list`.|no|
|filter|See [Filters](../querying/filters.html)|no|
|columns|A String array of dimensions and metrics to scan. If left empty, all dimensions and metrics are returned.|no|
|batchSize|The maximum number of rows buffered before being returned to the client. Default is `20480`.|no|
|limit|How many rows to return. If not specified, all rows will be returned.|no|
|context|An additional JSON Object which can be used to specify certain flags.|no|
The format of the result when resultFormat is `list`:
```json
[{
"segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
"columns" : [
"timestamp",
"robot",
"namespace",
"anonymous",
"unpatrolled",
"page",
"language",
"newpage",
"user",
"count",
"added",
"delta",
"variation",
"deleted"
],
"events" : [ {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "1",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "11._korpus_(NOVJ)",
"language" : "sl",
"newpage" : "0",
"user" : "EmausBot",
"count" : 1.0,
"added" : 39.0,
"delta" : 39.0,
"variation" : 39.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "112_U.S._580",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 70.0,
"delta" : 70.0,
"variation" : 70.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._243",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 77.0,
"delta" : 77.0,
"variation" : 77.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._73",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 70.0,
"delta" : 70.0,
"variation" : 70.0,
"deleted" : 0.0
}, {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._756",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 68.0,
"delta" : 68.0,
"variation" : 68.0,
"deleted" : 0.0
} ]
} ]
```
The format of the result when resultFormat is `compactedList`:
```json
[{
"segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
"columns" : [
"timestamp", "robot", "namespace", "anonymous", "unpatrolled", "page", "language", "newpage", "user", "count", "added", "delta", "variation", "deleted"
],
"events" : [
["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._73", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._756", "en", "1", "MZMcBride", 1.0, 68.0, 68.0, 68.0, 0.0]
]
} ]
```
The biggest difference between the select query and the scan query is that the scan query does not retain all returned rows in memory before they can be sent to the client.
A select query that asks for too many rows can therefore cause memory pressure; the scan query does not have this issue.
The scan query can also return all rows without issuing another pagination query, which is extremely useful when querying a historical or realtime node directly.
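
For programmatic use, the same query can be built with the `ScanQuery.ScanQueryBuilder` that this extension provides. Below is a minimal sketch mirroring the JSON example above (builder method names are taken from this patch; an empty `columns` list selects all dimensions and metrics):

```java
import io.druid.query.scan.ScanQuery;

// Equivalent of the JSON scan query shown earlier.
ScanQuery query = ScanQuery.newScanQueryBuilder()
    .dataSource("wikipedia")
    .intervals("2013-01-01/2013-01-02")
    .resultFormat(ScanQuery.RESULT_FORMAT_LIST)
    .batchSize(20480)
    .limit(5)
    .build();
```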

View File

@ -65,6 +65,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|
|druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)|
|scan-query|Scan query|[link](../development/extensions-contrib/scan-query.html)|
## Promoting Community Extension to Core Extension

View File

@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.3-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>scan-query</artifactId>
<name>scan-query</name>
<description>streaming version of select query</description>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,390 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
import io.druid.query.Query;
import io.druid.query.TableDataSource;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.InDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@JsonTypeName("scan")
public class ScanQuery extends BaseQuery<ScanResultValue>
{
public static final String SCAN = "scan";
public static final String RESULT_FORMAT_LIST = "list";
public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList";
public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector";
private final String resultFormat;
private final int batchSize;
private final int limit;
private final DimFilter dimFilter;
private final List<String> columns;
@JsonCreator
public ScanQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("resultFormat") String resultFormat,
@JsonProperty("batchSize") int batchSize,
@JsonProperty("limit") int limit,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("columns") List<String> columns,
@JsonProperty("context") Map<String, Object> context
)
{
super(dataSource, querySegmentSpec, false, context);
this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat;
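// 4096 * 5 = 20480 rows, the default batch size documented for this query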
this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
this.limit = (limit == 0) ? Integer.MAX_VALUE : limit;
Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0");
this.dimFilter = dimFilter;
this.columns = columns;
}
@JsonProperty
public String getResultFormat()
{
return resultFormat;
}
@JsonProperty
public int getBatchSize()
{
return batchSize;
}
@JsonProperty
public int getLimit()
{
return limit;
}
@Override
public boolean hasFilters()
{
return dimFilter != null;
}
@Override
public DimFilter getFilter()
{
return dimFilter;
}
@Override
public String getType()
{
return SCAN;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
return dimFilter;
}
@JsonProperty
public List<String> getColumns()
{
return columns;
}
@Override
public Query<ScanResultValue> withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{
return new ScanQuery(
getDataSource(),
querySegmentSpec,
resultFormat,
batchSize,
limit,
dimFilter,
columns,
getContext()
);
}
@Override
public Query<ScanResultValue> withDataSource(DataSource dataSource)
{
return new ScanQuery(
dataSource,
getQuerySegmentSpec(),
resultFormat,
batchSize,
limit,
dimFilter,
columns,
getContext()
);
}
@Override
public Query<ScanResultValue> withOverriddenContext(Map<String, Object> contextOverrides)
{
return new ScanQuery(
getDataSource(),
getQuerySegmentSpec(),
resultFormat,
batchSize,
limit,
dimFilter,
columns,
computeOverridenContext(contextOverrides)
);
}
public ScanQuery withDimFilter(DimFilter dimFilter)
{
return new ScanQuery(
getDataSource(),
getQuerySegmentSpec(),
resultFormat,
batchSize,
limit,
dimFilter,
columns,
getContext()
);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
ScanQuery that = (ScanQuery) o;
if (batchSize != that.batchSize) {
return false;
}
if (limit != that.limit) {
return false;
}
if (resultFormat != null ? !resultFormat.equals(that.resultFormat) : that.resultFormat != null) {
return false;
}
if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
return false;
}
return columns != null ? columns.equals(that.columns) : that.columns == null;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (resultFormat != null ? resultFormat.hashCode() : 0);
result = 31 * result + batchSize;
result = 31 * result + limit;
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (columns != null ? columns.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "ScanQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", descending=" + isDescending() +
", resultFormat='" + resultFormat + '\'' +
", batchSize=" + batchSize +
", limit=" + limit +
", dimFilter=" + dimFilter +
", columns=" + columns +
'}';
}
/**
* A Builder for ScanQuery.
* <p/>
* Required: dataSource(), intervals() must be called before build()
* <p/>
* Usage example:
* <pre><code>
* ScanQuery query = new ScanQueryBuilder()
* .dataSource("Example")
* .interval("2010/2013")
* .build();
* </code></pre>
*
* @see io.druid.query.scan.ScanQuery
*/
public static class ScanQueryBuilder
{
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private Map<String, Object> context;
private String resultFormat;
private int batchSize;
private int limit;
private DimFilter dimFilter;
private List<String> columns;
public ScanQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
context = null;
resultFormat = null;
batchSize = 0;
limit = 0;
dimFilter = null;
columns = Lists.newArrayList();
}
public ScanQuery build()
{
return new ScanQuery(
dataSource,
querySegmentSpec,
resultFormat,
batchSize,
limit,
dimFilter,
columns,
context
);
}
public ScanQueryBuilder copy(ScanQueryBuilder builder)
{
return new ScanQueryBuilder()
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.context(builder.context);
}
public ScanQueryBuilder dataSource(String ds)
{
dataSource = new TableDataSource(ds);
return this;
}
public ScanQueryBuilder dataSource(DataSource ds)
{
dataSource = ds;
return this;
}
public ScanQueryBuilder intervals(QuerySegmentSpec q)
{
querySegmentSpec = q;
return this;
}
public ScanQueryBuilder intervals(String s)
{
querySegmentSpec = new LegacySegmentSpec(s);
return this;
}
public ScanQueryBuilder intervals(List<Interval> l)
{
querySegmentSpec = new LegacySegmentSpec(l);
return this;
}
public ScanQueryBuilder context(Map<String, Object> c)
{
context = c;
return this;
}
public ScanQueryBuilder resultFormat(String r)
{
resultFormat = r;
return this;
}
public ScanQueryBuilder batchSize(int b)
{
batchSize = b;
return this;
}
public ScanQueryBuilder limit(int l)
{
limit = l;
return this;
}
public ScanQueryBuilder filters(String dimensionName, String value)
{
dimFilter = new SelectorDimFilter(dimensionName, value, null);
return this;
}
public ScanQueryBuilder filters(String dimensionName, String value, String... values)
{
dimFilter = new InDimFilter(dimensionName, Lists.asList(value, values), null);
return this;
}
public ScanQueryBuilder filters(DimFilter f)
{
dimFilter = f;
return this;
}
public ScanQueryBuilder columns(List<String> c)
{
columns = c;
return this;
}
public ScanQueryBuilder columns(String... c)
{
columns = Arrays.asList(c);
return this;
}
}
public static ScanQueryBuilder newScanQueryBuilder()
{
return new ScanQueryBuilder();
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.guice.DruidBinders;
import io.druid.guice.LazySingleton;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
public class ScanQueryDruidModule implements DruidModule
{
  @Override
  public void configure(Binder binder)
  {
    DruidBinders.queryToolChestBinder(binder)
                .addBinding(ScanQuery.class)
                .to(ScanQueryQueryToolChest.class)
                .in(LazySingleton.class);
    DruidBinders.queryRunnerFactoryBinder(binder)
                .addBinding(ScanQuery.class)
                .to(ScanQueryRunnerFactory.class)
                .in(LazySingleton.class);
  }

  @Override
  public List<? extends Module> getJacksonModules()
  {
    return Arrays.<Module>asList(
        new SimpleModule("ScanQueryDruidModule")
            .registerSubtypes(
                new NamedType(ScanQuery.class, ScanQuery.SCAN)
            )
    );
  }
}

View File

@ -0,0 +1,244 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.QueryInterruptedException;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.Filter;
import io.druid.query.select.SelectQueryEngine;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import io.druid.segment.filter.Filters;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class ScanQueryEngine
{
private static final SelectQueryEngine.SelectStrategyFactory STRATEGY_FACTORY = new SelectQueryEngine.SelectStrategyFactory();
public Sequence<ScanResultValue> process(
final ScanQuery query,
final Segment segment,
final Map<String, Object> responseContext
)
{
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) {
int count = (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
if (count >= query.getLimit()) {
return Sequences.empty();
}
}
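// the absolute deadline (epoch millis) placed in the response context by ScanQueryRunnerFactory.mergeRunners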
final Long timeoutAt = (long) responseContext.get(ScanQueryRunnerFactory.CTX_TIMEOUT_AT);
final long start = System.currentTimeMillis();
final StorageAdapter adapter = segment.asStorageAdapter();
if (adapter == null) {
throw new ISE(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
List<String> allDims = Lists.newLinkedList(adapter.getAvailableDimensions());
List<String> allMetrics = Lists.newLinkedList(adapter.getAvailableMetrics());
final List<String> allColumns = Lists.newLinkedList();
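// if columns are requested explicitly, scan just those (always including the timestamp); otherwise scan every dimension and metric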
if (query.getColumns() != null && !query.getColumns().isEmpty()) {
if (!query.getColumns().contains(ScanResultValue.timestampKey)) {
allColumns.add(ScanResultValue.timestampKey);
}
allColumns.addAll(query.getColumns());
allDims.retainAll(query.getColumns());
allMetrics.retainAll(query.getColumns());
}
else {
if (!allDims.contains(ScanResultValue.timestampKey)) {
allColumns.add(ScanResultValue.timestampKey);
}
allColumns.addAll(allDims);
allColumns.addAll(allMetrics);
}
final List<DimensionSpec> dims = DefaultDimensionSpec.toSpec(allDims);
final List<String> metrics = allMetrics;
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
Preconditions.checkArgument(intervals.size() == 1, "Can only handle a single interval, got[%s]", intervals);
final String segmentId = segment.getIdentifier();
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter()));
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) {
responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0);
}
final int limit = query.getLimit() - (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
return Sequences.concat(
Sequences.map(
adapter.makeCursors(
filter,
intervals.get(0),
VirtualColumns.EMPTY,
QueryGranularities.ALL,
query.isDescending()
),
new Function<Cursor, Sequence<ScanResultValue>>()
{
@Override
public Sequence<ScanResultValue> apply(final Cursor cursor)
{
return new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, Iterator<ScanResultValue>>()
{
@Override
public Iterator<ScanResultValue> make()
{
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final List<ColumnSelectorPlus<SelectQueryEngine.SelectColumnSelectorStrategy>> selectorPlusList = Arrays.asList(
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
Lists.newArrayList(dims),
cursor
)
);
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
}
final int batchSize = query.getBatchSize();
return new Iterator<ScanResultValue>()
{
private int offset = 0;
@Override
public boolean hasNext()
{
return !cursor.isDone() && offset < limit;
}
@Override
public ScanResultValue next()
{
if (System.currentTimeMillis() >= timeoutAt) {
throw new QueryInterruptedException(new TimeoutException());
}
int lastOffset = offset;
Object events = null;
String resultFormat = query.getResultFormat();
if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
throw new UnsupportedOperationException("valueVector is not supported now");
} else if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat)) {
events = rowsToCompactedList();
} else {
events = rowsToList();
}
responseContext.put(
ScanQueryRunnerFactory.CTX_COUNT,
(int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset)
);
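// shrink the absolute deadline by the time already spent, so runners for later segments see the remaining budget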
responseContext.put(
ScanQueryRunnerFactory.CTX_TIMEOUT_AT,
timeoutAt - (System.currentTimeMillis() - start)
);
return new ScanResultValue(segmentId, allColumns, events);
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
private Object rowsToCompactedList()
{
return Lists.transform(
(List<Map<String, Object>>) rowsToList(),
new Function<Map<String, Object>, Object>()
{
@Override
public Object apply(Map<String, Object> input)
{
List eventValues = Lists.newArrayListWithExpectedSize(allColumns.size());
for (String expectedColumn : allColumns) {
eventValues.add(input.get(expectedColumn));
}
return eventValues;
}
}
);
}
private Object rowsToList()
{
List<Map<String, Object>> events = Lists.newArrayListWithCapacity(batchSize);
for (int i = 0; !cursor.isDone()
&& i < batchSize
&& offset < limit; cursor.advance(), i++, offset++) {
final Map<String, Object> theEvent = SelectQueryEngine.singleEvent(
ScanResultValue.timestampKey,
timestampColumnSelector,
selectorPlusList,
metSelectors
);
events.add(theEvent);
}
return events;
}
private Object rowsToValueVector()
{
// only list is supported for now; ValueVector or Arrow support may be added in the future
return rowsToList();
}
};
}
@Override
public void cleanup(Iterator<ScanResultValue> iterFromMake)
{
}
}
);
}
}
)
);
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.query.QueryRunner;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
{
private Yielder<ScanResultValue> yielder;
private String resultFormat;
private int limit = 0;
private int count = 0;
public ScanQueryLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner, ScanQuery query,
Map<String, Object> responseContext
)
{
resultFormat = query.getResultFormat();
limit = query.getLimit();
Sequence<ScanResultValue> baseSequence = baseRunner.run(query, responseContext);
yielder = baseSequence.toYielder(
null,
new YieldingAccumulator<ScanResultValue, ScanResultValue>()
{
@Override
public ScanResultValue accumulate(ScanResultValue accumulated, ScanResultValue in)
{
yield();
return in;
}
}
);
}
@Override
public boolean hasNext()
{
return !yielder.isDone() && count < limit;
}
@Override
public ScanResultValue next()
{
ScanResultValue batch = yielder.get();
if (ScanQuery.RESULT_FORMAT_COMPACTED_LIST.equals(resultFormat) ||
ScanQuery.RESULT_FORMAT_LIST.equals(resultFormat)) {
List events = (List) batch.getEvents();
if (events.size() <= limit - count) {
count += events.size();
yielder = yielder.next(null);
return batch;
} else {
// last batch: truncate it to the remaining row budget
int left = limit - count;
count = limit;
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left));
}
}
throw new UnsupportedOperationException(ScanQuery.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
yielder.close();
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.MetricManipulationFn;
import java.util.Map;
public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
{
private static final TypeReference<ScanResultValue> TYPE_REFERENCE = new TypeReference<ScanResultValue>()
{
};
@Override
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
{
return new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
final Query<ScanResultValue> query, final Map<String, Object> responseContext
)
{
ScanQuery scanQuery = (ScanQuery) query;
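// a limit of Integer.MAX_VALUE means no limit was specified (see ScanQuery's constructor), so no limiting iterator is needed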
if (scanQuery.getLimit() == Integer.MAX_VALUE) {
return runner.run(query, responseContext);
}
return new BaseSequence<>(
new BaseSequence.IteratorMaker<ScanResultValue, ScanQueryLimitRowIterator>()
{
@Override
public ScanQueryLimitRowIterator make()
{
return new ScanQueryLimitRowIterator(runner, (ScanQuery) query, responseContext);
}
@Override
public void cleanup(ScanQueryLimitRowIterator iterFromMake)
{
CloseQuietly.close(iterFromMake);
}
}
);
}
};
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(ScanQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query);
}
@Override
public Function<ScanResultValue, ScanResultValue> makePreComputeManipulatorFn(
ScanQuery query, MetricManipulationFn fn
)
{
return Functions.identity();
}
@Override
public TypeReference<ScanResultValue> getResultTypeReference()
{
return TYPE_REFERENCE;
}
@Override
public QueryRunner<ScanResultValue> preMergeQueryDecoration(final QueryRunner<ScanResultValue> runner)
{
return new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
)
{
ScanQuery scanQuery = (ScanQuery) query;
if (scanQuery.getDimensionsFilter() != null) {
scanQuery = scanQuery.withDimFilter(scanQuery.getDimensionsFilter().optimize());
}
return runner.run(scanQuery, responseContext);
}
};
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.google.common.base.Function;
import com.google.inject.Inject;
import io.druid.common.utils.JodaUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.segment.Segment;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
{
public static final String CTX_TIMEOUT_AT = "timeoutAt";
public static final String CTX_COUNT = "count";
private final ScanQueryQueryToolChest toolChest;
private final ScanQueryEngine engine;
@Inject
public ScanQueryRunnerFactory(
ScanQueryQueryToolChest toolChest,
ScanQueryEngine engine
)
{
this.toolChest = toolChest;
this.engine = engine;
}
@Override
public QueryRunner<ScanResultValue> createRunner(Segment segment)
{
return new ScanQueryRunner(engine, segment);
}
@Override
public QueryRunner<ScanResultValue> mergeRunners(
ExecutorService queryExecutor,
final Iterable<QueryRunner<ScanResultValue>> queryRunners
)
{
// runs in a single thread, i.e. in the jetty (qtp) thread rather than a processing thread, so batches can be streamed back as they are produced
return new QueryRunner<ScanResultValue>()
{
@Override
public Sequence<ScanResultValue> run(
final Query<ScanResultValue> query, final Map<String, Object> responseContext
)
{
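// translate the relative query timeout (millis), if any, into an absolute deadline shared with per-segment runners via the response context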
final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeoutAt = queryTimeout == null
? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue();
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
return Sequences.concat(
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<ScanResultValue>, Sequence<ScanResultValue>>()
{
@Override
public Sequence<ScanResultValue> apply(final QueryRunner<ScanResultValue> input)
{
return input.run(query, responseContext);
}
}
)
);
}
};
}
@Override
public QueryToolChest<ScanResultValue, ScanQuery> getToolchest()
{
return toolChest;
}
private class ScanQueryRunner implements QueryRunner<ScanResultValue>
{
private final ScanQueryEngine engine;
private final Segment segment;
public ScanQueryRunner(ScanQueryEngine engine, Segment segment)
{
this.engine = engine;
this.segment = segment;
}
@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
)
{
if (!(query instanceof ScanQuery)) {
throw new ISE("Got a [%s] which isn't a %s", query.getClass(), ScanQuery.class);
}
// the deadline may be missing when the runner is invoked directly, as happens in unit tests
if (responseContext.get(CTX_TIMEOUT_AT) == null) {
  responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT);
}
return engine.process((ScanQuery) query, segment, responseContext);
}
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
public class ScanResultValue implements Comparable<ScanResultValue>
{
public static final String timestampKey = "timestamp";
private final String segmentId;
private final List<String> columns;
private final Object events;
@JsonCreator
public ScanResultValue(
@JsonProperty("segmentId") String segmentId,
@JsonProperty("columns") List<String> columns,
@JsonProperty("events") Object events
)
{
this.segmentId = segmentId;
this.columns = columns;
this.events = events;
}
@JsonProperty
public String getSegmentId()
{
return segmentId;
}
@JsonProperty
public List<String> getColumns()
{
return columns;
}
@JsonProperty
public Object getEvents()
{
return events;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ScanResultValue that = (ScanResultValue) o;
if (segmentId != null ? !segmentId.equals(that.segmentId) : that.segmentId != null) {
return false;
}
if (columns != null ? !columns.equals(that.columns) : that.columns != null) {
return false;
}
return events != null ? events.equals(that.events) : that.events == null;
}
@Override
public int hashCode()
{
int result = segmentId != null ? segmentId.hashCode() : 0;
result = 31 * result + (columns != null ? columns.hashCode() : 0);
result = 31 * result + (events != null ? events.hashCode() : 0);
return result;
}
@Override
public String toString()
{
return "ScanResultValue{" +
"segmentId='" + segmentId + '\'' +
", columns=" + columns +
", events=" + events +
'}';
}
@Override
public int compareTo(ScanResultValue that)
{
if (that == null) {
return 1;
}
if (segmentId != null && that.segmentId != null) {
return segmentId.compareTo(that.segmentId);
}
return segmentId != null ? 1 : (that.segmentId != null ? -1 : 0);
}
}

View File

@ -0,0 +1 @@
io.druid.query.scan.ScanQueryDruidModule

View File

@ -0,0 +1,242 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.CharSource;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.TableDataSource;
import io.druid.query.select.SelectQueryRunnerTest;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestIndex;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*/
@RunWith(Parameterized.class)
public class MultiSegmentScanQueryTest
{
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest();
private static final QueryRunnerFactory<ScanResultValue, ScanQuery> factory = new ScanQueryRunnerFactory(
toolChest,
new ScanQueryEngine()
);
// time-modified version of druid.sample.tsv (same rows, but spread across hourly timestamps)
public static final String[] V_0112 = {
"2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000",
"2011-01-12T01:00:00.000Z spot business preferred bpreferred 100.000000",
"2011-01-12T02:00:00.000Z spot entertainment preferred epreferred 100.000000",
"2011-01-12T03:00:00.000Z spot health preferred hpreferred 100.000000",
"2011-01-12T04:00:00.000Z spot mezzanine preferred mpreferred 100.000000",
"2011-01-12T05:00:00.000Z spot news preferred npreferred 100.000000",
"2011-01-12T06:00:00.000Z spot premium preferred ppreferred 100.000000",
"2011-01-12T07:00:00.000Z spot technology preferred tpreferred 100.000000",
"2011-01-12T08:00:00.000Z spot travel preferred tpreferred 100.000000",
"2011-01-12T09:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
"2011-01-12T10:00:00.000Z total_market premium preferred ppreferred 1000.000000",
"2011-01-12T11:00:00.000Z upfront mezzanine preferred mpreferred 800.000000 value",
"2011-01-12T12:00:00.000Z upfront premium preferred ppreferred 800.000000 value",
"2011-01-12T13:00:00.000Z upfront premium preferred ppreferred2 800.000000 value"
};
public static final String[] V_0113 = {
"2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713",
"2011-01-13T01:00:00.000Z spot business preferred bpreferred 103.629399",
"2011-01-13T02:00:00.000Z spot entertainment preferred epreferred 110.087299",
"2011-01-13T03:00:00.000Z spot health preferred hpreferred 114.947403",
"2011-01-13T04:00:00.000Z spot mezzanine preferred mpreferred 104.465767",
"2011-01-13T05:00:00.000Z spot news preferred npreferred 102.851683",
"2011-01-13T06:00:00.000Z spot premium preferred ppreferred 108.863011",
"2011-01-13T07:00:00.000Z spot technology preferred tpreferred 111.356672",
"2011-01-13T08:00:00.000Z spot travel preferred tpreferred 106.236928",
"2011-01-13T09:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
"2011-01-13T10:00:00.000Z total_market premium preferred ppreferred 1689.012875",
"2011-01-13T11:00:00.000Z upfront mezzanine preferred mpreferred 826.060182 value",
"2011-01-13T12:00:00.000Z upfront premium preferred ppreferred 1564.617729 value"
};
private static Segment segment0;
private static Segment segment1;
@BeforeClass
public static void setup() throws IOException
{
CharSource v_0112 = CharSource.wrap(StringUtils.join(V_0112, "\n"));
CharSource v_0113 = CharSource.wrap(StringUtils.join(V_0113, "\n"));
IncrementalIndex index0 = TestIndex.loadIncrementalIndex(newIndex("2011-01-12T00:00:00.000Z"), v_0112);
IncrementalIndex index1 = TestIndex.loadIncrementalIndex(newIndex("2011-01-13T00:00:00.000Z"), v_0113);
segment0 = new IncrementalIndexSegment(index0, makeIdentifier(index0, "v1"));
segment1 = new IncrementalIndexSegment(index1, makeIdentifier(index1, "v1"));
}
private static String makeIdentifier(IncrementalIndex index, String version)
{
return makeIdentifier(index.getInterval(), version);
}
private static String makeIdentifier(Interval interval, String version)
{
return DataSegment.makeDataSegmentIdentifier(
QueryRunnerTestHelper.dataSource,
interval.getStart(),
interval.getEnd(),
version,
NoneShardSpec.instance()
);
}
private static IncrementalIndex newIndex(String minTimeStamp)
{
return newIndex(minTimeStamp, 10000);
}
private static IncrementalIndex newIndex(String minTimeStamp, int maxRowCount)
{
final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(new DateTime(minTimeStamp).getMillis())
.withQueryGranularity(QueryGranularities.HOUR)
.withMetrics(TestIndex.METRIC_AGGS)
.build();
return new OnheapIncrementalIndex(schema, true, maxRowCount);
}
@AfterClass
public static void clear()
{
IOUtils.closeQuietly(segment0);
IOUtils.closeQuietly(segment1);
}
@Parameterized.Parameters(name = "limit={0},batchSize={1}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.cartesian(Arrays.asList(0, 1, 3, 7, 10, 20, 1000), Arrays.asList(0, 1, 3, 6, 7, 10, 123, 2000));
}
private final int limit;
private final int batchSize;
public MultiSegmentScanQueryTest(int limit, int batchSize)
{
this.limit = limit;
this.batchSize = batchSize;
}
private ScanQuery.ScanQueryBuilder newBuilder()
{
return ScanQuery.newScanQueryBuilder()
.dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource))
.intervals(SelectQueryRunnerTest.I_0112_0114)
.batchSize(batchSize)
.columns(Arrays.<String>asList())
.limit(limit);
}
@Test
public void testMergeRunnersWithLimit()
{
ScanQuery query = newBuilder().build();
List<ScanResultValue> results = Sequences.toList(
factory.mergeRunners(MoreExecutors.sameThreadExecutor(), ImmutableList.of(
factory.createRunner(segment0),
factory.createRunner(segment1)
)).run(query, new HashMap<String, Object>()),
Lists.<ScanResultValue>newArrayList()
);
int totalCount = 0;
for (ScanResultValue result : results) {
System.out.println(((List) result.getEvents()).size());
totalCount += ((List) result.getEvents()).size();
}
Assert.assertEquals(
totalCount,
limit != 0 ? Math.min(limit, V_0112.length + V_0113.length) : V_0112.length + V_0113.length
);
}
@Test
public void testMergeResultsWithLimit()
{
QueryRunner<ScanResultValue> runner = toolChest.mergeResults(
new QueryRunner<ScanResultValue>() {
@Override
public Sequence<ScanResultValue> run(
Query<ScanResultValue> query, Map<String, Object> responseContext
)
{
// simulate results back from 2 historicals
List<Sequence<ScanResultValue>> sequences = Lists.newArrayListWithExpectedSize(2);
sequences.add(factory.createRunner(segment0).run(query, new HashMap<String, Object>()));
sequences.add(factory.createRunner(segment1).run(query, new HashMap<String, Object>()));
return new MergeSequence<>(
query.getResultOrdering(),
Sequences.simple(sequences)
);
}
}
);
ScanQuery query = newBuilder().build();
List<ScanResultValue> results = Sequences.toList(
runner.run(query, new HashMap<String, Object>()),
Lists.<ScanResultValue>newArrayList()
);
int totalCount = 0;
for (ScanResultValue result : results) {
totalCount += ((List) result.getEvents()).size();
}
Assert.assertEquals(
totalCount,
limit != 0 ? Math.min(limit, V_0112.length + V_0113.length) : V_0112.length + V_0113.length
);
}
}

View File

@ -0,0 +1,605 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.ObjectArrays;
import com.google.common.collect.Sets;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.TableDataSource;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.lookup.LookupExtractionFn;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
@RunWith(Parameterized.class)
public class ScanQueryRunnerTest
{
// copied from druid.sample.tsv
public static final String[] V_0112 = {
"2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000",
"2011-01-12T00:00:00.000Z spot business preferred bpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot entertainment preferred epreferred 100.000000",
"2011-01-12T00:00:00.000Z spot health preferred hpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot mezzanine preferred mpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot news preferred npreferred 100.000000",
"2011-01-12T00:00:00.000Z spot premium preferred ppreferred 100.000000",
"2011-01-12T00:00:00.000Z spot technology preferred tpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot travel preferred tpreferred 100.000000",
"2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
"2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000",
"2011-01-12T00:00:00.000Z upfront mezzanine preferred mpreferred 800.000000 value",
"2011-01-12T00:00:00.000Z upfront premium preferred ppreferred 800.000000 value"
};
public static final String[] V_0113 = {
"2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713",
"2011-01-13T00:00:00.000Z spot business preferred bpreferred 103.629399",
"2011-01-13T00:00:00.000Z spot entertainment preferred epreferred 110.087299",
"2011-01-13T00:00:00.000Z spot health preferred hpreferred 114.947403",
"2011-01-13T00:00:00.000Z spot mezzanine preferred mpreferred 104.465767",
"2011-01-13T00:00:00.000Z spot news preferred npreferred 102.851683",
"2011-01-13T00:00:00.000Z spot premium preferred ppreferred 108.863011",
"2011-01-13T00:00:00.000Z spot technology preferred tpreferred 111.356672",
"2011-01-13T00:00:00.000Z spot travel preferred tpreferred 106.236928",
"2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
"2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875",
"2011-01-13T00:00:00.000Z upfront mezzanine preferred mpreferred 826.060182 value",
"2011-01-13T00:00:00.000Z upfront premium preferred ppreferred 1564.617729 value"
};
public static final QuerySegmentSpec I_0112_0114 = new LegacySegmentSpec(
new Interval("2011-01-12/2011-01-14")
);
public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class);
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest();
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.cartesian(
QueryRunnerTestHelper.makeQueryRunners(
new ScanQueryRunnerFactory(
toolChest,
new ScanQueryEngine()
)
)
);
}
private final QueryRunner runner;
public ScanQueryRunnerTest(QueryRunner runner)
{
this.runner = runner;
}
private ScanQuery.ScanQueryBuilder newTestQuery()
{
return ScanQuery.newScanQueryBuilder()
.dataSource(new TableDataSource(QueryRunnerTestHelper.dataSource))
.columns(Arrays.<String>asList())
.intervals(QueryRunnerTestHelper.fullOnInterval)
.limit(3);
}
@Test
public void testFullOnSelect()
{
List<String> columns = Lists.newArrayList(
ScanResultValue.timestampKey,
"market",
"quality",
"placement",
"placementish",
"partial_null_column",
"null_column",
"index",
"indexMin",
"indexMaxPlusTen",
"quality_uniques"
);
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(query, context),
Lists.<ScanResultValue>newArrayList()
);
List<ScanResultValue> expectedResults = toExpected(
toFullEvents(V_0112_0114),
columns,
0,
3
);
verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
}
@Test
public void testFullOnSelectAsCompactedList()
{
final List<String> columns = Lists.newArrayList(
ScanResultValue.timestampKey,
"market",
"quality",
"placement",
"placementish",
"partial_null_column",
"null_column",
"index",
"indexMin",
"indexMaxPlusTen",
"quality_uniques"
);
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(query, context),
Lists.<ScanResultValue>newArrayList()
);
List<ScanResultValue> expectedResults = toExpected(
toFullEvents(V_0112_0114),
columns,
0,
3
);
verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(compactedListToRow(results), "null_column"));
}
@Test
public void testSelectWithDimsAndMets()
{
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(query, context),
Lists.<ScanResultValue>newArrayList()
);
List<ScanResultValue> expectedResults = toExpected(
toEvents(
new String[]{
ScanResultValue.timestampKey + ":TIME",
QueryRunnerTestHelper.marketDimension + ":STRING",
null,
null,
null,
QueryRunnerTestHelper.indexMetric + ":FLOAT"
},
V_0112_0114
),
Lists.newArrayList(ScanResultValue.timestampKey, "market", "index"),
0,
3
);
verify(expectedResults, results);
}
@Test
public void testSelectWithDimsAndMetsAsCompactedList()
{
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.columns(QueryRunnerTestHelper.marketDimension, QueryRunnerTestHelper.indexMetric)
.resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(query, context),
Lists.<ScanResultValue>newArrayList()
);
List<ScanResultValue> expectedResults = toExpected(
toEvents(
new String[]{
ScanResultValue.timestampKey + ":TIME",
QueryRunnerTestHelper.marketDimension + ":STRING",
null,
null,
null,
QueryRunnerTestHelper.indexMetric + ":FLOAT"
},
V_0112_0114
),
Lists.newArrayList(ScanResultValue.timestampKey, "market", "index"),
0,
3
);
verify(expectedResults, compactedListToRow(results));
}
@Test
public void testFullOnSelectWithFilterAndLimit()
{
// limits
for (int limit : new int[]{3, 1, 5, 7, 0}) {
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.limit(limit)
.build();
HashMap<String, Object> context = new HashMap<String, Object>();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(query, context),
Lists.<ScanResultValue>newArrayList()
);
final List<List<Map<String, Object>>> events = toEvents(
new String[]{
ScanResultValue.timestampKey + ":TIME",
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":FLOAT"
},
// filtered values with day granularity
new String[]{
"2011-01-12T00:00:00.000Z spot automotive preferred apreferred 100.000000",
"2011-01-12T00:00:00.000Z spot business preferred bpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot entertainment preferred epreferred 100.000000",
"2011-01-12T00:00:00.000Z spot health preferred hpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot mezzanine preferred mpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot news preferred npreferred 100.000000",
"2011-01-12T00:00:00.000Z spot premium preferred ppreferred 100.000000",
"2011-01-12T00:00:00.000Z spot technology preferred tpreferred 100.000000",
"2011-01-12T00:00:00.000Z spot travel preferred tpreferred 100.000000"
},
new String[]{
"2011-01-13T00:00:00.000Z spot automotive preferred apreferred 94.874713",
"2011-01-13T00:00:00.000Z spot business preferred bpreferred 103.629399",
"2011-01-13T00:00:00.000Z spot entertainment preferred epreferred 110.087299",
"2011-01-13T00:00:00.000Z spot health preferred hpreferred 114.947403",
"2011-01-13T00:00:00.000Z spot mezzanine preferred mpreferred 104.465767",
"2011-01-13T00:00:00.000Z spot news preferred npreferred 102.851683",
"2011-01-13T00:00:00.000Z spot premium preferred ppreferred 108.863011",
"2011-01-13T00:00:00.000Z spot technology preferred tpreferred 111.356672",
"2011-01-13T00:00:00.000Z spot travel preferred tpreferred 106.236928"
}
);
List<ScanResultValue> expectedResults = toExpected(
events,
Lists.newArrayList(ScanResultValue.timestampKey, "quality", "index"),
0,
limit
);
verify(expectedResults, results);
}
}
@Test
public void testSelectWithFilterLookupExtractionFn()
{
Map<String, String> extractionMap = new HashMap<>();
extractionMap.put("total_market", "replaced");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap, false);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, true);
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "replaced", lookupExtractionFn))
.columns(QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric)
.build();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(query, Maps.newHashMap()),
Lists.<ScanResultValue>newArrayList()
);
Iterable<ScanResultValue> resultsOptimize = Sequences.toList(
toolChest.postMergeQueryDecoration(toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner))).
run(query, Maps.<String, Object>newHashMap()), Lists.<ScanResultValue>newArrayList()
);
final List<List<Map<String, Object>>> events = toEvents(
new String[]{
ScanResultValue.timestampKey + ":TIME",
null,
QueryRunnerTestHelper.qualityDimension + ":STRING",
null,
null,
QueryRunnerTestHelper.indexMetric + ":FLOAT"
},
// filtered values with day granularity
new String[]{
"2011-01-12T00:00:00.000Z total_market mezzanine preferred mpreferred 1000.000000",
"2011-01-12T00:00:00.000Z total_market premium preferred ppreferred 1000.000000"
},
new String[]{
"2011-01-13T00:00:00.000Z total_market mezzanine preferred mpreferred 1040.945505",
"2011-01-13T00:00:00.000Z total_market premium preferred ppreferred 1689.012875"
}
);
List<ScanResultValue> expectedResults = toExpected(
events,
Lists.newArrayList(ScanResultValue.timestampKey, QueryRunnerTestHelper.qualityDimension, QueryRunnerTestHelper.indexMetric),
0,
3
);
verify(expectedResults, results);
verify(expectedResults, resultsOptimize);
}
@Test
public void testFullSelectNoResults()
{
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.filters(
new AndDimFilter(
Arrays.<DimFilter>asList(
new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "spot", null),
new SelectorDimFilter(QueryRunnerTestHelper.marketDimension, "foo", null)
)
)
)
.build();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(query, Maps.newHashMap()),
Lists.<ScanResultValue>newArrayList()
);
List<ScanResultValue> expectedResults = Arrays.asList();
verify(expectedResults, populateNullColumnAtLastForQueryableIndexCase(results, "null_column"));
}
@Test
public void testFullSelectNoDimensionAndMetric()
{
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)
.columns("foo", "foo2")
.build();
Iterable<ScanResultValue> results = Sequences.toList(
runner.run(query, Maps.newHashMap()),
Lists.<ScanResultValue>newArrayList()
);
final List<List<Map<String, Object>>> events = toEvents(
new String[]{
ScanResultValue.timestampKey + ":TIME"
},
V_0112_0114
);
List<ScanResultValue> expectedResults = toExpected(
events,
Lists.<String>newArrayList(ScanResultValue.timestampKey, "foo", "foo2"),
0,
3
);
verify(expectedResults, results);
}
private List<List<Map<String, Object>>> toFullEvents(final String[]... valueSet)
{
return toEvents(
new String[]{
ScanResultValue.timestampKey + ":TIME",
QueryRunnerTestHelper.marketDimension + ":STRING",
QueryRunnerTestHelper.qualityDimension + ":STRING",
QueryRunnerTestHelper.placementDimension + ":STRING",
QueryRunnerTestHelper.placementishDimension + ":STRINGS",
QueryRunnerTestHelper.indexMetric + ":FLOAT",
QueryRunnerTestHelper.partialNullDimension + ":STRING"
},
valueSet
);
}
private List<List<Map<String, Object>>> toEvents(final String[] dimSpecs, final String[]... valueSet)
{
List<String> values = Lists.newArrayList();
for (String[] vSet : valueSet) {
values.addAll(Arrays.asList(vSet));
}
List<List<Map<String, Object>>> events = Lists.newArrayList();
events.add(
Lists.newArrayList(
Iterables.transform(
values, new Function<String, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(String input)
{
Map<String, Object> event = Maps.newHashMap();
String[] values = input.split("\\t");
for (int i = 0; i < dimSpecs.length; i++) {
if (dimSpecs[i] == null || i >= values.length) {
continue;
}
String[] specs = dimSpecs[i].split(":");
event.put(
specs[0],
specs.length == 1 || specs[1].equals("STRING") ? values[i] :
specs[1].equals("TIME") ? new DateTime(values[i]) :
specs[1].equals("FLOAT") ? Float.valueOf(values[i]) :
specs[1].equals("DOUBLE") ? Double.valueOf(values[i]) :
specs[1].equals("LONG") ? Long.valueOf(values[i]) :
specs[1].equals("NULL") ? null :
specs[1].equals("STRINGS") ? Arrays.asList(values[i].split("\u0001")) :
values[i]
);
}
return event;
}
}
)
)
);
return events;
}
private List<ScanResultValue> toExpected(
List<List<Map<String, Object>>> targets,
List<String> columns,
final int offset,
final int limit
)
{
List<ScanResultValue> expected = Lists.newArrayListWithExpectedSize(targets.size());
for (List<Map<String, Object>> group : targets) {
List<Map<String, Object>> events = Lists.newArrayListWithExpectedSize(limit);
int end = Math.min(group.size(), offset + limit);
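// a limit of 0 is treated as "no limit" in these tests, matching the query's behavior when no limit is given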
if (end == 0) {
end = group.size();
}
events.addAll(group.subList(offset, end));
expected.add(
new ScanResultValue(
QueryRunnerTestHelper.segmentId,
columns,
events
)
);
}
return expected;
}
private static void verify(
Iterable<ScanResultValue> expectedResults,
Iterable<ScanResultValue> actualResults
)
{
Iterator<ScanResultValue> expectedIter = expectedResults.iterator();
Iterator<ScanResultValue> actualIter = actualResults.iterator();
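// walk both iterators in lockstep; any surplus actual results trigger the ISE below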
while (expectedIter.hasNext()) {
ScanResultValue expected = expectedIter.next();
ScanResultValue actual = actualIter.next();
Assert.assertEquals(expected.getSegmentId(), actual.getSegmentId());
Set<String> exColumns = Sets.newTreeSet(expected.getColumns());
Set<String> acColumns = Sets.newTreeSet(actual.getColumns());
Assert.assertEquals(exColumns, acColumns);
Iterator<Map<String, Object>> expectedEvts = ((List<Map<String, Object>>) expected.getEvents()).iterator();
Iterator<Map<String, Object>> actualEvts = ((List<Map<String, Object>>) actual.getEvents()).iterator();
while (expectedEvts.hasNext()) {
Map<String, Object> exHolder = expectedEvts.next();
Map<String, Object> acHolder = actualEvts.next();
for (Map.Entry<String, Object> ex : exHolder.entrySet()) {
Object actVal = acHolder.get(ex.getKey());
// workaround for a current IncrementalIndex limitation: float metrics can come back as doubles
if (acHolder.get(ex.getKey()) instanceof Double) {
actVal = ((Double) actVal).floatValue();
}
Assert.assertEquals("invalid value for " + ex.getKey(), ex.getValue(), actVal);
}
}
if (actualEvts.hasNext()) {
throw new ISE("This event iterator should be exhausted!");
}
}
if (actualIter.hasNext()) {
throw new ISE("This iterator should be exhausted!");
}
}
private static Iterable<ScanResultValue> populateNullColumnAtLastForQueryableIndexCase(
Iterable<ScanResultValue> results,
String columnName
)
{
// A QueryableIndex does not expose a column that is entirely null, so append it to each result's column list for comparison.
for (ScanResultValue value : results) {
List<String> columns = value.getColumns();
if (columns.contains(columnName)) {
break;
}
columns.add(columnName);
}
return results;
}
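// Hypothetical illustration (not taken from the source): with resultFormat = "compactedList",
// each event in a ScanResultValue is a bare value list ordered to match getColumns(), e.g.
//   ["2011-01-12T00:00:00.000Z", "spot", 100.0]
// The helper below rebuilds the equivalent "list"-format map, e.g.
//   {"timestamp": "2011-01-12T00:00:00.000Z", "market": "spot", "index": 100.0}
// so the same expected results can verify both formats.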
private Iterable<ScanResultValue> compactedListToRow(Iterable<ScanResultValue> results)
{
return Iterables.transform(results, new Function<ScanResultValue, ScanResultValue>()
{
@Override
public ScanResultValue apply(ScanResultValue input)
{
List mapEvents = Lists.newLinkedList();
List events = ((List) input.getEvents());
for (int i = 0; i < events.size(); i++) {
Iterator compactedEventIter = ((List) events.get(i)).iterator();
Map mapEvent = new LinkedHashMap();
for (String column : input.getColumns()) {
mapEvent.put(column, compactedEventIter.next());
}
mapEvents.add(mapEvent);
}
return new ScanResultValue(input.getSegmentId(), input.getColumns(), mapEvents);
}
});
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.scan;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.TableDataSource;
import io.druid.query.spec.LegacySegmentSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class ScanQuerySpecTest
{
private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Test
public void testSerializationLegacyString() throws Exception
{
String legacy =
"{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"},"
+ "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]},"
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\"],"
+ "\"limit\":3,"
+ "\"context\":null}";
String current =
"{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"table\",\"name\":\"testing\"},"
+ "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2011-01-12T00:00:00.000Z/2011-01-14T00:00:00.000Z\"]},"
+ "\"resultFormat\":\"list\","
+ "\"batchSize\":20480,"
+ "\"limit\":3,"
+ "\"filter\":null,"
+ "\"columns\":[\"market\",\"quality\",\"index\"],"
+ "\"context\":null,"
+ "\"descending\":false}";
ScanQuery query = new ScanQuery(
new TableDataSource(QueryRunnerTestHelper.dataSource),
new LegacySegmentSpec(new Interval("2011-01-12/2011-01-14")),
null, // resultFormat: null serializes as the "list" default
0, // batchSize: 0 serializes as the 20480 default
3, // limit
null, // filter
Arrays.<String>asList("market", "quality", "index"), // columns
null // context
);
String actual = jsonMapper.writeValueAsString(query);
Assert.assertEquals(current, actual);
Assert.assertEquals(query, jsonMapper.readValue(actual, ScanQuery.class));
Assert.assertEquals(query, jsonMapper.readValue(legacy, ScanQuery.class));
}
}

View File

@ -123,6 +123,7 @@
<module>extensions-contrib/virtual-columns</module>
<module>extensions-contrib/thrift-extensions</module>
<module>extensions-contrib/ambari-metrics-emitter</module>
<module>extensions-contrib/scan-query</module>
</modules>
<dependencyManagement>

View File

@ -64,7 +64,7 @@ public class SelectQueryEngine
{
private static final SelectStrategyFactory STRATEGY_FACTORY = new SelectStrategyFactory();
public static class SelectStrategyFactory implements ColumnSelectorStrategyFactory<SelectColumnSelectorStrategy>
{
@Override
public SelectColumnSelectorStrategy makeColumnSelectorStrategy(
@ -202,23 +202,12 @@ public class SelectQueryEngine
int lastOffset = offset.startOffset();
for (; !cursor.isDone() && offset.hasNext(); cursor.advance(), offset.next()) {
final Map<String, Object> theEvent = singleEvent(
EventHolder.timestampKey,
timestampColumnSelector,
selectorPlusList,
metSelectors
);
builder.addEntry(
new EventHolder(
@ -236,4 +225,31 @@ public class SelectQueryEngine
}
);
}
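// Extracted as a public, static helper; presumably so the new scan-query extension can reuse the same row-building logic.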
public static Map<String, Object> singleEvent(
String timestampKey,
LongColumnSelector timestampColumnSelector,
List<ColumnSelectorPlus<SelectColumnSelectorStrategy>> selectorPlusList,
Map<String, ObjectColumnSelector> metSelectors
)
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
theEvent.put(timestampKey, new DateTime(timestampColumnSelector.get()));
for (ColumnSelectorPlus<SelectColumnSelectorStrategy> selectorPlus : selectorPlusList) {
selectorPlus.getColumnSelectorStrategy().addRowValuesToSelectResult(selectorPlus.getOutputName(), selectorPlus.getSelector(), theEvent);
}
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final ObjectColumnSelector selector = metSelector.getValue();
if (selector == null) {
theEvent.put(metric, null);
} else {
theEvent.put(metric, selector.get());
}
}
return theEvent;
}
}

View File

@ -250,38 +250,43 @@ public class QueryResource implements QueryCountStatsProvider
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException
{
try {
// json serializer will always close the yielder
CountingOutputStream os = new CountingOutputStream(outputStream);
jsonWriter.writeValue(os, yielder);
os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
os.close();
successfulQueryCount.incrementAndGet();
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
.setDimension("success", "true")
.build("query/time", queryTime)
);
emitter.emit(
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
.build("query/bytes", os.getCount())
);
requestLogger.log(
new RequestLogLine(
new DateTime(start),
req.getRemoteAddr(),
theQuery,
new QueryStats(
ImmutableMap.<String, Object>of(
"query/time", queryTime,
"query/bytes", os.getCount(),
"success", true
)
)
)
);
}
finally {
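// Restore the original thread name: per the commit message, the response may be written on a
// Jetty QTP thread rather than a processing thread, which previously left its name corrupted.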
Thread.currentThread().setName(currThreadName);
}
}
},
context.getContentType()